package main import ( "context" "log" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // Server implements pb.LogtailServiceServer backed by a Store. type Server struct { pb.UnimplementedLogtailServiceServer store *Store source string } func NewServer(store *Store, source string) *Server { return &Server{store: store, source: source} } func (srv *Server) TopN(_ context.Context, req *pb.TopNRequest) (*pb.TopNResponse, error) { if req == nil { return nil, status.Error(codes.InvalidArgument, "request is nil") } n := int(req.N) if n <= 0 { n = 10 } entries := srv.store.QueryTopN(req.Filter, req.GroupBy, n, req.Window) resp := &pb.TopNResponse{Source: srv.source} for _, e := range entries { resp.Entries = append(resp.Entries, &pb.TopNEntry{ Label: e.Label, Count: e.Count, }) } return resp, nil } func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResponse, error) { if req == nil { return nil, status.Error(codes.InvalidArgument, "request is nil") } points := srv.store.QueryTrend(req.Filter, req.Window) resp := &pb.TrendResponse{Source: srv.source} for _, p := range points { resp.Points = append(resp.Points, &pb.TrendPoint{ TimestampUnix: p.Timestamp.Unix(), Count: p.Count, }) } return resp, nil } func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb.ListTargetsResponse, error) { return &pb.ListTargetsResponse{ Targets: []*pb.TargetInfo{{Name: srv.source, Addr: ""}}, }, nil } func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.store.Subscribe() defer srv.store.Unsubscribe(ch) log.Printf("server: new StreamSnapshots subscriber from %v", stream.Context().Value("peer")) for { select { case <-stream.Context().Done(): log.Printf("server: StreamSnapshots subscriber disconnected") return nil case snap, ok := <-ch: if !ok { return nil } msg := &pb.Snapshot{ Source: srv.source, Timestamp: snap.Timestamp.Unix(), } for _, e := range snap.Entries { msg.Entries = append(msg.Entries, &pb.TopNEntry{ Label: e.Label, Count: e.Count, }) } if err := stream.Send(msg); err != nil { log.Printf("server: send error: %v", err) return err } case <-time.After(30 * time.Second): // unblock select when server is quiet; gRPC keepalives handle the rest } } }