Print peer address

This commit is contained in:
2026-03-16 02:45:47 +01:00
parent 1c7637fbc3
commit d3160c7dd4
2 changed files with 22 additions and 4 deletions

View File

@@ -8,6 +8,7 @@ import (
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@@ -64,15 +65,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
return resp, nil return resp, nil
} }
func peerAddr(ctx context.Context) string {
if p, ok := peer.FromContext(ctx); ok {
return p.Addr.String()
}
return "unknown"
}
func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
ch := srv.cache.Subscribe() ch := srv.cache.Subscribe()
defer srv.cache.Unsubscribe(ch) defer srv.cache.Unsubscribe(ch)
log.Printf("server: new StreamSnapshots subscriber") addr := peerAddr(stream.Context())
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
for { for {
select { select {
case <-stream.Context().Done(): case <-stream.Context().Done():
log.Printf("server: StreamSnapshots subscriber disconnected") log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
return nil return nil
case snap, ok := <-ch: case snap, ok := <-ch:
if !ok { if !ok {

View File

@@ -8,6 +8,7 @@ import (
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@@ -62,15 +63,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
}, nil }, nil
} }
func peerAddr(ctx context.Context) string {
if p, ok := peer.FromContext(ctx); ok {
return p.Addr.String()
}
return "unknown"
}
func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
ch := srv.store.Subscribe() ch := srv.store.Subscribe()
defer srv.store.Unsubscribe(ch) defer srv.store.Unsubscribe(ch)
log.Printf("server: new StreamSnapshots subscriber from %v", stream.Context().Value("peer")) addr := peerAddr(stream.Context())
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
for { for {
select { select {
case <-stream.Context().Done(): case <-stream.Context().Done():
log.Printf("server: StreamSnapshots subscriber disconnected") log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
return nil return nil
case snap, ok := <-ch: case snap, ok := <-ch:
if !ok { if !ok {