Execute PLAN_AGGREGATOR.md

This commit is contained in:
2026-03-14 20:22:16 +01:00
parent 6ca296b2e8
commit 76612c1cb8
11 changed files with 1428 additions and 282 deletions

View File

@@ -0,0 +1,425 @@
package main
import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// --- Merger tests ---
// makeSnap builds a pb.Snapshot for the given collector source containing one
// TopNEntry per label/count pair, stamped with the current time.
func makeSnap(source string, entries map[string]int64) *pb.Snapshot {
	out := &pb.Snapshot{Source: source, Timestamp: time.Now().Unix()}
	for lbl, cnt := range entries {
		out.Entries = append(out.Entries, &pb.TopNEntry{Label: lbl, Count: cnt})
	}
	return out
}
// TestMergerApply checks that snapshots from two distinct collectors are
// summed label-by-label in the merged view.
func TestMergerApply(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10, "b": 20}))
	m.Apply(makeSnap("c2", map[string]int64{"a": 5, "c": 15}))
	totals := make(map[string]int64)
	for _, entry := range m.TopK(10) {
		totals[entry.Label] = entry.Count
	}
	if got := totals["a"]; got != 15 { // 10 from c1 + 5 from c2
		t.Errorf("a = %d, want 15", got)
	}
	if got := totals["b"]; got != 20 {
		t.Errorf("b = %d, want 20", got)
	}
	if got := totals["c"]; got != 15 {
		t.Errorf("c = %d, want 15", got)
	}
}
// TestMergerReplacement checks that a later snapshot from the same collector
// replaces that collector's earlier contribution instead of adding to it.
func TestMergerReplacement(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100}))
	m.Apply(makeSnap("c1", map[string]int64{"a": 50, "b": 30}))
	totals := make(map[string]int64)
	for _, entry := range m.TopK(10) {
		totals[entry.Label] = entry.Count
	}
	if got := totals["a"]; got != 50 {
		t.Errorf("a = %d, want 50 (not 150)", got)
	}
	if got := totals["b"]; got != 30 {
		t.Errorf("b = %d, want 30", got)
	}
}
// TestMergerZero checks that zeroing one collector removes only that
// collector's contribution from the merged view.
func TestMergerZero(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100}))
	m.Apply(makeSnap("c2", map[string]int64{"a": 50}))
	m.Zero("c1")
	top := m.TopK(10)
	ok := len(top) == 1 && top[0].Label == "a" && top[0].Count == 50
	if !ok {
		t.Errorf("after Zero(c1): %v", top)
	}
}
// TestMergerZeroNonexistent checks that zeroing an unknown collector address
// is a harmless no-op.
func TestMergerZeroNonexistent(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10}))
	m.Zero("unknown") // must not panic or disturb c1's data
	top := m.TopK(10)
	ok := len(top) == 1 && top[0].Count == 10
	if !ok {
		t.Errorf("unexpected: %v", top)
	}
}
// TestMergerConcurrent hammers Apply from 20 goroutines (3 collector
// identities) while a 21st goroutine issues Zero calls. There are no value
// assertions; the point is that `go test -race` finds no data races.
func TestMergerConcurrent(t *testing.T) {
	m := NewMerger()
	const writers = 20
	var wg sync.WaitGroup
	wg.Add(writers + 1)
	for i := 0; i < writers; i++ {
		go func(worker int) {
			defer wg.Done()
			addr := fmt.Sprintf("c%d", worker%3)
			for j := 0; j < 100; j++ {
				m.Apply(makeSnap(addr, map[string]int64{"x": int64(j)}))
			}
		}(i)
	}
	go func() {
		defer wg.Done()
		for i := 0; i < 30; i++ {
			m.Zero(fmt.Sprintf("c%d", i%3))
		}
	}()
	wg.Wait()
}
// --- Cache tests ---
// TestCacheRotation verifies that a single rotate() stores one fine bucket
// holding the merger's current entries, highest count first.
func TestCacheRotation(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100, "b": 50}))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	if cache.fineFilled != 1 {
		t.Fatalf("fineFilled = %d, want 1", cache.fineFilled)
	}
	latest := cache.fineRing[(cache.fineHead-1+st.FineRingSize)%st.FineRingSize]
	if got := len(latest.Entries); got != 2 {
		t.Fatalf("got %d entries, want 2", got)
	}
	if latest.Entries[0].Count != 100 {
		t.Errorf("top count = %d, want 100", latest.Entries[0].Count)
	}
}
// TestCacheCoarseRing verifies that after CoarseEvery fine rotations exactly
// one coarse bucket exists and its counts are the sum of the fine buckets.
func TestCacheCoarseRing(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10}))
	cache := NewCache(m, "test")
	start := time.Now()
	for tick := 0; tick < st.CoarseEvery; tick++ {
		cache.rotate(start.Add(time.Duration(tick) * time.Minute))
	}
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	if cache.coarseFilled != 1 {
		t.Fatalf("coarseFilled = %d, want 1", cache.coarseFilled)
	}
	coarse := cache.coarseRing[(cache.coarseHead-1+st.CoarseRingSize)%st.CoarseRingSize]
	if len(coarse.Entries) == 0 {
		t.Fatal("coarse snapshot is empty")
	}
	// Each of the 5 fine ticks contributes 10, so the coarse total is 50.
	if coarse.Entries[0].Count != 50 {
		t.Errorf("coarse count = %d, want 50", coarse.Entries[0].Count)
	}
}
// TestCacheQueryTopN verifies grouping by website and descending-count order.
func TestCacheQueryTopN(t *testing.T) {
	m := NewMerger()
	labels := map[string]int64{
		st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"}):  300,
		st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "200"}): 50,
	}
	m.Apply(makeSnap("c1", labels))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 2, pb.Window_W1M)
	if len(entries) != 2 {
		t.Fatalf("got %d entries, want 2", len(entries))
	}
	if entries[0].Label != "busy.com" || entries[0].Count != 300 {
		t.Errorf("top = %+v, want {busy.com 300}", entries[0])
	}
}
// TestCacheQueryTopNWithFilter verifies that an HTTP-status filter restricts
// which entries are counted before grouping.
func TestCacheQueryTopNWithFilter(t *testing.T) {
	m := NewMerger()
	throttled := st.EncodeTuple(st.Tuple4{"example.com", "1.0.0.0/24", "/api", "429"})
	okReq := st.EncodeTuple(st.Tuple4{"example.com", "2.0.0.0/24", "/api", "200"})
	m.Apply(makeSnap("c1", map[string]int64{throttled: 200, okReq: 500}))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	status := int32(429)
	filter := &pb.Filter{HttpResponse: &status}
	entries := cache.QueryTopN(filter, pb.GroupBy_WEBSITE, 10, pb.Window_W1M)
	ok := len(entries) == 1 && entries[0].Label == "example.com" && entries[0].Count == 200
	if !ok {
		t.Errorf("filtered result: %v", entries)
	}
}
// TestCacheQueryTrend verifies one trend point per rotation, returned in
// chronological order with per-bucket totals.
func TestCacheQueryTrend(t *testing.T) {
	m := NewMerger()
	cache := NewCache(m, "test")
	start := time.Now()
	counts := []int64{10, 20, 30}
	for i, count := range counts {
		m.Apply(makeSnap("c1", map[string]int64{
			st.EncodeTuple(st.Tuple4{"x.com", "1.0.0.0/24", "/", "200"}): count,
		}))
		cache.rotate(start.Add(time.Duration(i) * time.Minute))
	}
	points := cache.QueryTrend(nil, pb.Window_W5M)
	if len(points) != 3 {
		t.Fatalf("got %d points, want 3", len(points))
	}
	if points[0].Count != 10 || points[1].Count != 20 || points[2].Count != 30 {
		t.Errorf("counts: %v %v %v", points[0].Count, points[1].Count, points[2].Count)
	}
}
// TestCacheSubscribe verifies a subscriber receives the snapshot broadcast by
// rotate within a generous timeout.
func TestCacheSubscribe(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"x": 5}))
	cache := NewCache(m, "test")
	ch := cache.Subscribe()
	cache.rotate(time.Now())
	select {
	case <-time.After(time.Second):
		t.Fatal("no snapshot received")
	case got := <-ch:
		if len(got.Entries) == 0 {
			t.Error("received empty snapshot")
		}
	}
	cache.Unsubscribe(ch)
}
// --- gRPC end-to-end test ---
// fakeCollector is an in-process gRPC collector that streams a fixed set of
// snapshots then blocks until the context is cancelled.
type fakeCollector struct {
	pb.UnimplementedLogtailServiceServer
	snaps []*pb.Snapshot // canned snapshots, sent once each in order
}
// StreamSnapshots sends the canned snapshots in order, then parks until the
// client's context is cancelled, mimicking a long-lived collector stream.
func (f *fakeCollector) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
	for _, snap := range f.snaps {
		if err := stream.Send(snap); err != nil {
			return err
		}
	}
	<-stream.Context().Done()
	return nil
}
// startFakeCollector spins up an in-process gRPC server serving the given
// snapshots on a random loopback port and returns its address. The server is
// stopped automatically via t.Cleanup.
func startFakeCollector(t *testing.T, snaps []*pb.Snapshot) string {
	t.Helper()
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	server := grpc.NewServer()
	pb.RegisterLogtailServiceServer(server, &fakeCollector{snaps: snaps})
	t.Cleanup(server.Stop)
	go server.Serve(listener)
	return listener.Addr().String()
}
// TestGRPCEndToEnd wires two fake collectors into a real Merger/Cache/Server
// stack behind an in-process gRPC server, then exercises TopN (plain and
// filtered), Trend, and StreamSnapshots through a real client connection.
func TestGRPCEndToEnd(t *testing.T) {
	// Two fake collectors with overlapping labels.
	snap1 := makeSnap("col1", map[string]int64{
		st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"}):  500,
		st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "429"}): 100,
	})
	snap2 := makeSnap("col2", map[string]int64{
		st.EncodeTuple(st.Tuple4{"busy.com", "3.0.0.0/24", "/", "200"}):  300,
		st.EncodeTuple(st.Tuple4{"other.com", "4.0.0.0/24", "/", "200"}): 50,
	})
	addr1 := startFakeCollector(t, []*pb.Snapshot{snap1})
	addr2 := startFakeCollector(t, []*pb.Snapshot{snap2})
	// Start aggregator components.
	merger := NewMerger()
	cache := NewCache(merger, "agg-test")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewCollectorSub(addr1, merger).Run(ctx)
	go NewCollectorSub(addr2, merger).Run(ctx)
	// Wait for both snapshots to be applied. If the deadline lapses, the
	// assertions below produce the actual failure message.
	deadline := time.Now().Add(3 * time.Second)
	for time.Now().Before(deadline) {
		top := merger.TopK(1)
		if len(top) > 0 && top[0].Count >= 800 { // busy.com: 500+300
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Rotate the cache so the data is queryable.
	cache.rotate(time.Now())
	// Start a real gRPC server in front of the cache.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	grpcSrv := grpc.NewServer()
	pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test"))
	go grpcSrv.Serve(lis)
	defer grpcSrv.Stop()
	conn, err := grpc.NewClient(lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()
	client := pb.NewLogtailServiceClient(conn)
	qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer qcancel()
	// TopN — busy.com should have 800 (500+300) across both collectors.
	resp, err := client.TopN(qctx, &pb.TopNRequest{
		GroupBy: pb.GroupBy_WEBSITE,
		N:       5,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN: %v", err)
	}
	if len(resp.Entries) == 0 {
		t.Fatal("TopN returned no entries")
	}
	if resp.Entries[0].Label != "busy.com" || resp.Entries[0].Count != 800 {
		t.Errorf("top entry = %+v, want {busy.com 800}", resp.Entries[0])
	}
	t.Logf("TopN: %v", resp.Entries)
	// Filtered TopN — only 429s: quiet.com=100.
	f429 := int32(429)
	resp, err = client.TopN(qctx, &pb.TopNRequest{
		Filter:  &pb.Filter{HttpResponse: &f429},
		GroupBy: pb.GroupBy_WEBSITE,
		N:       5,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN filtered: %v", err)
	}
	if len(resp.Entries) != 1 || resp.Entries[0].Label != "quiet.com" {
		t.Errorf("filtered: %v", resp.Entries)
	}
	// Trend — one rotation happened, so one point summing all four tuples.
	tresp, err := client.Trend(qctx, &pb.TrendRequest{Window: pb.Window_W5M})
	if err != nil {
		t.Fatalf("Trend: %v", err)
	}
	if len(tresp.Points) != 1 || tresp.Points[0].Count != 950 { // 500+100+300+50
		t.Errorf("trend: %v", tresp.Points)
	}
	t.Logf("Trend: %v", tresp.Points)
	// StreamSnapshots — trigger a rotation and verify we receive a snapshot.
	streamCtx, streamCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer streamCancel()
	stream, err := client.StreamSnapshots(streamCtx, &pb.SnapshotRequest{})
	if err != nil {
		t.Fatalf("StreamSnapshots: %v", err)
	}
	// Wait for the server goroutine to call cache.Subscribe() before rotating.
	// NOTE(review): fixed sleep is a race window on a very slow machine;
	// acceptable for a test but worth confirming if this ever flakes.
	time.Sleep(50 * time.Millisecond)
	cache.rotate(time.Now()) // trigger a broadcast
	snap, err := stream.Recv()
	if err != nil {
		t.Fatalf("stream.Recv: %v", err)
	}
	if snap.Source != "agg-test" {
		t.Errorf("source = %q, want agg-test", snap.Source)
	}
	t.Logf("StreamSnapshots: %d entries from %s", len(snap.Entries), snap.Source)
}
// TestDegradedCollector checks that the aggregator keeps serving results from
// a healthy collector while a second, unreachable collector fails to connect.
func TestDegradedCollector(t *testing.T) {
	// Start one real and one immediately-gone collector.
	snap1 := makeSnap("col1", map[string]int64{
		st.EncodeTuple(st.Tuple4{"good.com", "1.0.0.0/24", "/", "200"}): 100,
	})
	addr1 := startFakeCollector(t, []*pb.Snapshot{snap1})
	// addr2 points at nothing — connections will fail immediately.
	addr2 := "127.0.0.1:1" // port 1 is always refused
	merger := NewMerger()
	cache := NewCache(merger, "test")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewCollectorSub(addr1, merger).Run(ctx)
	go NewCollectorSub(addr2, merger).Run(ctx)
	// Wait for col1's data to appear. If the deadline lapses the QueryTopN
	// assertions below produce the actual failure, so no Fatal here.
	deadline := time.Now().Add(3 * time.Second)
	for time.Now().Before(deadline) {
		top := merger.TopK(1)
		if len(top) > 0 {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	cache.rotate(time.Now())
	// Results should reflect col1 only.
	entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 5, pb.Window_W1M)
	if len(entries) == 0 {
		t.Fatal("no entries despite col1 being healthy")
	}
	if entries[0].Label != "good.com" {
		t.Errorf("top = %q, want good.com", entries[0].Label)
	}
	t.Logf("degraded test: got %d entries, top = %s %d", len(entries), entries[0].Label, entries[0].Count)
}

170
cmd/aggregator/cache.go Normal file
View File

@@ -0,0 +1,170 @@
package main
import (
"context"
"sync"
"time"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)
// Cache holds the tiered ring buffers for the aggregator and answers TopN,
// Trend, and StreamSnapshots queries from them. It is populated by a
// 1-minute ticker that snapshots the current merged view from the Merger.
//
// Tick-based (not snapshot-triggered) so the ring stays on the same 1-minute
// cadence as the collectors, regardless of how many collectors are connected.
type Cache struct {
	source string  // name reported in broadcast snapshots
	merger *Merger // provides the merged per-label counts snapshotted on each tick

	mu           sync.RWMutex                   // guards all ring state below
	fineRing     [st.FineRingSize]st.Snapshot   // 1-minute buckets
	fineHead     int                            // next fine slot to write
	fineFilled   int                            // valid fine buckets, capped at FineRingSize
	coarseRing   [st.CoarseRingSize]st.Snapshot // one merged bucket per CoarseEvery fine ticks
	coarseHead   int                            // next coarse slot to write
	coarseFilled int                            // valid coarse buckets, capped at CoarseRingSize
	fineTick     int                            // fine rotations since last coarse promotion

	subMu sync.Mutex                    // guards subs (separate lock so broadcast can't block queries)
	subs  map[chan st.Snapshot]struct{} // StreamSnapshots subscribers
}
// NewCache returns an empty Cache that snapshots merger under the given
// source name. Call Run to start the rotation ticker.
func NewCache(merger *Merger, source string) *Cache {
	c := &Cache{
		source: source,
		merger: merger,
	}
	c.subs = make(map[chan st.Snapshot]struct{})
	return c
}
// Run drives the 1-minute rotation ticker. Blocks until ctx is cancelled.
func (c *Cache) Run(ctx context.Context) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case tick := <-ticker.C:
			c.rotate(tick)
		case <-ctx.Done():
			return
		}
	}
}
// rotate snapshots the merger's current top-K into the fine ring, promotes a
// merged coarse bucket every st.CoarseEvery ticks, and finally broadcasts the
// fine snapshot to subscribers after releasing the lock.
func (c *Cache) rotate(now time.Time) {
	snap := st.Snapshot{Timestamp: now, Entries: c.merger.TopK(st.FineTopK)}
	c.mu.Lock()
	c.fineRing[c.fineHead] = snap
	c.fineHead = (c.fineHead + 1) % st.FineRingSize
	if c.fineFilled < st.FineRingSize {
		c.fineFilled++
	}
	c.fineTick++
	if c.fineTick >= st.CoarseEvery {
		c.fineTick = 0
		c.coarseRing[c.coarseHead] = c.mergeFineBuckets(now)
		c.coarseHead = (c.coarseHead + 1) % st.CoarseRingSize
		if c.coarseFilled < st.CoarseRingSize {
			c.coarseFilled++
		}
	}
	c.mu.Unlock()
	// Broadcast outside the lock so subscriber channels can't stall queries.
	c.broadcast(snap)
}
// mergeFineBuckets sums the most recent st.CoarseEvery fine buckets (fewer if
// the ring is not yet full) into one coarse snapshot. Caller must hold c.mu.
func (c *Cache) mergeFineBuckets(now time.Time) st.Snapshot {
	totals := make(map[string]int64)
	n := min(st.CoarseEvery, c.fineFilled)
	for back := 0; back < n; back++ {
		slot := (c.fineHead - 1 - back + st.FineRingSize) % st.FineRingSize
		for _, entry := range c.fineRing[slot].Entries {
			totals[entry.Label] += entry.Count
		}
	}
	return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(totals, st.CoarseTopK)}
}
// QueryTopN answers a TopN request from the ring buffers: it walks the
// window's buckets newest-first, applies the filter, regroups labels by the
// requested dimension, and returns the top n grouped entries.
func (c *Cache) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry {
	c.mu.RLock()
	defer c.mu.RUnlock()
	buckets, count := st.BucketsForWindow(window, c.fineView(), c.coarseView(), c.fineFilled, c.coarseFilled)
	agg := make(map[string]int64)
	for back := 0; back < count; back++ {
		slot := (buckets.Head - 1 - back + buckets.Size) % buckets.Size
		for _, entry := range buckets.Ring[slot].Entries {
			tuple := st.LabelTuple(entry.Label)
			if st.MatchesFilter(tuple, filter) {
				agg[st.DimensionLabel(tuple, groupBy)] += entry.Count
			}
		}
	}
	return st.TopKFromMap(agg, n)
}
// QueryTrend answers a Trend request from the ring buffers: one point per
// bucket in the window, oldest first, summing only entries that pass the
// filter.
func (c *Cache) QueryTrend(filter *pb.Filter, window pb.Window) []st.TrendPoint {
	c.mu.RLock()
	defer c.mu.RUnlock()
	buckets, count := st.BucketsForWindow(window, c.fineView(), c.coarseView(), c.fineFilled, c.coarseFilled)
	points := make([]st.TrendPoint, count)
	for i := range points {
		slot := (buckets.Head - count + i + buckets.Size) % buckets.Size
		bucket := buckets.Ring[slot]
		var total int64
		for _, entry := range bucket.Entries {
			if st.MatchesFilter(st.LabelTuple(entry.Label), filter) {
				total += entry.Count
			}
		}
		points[i] = st.TrendPoint{Timestamp: bucket.Timestamp, Count: total}
	}
	return points
}
// fineView returns a copied view of the fine ring for window selection.
// Caller must hold c.mu.
func (c *Cache) fineView() st.RingView {
	snapshot := append([]st.Snapshot(nil), c.fineRing[:]...)
	return st.RingView{Ring: snapshot, Head: c.fineHead, Size: st.FineRingSize}
}
// coarseView returns a copied view of the coarse ring for window selection.
// Caller must hold c.mu.
func (c *Cache) coarseView() st.RingView {
	snapshot := append([]st.Snapshot(nil), c.coarseRing[:]...)
	return st.RingView{Ring: snapshot, Head: c.coarseHead, Size: st.CoarseRingSize}
}
// Subscribe registers and returns a buffered channel that receives each fine
// snapshot produced by rotate. Callers must eventually call Unsubscribe.
func (c *Cache) Subscribe() chan st.Snapshot {
	ch := make(chan st.Snapshot, 4)
	c.subMu.Lock()
	defer c.subMu.Unlock()
	c.subs[ch] = struct{}{}
	return ch
}
// Unsubscribe removes ch from the subscriber set and closes it. It is safe
// to call more than once with the same channel: the close only happens when
// ch was still registered, which prevents a panic from a double close.
func (c *Cache) Unsubscribe(ch chan st.Snapshot) {
	c.subMu.Lock()
	_, registered := c.subs[ch]
	delete(c.subs, ch)
	c.subMu.Unlock()
	if registered {
		close(ch)
	}
}
// broadcast delivers snap to every subscriber without blocking: a subscriber
// whose buffer is full simply misses this snapshot.
func (c *Cache) broadcast(snap st.Snapshot) {
	c.subMu.Lock()
	defer c.subMu.Unlock()
	for subscriber := range c.subs {
		select {
		case subscriber <- snap:
		default: // drop rather than stall the rotation path
		}
	}
}

69
cmd/aggregator/main.go Normal file
View File

@@ -0,0 +1,69 @@
package main
import (
"context"
"flag"
"log"
"net"
"os"
"os/signal"
"strings"
"syscall"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
)
// main wires up the aggregator: one CollectorSub per configured collector
// address feeding a shared Merger, a Cache rotating on a 1-minute ticker, and
// a gRPC server answering queries from the cache. Shuts down gracefully on
// SIGINT/SIGTERM.
func main() {
	listen := flag.String("listen", ":9091", "gRPC listen address")
	collectors := flag.String("collectors", "", "comma-separated collector host:port addresses")
	source := flag.String("source", hostname(), "name for this aggregator in responses")
	flag.Parse()
	if *collectors == "" {
		log.Fatal("aggregator: --collectors is required")
	}
	// ctx is cancelled on SIGINT/SIGTERM; all background goroutines stop then.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	merger := NewMerger()
	cache := NewCache(merger, *source)
	go cache.Run(ctx)
	// One subscriber goroutine per collector address; empty entries (e.g.
	// trailing commas) are skipped.
	for _, addr := range strings.Split(*collectors, ",") {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue
		}
		sub := NewCollectorSub(addr, merger)
		go sub.Run(ctx)
		log.Printf("aggregator: subscribing to collector %s", addr)
	}
	lis, err := net.Listen("tcp", *listen)
	if err != nil {
		log.Fatalf("aggregator: failed to listen on %s: %v", *listen, err)
	}
	grpcServer := grpc.NewServer()
	pb.RegisterLogtailServiceServer(grpcServer, NewServer(cache, *source))
	go func() {
		log.Printf("aggregator: gRPC listening on %s (source=%s)", *listen, *source)
		if err := grpcServer.Serve(lis); err != nil {
			log.Printf("aggregator: gRPC server stopped: %v", err)
		}
	}()
	// Block until a shutdown signal, then drain in-flight RPCs.
	<-ctx.Done()
	log.Printf("aggregator: shutting down")
	grpcServer.GracefulStop()
}
// hostname returns the OS hostname, or "unknown" when it cannot be
// determined; used as the default --source value.
func hostname() string {
	if h, err := os.Hostname(); err == nil {
		return h
	}
	return "unknown"
}

70
cmd/aggregator/merger.go Normal file
View File

@@ -0,0 +1,70 @@
package main
import (
"sync"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)
// Merger maintains per-collector maps and a running merged map using a delta
// strategy: on each new snapshot from collector X, subtract X's old entries
// and add the new ones. This is O(snapshot_size) rather than
// O(N_collectors × snapshot_size).
type Merger struct {
	mu           sync.Mutex                  // guards both maps below
	perCollector map[string]map[string]int64 // source → label → count
	merged       map[string]int64            // label → total across all collectors
}
// NewMerger returns a Merger with empty per-collector and merged maps.
func NewMerger() *Merger {
	m := &Merger{}
	m.perCollector = make(map[string]map[string]int64)
	m.merged = make(map[string]int64)
	return m
}
// Apply incorporates a snapshot from a collector, replacing that collector's
// previous contribution in the merged map: the old entries are subtracted
// before the new ones are added.
func (m *Merger) Apply(snap *pb.Snapshot) {
	m.mu.Lock()
	defer m.mu.Unlock()
	source := snap.Source
	// Remove this collector's previous contribution from the merged totals,
	// dropping labels whose total falls to zero.
	for label, count := range m.perCollector[source] {
		if m.merged[label] -= count; m.merged[label] <= 0 {
			delete(m.merged, label)
		}
	}
	// Record the fresh contribution and fold it into the merged totals.
	fresh := make(map[string]int64, len(snap.Entries))
	for _, entry := range snap.Entries {
		fresh[entry.Label] += entry.Count
		m.merged[entry.Label] += entry.Count
	}
	m.perCollector[source] = fresh
}
// Zero removes a degraded collector's entire contribution from the merged
// map and forgets its per-collector state. A no-op for unknown addresses.
func (m *Merger) Zero(addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for label, count := range m.perCollector[addr] {
		if m.merged[label] -= count; m.merged[label] <= 0 {
			delete(m.merged, label)
		}
	}
	delete(m.perCollector, addr)
}
// TopK returns the top-k entries from the current merged view.
func (m *Merger) TopK(k int) []st.Entry {
	m.mu.Lock()
	entries := st.TopKFromMap(m.merged, k)
	m.mu.Unlock()
	return entries
}

80
cmd/aggregator/server.go Normal file
View File

@@ -0,0 +1,80 @@
package main
import (
"context"
"log"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Server implements pb.LogtailServiceServer backed by the aggregator Cache.
type Server struct {
	pb.UnimplementedLogtailServiceServer
	cache  *Cache // answers TopN/Trend queries and provides snapshot subscriptions
	source string // name reported in every response
}
// NewServer wraps cache in a LogtailService implementation that reports the
// given source name in its responses.
func NewServer(cache *Cache, source string) *Server {
	return &Server{
		cache:  cache,
		source: source,
	}
}
// TopN answers a top-N query from the cache. A non-positive N defaults to 10.
func (srv *Server) TopN(_ context.Context, req *pb.TopNRequest) (*pb.TopNResponse, error) {
	if req == nil {
		return nil, status.Error(codes.InvalidArgument, "request is nil")
	}
	limit := int(req.N)
	if limit <= 0 {
		limit = 10
	}
	resp := &pb.TopNResponse{Source: srv.source}
	for _, e := range srv.cache.QueryTopN(req.Filter, req.GroupBy, limit, req.Window) {
		resp.Entries = append(resp.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count})
	}
	return resp, nil
}
// Trend answers a trend query from the cache, converting each point's
// timestamp to a Unix epoch for the wire.
func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResponse, error) {
	if req == nil {
		return nil, status.Error(codes.InvalidArgument, "request is nil")
	}
	resp := &pb.TrendResponse{Source: srv.source}
	for _, point := range srv.cache.QueryTrend(req.Filter, req.Window) {
		resp.Points = append(resp.Points, &pb.TrendPoint{
			TimestampUnix: point.Timestamp.Unix(),
			Count:         point.Count,
		})
	}
	return resp, nil
}
// StreamSnapshots pushes each cache rotation to the client until the client
// disconnects or the subscription channel is closed. Snapshots are re-labeled
// with this aggregator's source name before sending.
func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
	ch := srv.cache.Subscribe()
	defer srv.cache.Unsubscribe(ch)
	log.Printf("server: new StreamSnapshots subscriber")
	for {
		select {
		case <-stream.Context().Done():
			// Client went away (or its deadline expired): end cleanly.
			log.Printf("server: StreamSnapshots subscriber disconnected")
			return nil
		case snap, ok := <-ch:
			if !ok {
				// Subscription channel was closed underneath us.
				return nil
			}
			// Convert the internal snapshot to the wire format under this
			// aggregator's source name.
			msg := &pb.Snapshot{Source: srv.source, Timestamp: snap.Timestamp.Unix()}
			for _, e := range snap.Entries {
				msg.Entries = append(msg.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count})
			}
			if err := stream.Send(msg); err != nil {
				return err
			}
		case <-time.After(30 * time.Second):
			// NOTE(review): this branch sends nothing and merely re-enters
			// the loop — presumably a keepalive placeholder; confirm intent.
		}
	}
}

View File

@@ -0,0 +1,97 @@
package main
import (
"context"
"log"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// CollectorSub maintains a persistent StreamSnapshots connection to one
// collector. It reconnects with exponential backoff on any error and marks
// the collector degraded (zeroing its contribution) after 3 consecutive
// failures.
type CollectorSub struct {
	addr   string  // collector host:port; also the source key passed to merger.Zero
	merger *Merger // receives applied snapshots and degradation notices
}
// NewCollectorSub returns a subscriber for the collector at addr that feeds
// snapshots into merger. Call Run to start it.
func NewCollectorSub(addr string, merger *Merger) *CollectorSub {
	return &CollectorSub{
		addr:   addr,
		merger: merger,
	}
}
// Run blocks until ctx is cancelled, repeatedly opening a snapshot stream to
// the collector with exponential backoff between attempts. After 3
// consecutive streams that delivered no data, the collector is marked
// degraded and its contribution is zeroed out of the merger.
func (cs *CollectorSub) Run(ctx context.Context) {
	const (
		initialBackoff = 100 * time.Millisecond
		maxBackoff     = 30 * time.Second
	)
	backoff := initialBackoff
	fails := 0
	degraded := false
	for {
		if ctx.Err() != nil {
			return
		}
		gotOne, err := cs.stream(ctx)
		if ctx.Err() != nil {
			return
		}
		if gotOne {
			// The stream delivered data, so the collector is healthy even
			// though every stream eventually ends with a Recv error. Reset
			// fails and backoff unconditionally — previously they were only
			// reset after a degradation, so three ordinary disconnects from a
			// collector that kept delivering data would wrongly zero it.
			fails = 0
			backoff = initialBackoff
			if degraded {
				// Recovered: contribution is already flowing in again via Apply.
				degraded = false
				log.Printf("subscriber: collector %s recovered", cs.addr)
			}
		}
		if err != nil {
			fails++
			log.Printf("subscriber: collector %s error (fail %d): %v", cs.addr, fails, err)
			if fails >= 3 && !degraded {
				degraded = true
				cs.merger.Zero(cs.addr)
				log.Printf("subscriber: collector %s degraded — contribution zeroed", cs.addr)
			}
		}
		// Sleep before reconnecting, but wake up immediately on shutdown.
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		backoff = min(backoff*2, maxBackoff)
	}
}
// stream opens a single StreamSnapshots RPC and feeds snapshots into the
// merger until the stream errors or ctx is cancelled. Returns whether at
// least one snapshot arrived, plus the terminal error.
func (cs *CollectorSub) stream(ctx context.Context) (bool, error) {
	conn, err := grpc.NewClient(cs.addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return false, err
	}
	defer conn.Close()
	stream, err := pb.NewLogtailServiceClient(conn).StreamSnapshots(ctx, &pb.SnapshotRequest{})
	if err != nil {
		return false, err
	}
	log.Printf("subscriber: connected to collector %s", cs.addr)
	received := false
	for {
		snap, err := stream.Recv()
		if err != nil {
			return received, err
		}
		received = true
		cs.merger.Apply(snap)
	}
}