diff --git a/cmd/aggregator/server.go b/cmd/aggregator/server.go index 29c6659..92b7bbc 100644 --- a/cmd/aggregator/server.go +++ b/cmd/aggregator/server.go @@ -8,6 +8,7 @@ import ( pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) @@ -64,15 +65,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb return resp, nil } +func peerAddr(ctx context.Context) string { + if p, ok := peer.FromContext(ctx); ok { + return p.Addr.String() + } + return "unknown" +} + func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.cache.Subscribe() defer srv.cache.Unsubscribe(ch) - log.Printf("server: new StreamSnapshots subscriber") + addr := peerAddr(stream.Context()) + log.Printf("server: new StreamSnapshots subscriber from %s", addr) for { select { case <-stream.Context().Done(): - log.Printf("server: StreamSnapshots subscriber disconnected") + log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr) return nil case snap, ok := <-ch: if !ok { diff --git a/cmd/collector/server.go b/cmd/collector/server.go index 45d6e6f..2ebcfc5 100644 --- a/cmd/collector/server.go +++ b/cmd/collector/server.go @@ -8,6 +8,7 @@ import ( pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) @@ -62,15 +63,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb }, nil } +func peerAddr(ctx context.Context) string { + if p, ok := peer.FromContext(ctx); ok { + return p.Addr.String() + } + return "unknown" +} + func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.store.Subscribe() defer srv.store.Unsubscribe(ch) - log.Printf("server: new StreamSnapshots subscriber from %v", stream.Context().Value("peer")) + addr := peerAddr(stream.Context()) + log.Printf("server: new StreamSnapshots subscriber from %s", addr) for { select { case <-stream.Context().Done(): - log.Printf("server: StreamSnapshots subscriber disconnected") + log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr) return nil case snap, ok := <-ch: if !ok {