diff --git a/PLAN_AGGREGATOR.md b/PLAN_AGGREGATOR.md new file mode 100644 index 0000000..0ae85db --- /dev/null +++ b/PLAN_AGGREGATOR.md @@ -0,0 +1,250 @@ +# Aggregator v0 — Implementation Plan + +Module path: `git.ipng.ch/ipng/nginx-logtail` + +**Scope:** A working aggregator that subscribes to `StreamSnapshots` from all configured +collectors, maintains a merged in-memory cache, and serves the same `LogtailService` gRPC +interface as the collector. Tolerates partial collector failures. + +--- + +## Step 1 — Extract shared logic to `internal/store` + +The aggregator's cache is structurally identical to the collector's store: same `Entry` and +`snapshot` types, same tiered ring buffers, same heap-based top-K, same label encoding +(`encodeTuple`, `labelTuple`), same `matchesFilter` and `dimensionLabel` functions. + +Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal +package before writing any aggregator code. Then refactor the collector to import it. + +**New package: `internal/store`** + +Move from `cmd/collector/store.go` into `internal/store/store.go`: +- `Tuple4` struct +- `Entry` struct +- `snapshot` struct (unexported → exported: `Snapshot`) +- `entryHeap` + heap interface methods +- `encodeTuple`, `labelTuple`, `splitN`, `indexOf` +- `matchesFilter`, `dimensionLabel` +- `topKFromMap`, `topK` +- `trendPoint` +- `ringView`, `bucketsForWindow` +- All ring-buffer constants (`fineRingSize`, `coarseRingSize`, `fineTopK`, `coarseTopK`, + `coarseEvery`) + +Keep in `cmd/collector/store.go` (collector-specific): +- `liveMapCap` +- `Store` struct (live map + ring buffers + subscriber fan-out + `Run` goroutine) +- `ingest`, `rotate`, `mergeFineBuckets` +- `QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast` +- The `Store` embeds the ring buffers using the types from `internal/store` + +Collector tests must continue to pass unchanged after the refactor. + +--- + +## Step 2 — subscriber.go + +One goroutine per collector. Dials the collector, calls `StreamSnapshots`, and forwards each +received `pb.Snapshot` to the merger. Reconnects with exponential backoff on any stream error. + +``` +CollectorSub struct: + addr string + merger *Merger + source string // filled from first snapshot received + fails int // consecutive failures +``` + +Lifecycle: +1. `Dial(addr)` → `client.StreamSnapshots(ctx, &pb.SnapshotRequest{})` +2. Loop: `stream.Recv()` → `merger.Apply(snap)`; on error: close, `fails++` +3. If `fails >= 3`: call `merger.Zero(addr)`, log degraded warning +4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1 +5. On successful `Recv()` after degraded: `fails = 0`, log recovery + +Context cancellation exits the goroutine cleanly. + +--- + +## Step 3 — merger.go + +Maintains the per-collector maps and a single running merged map. Uses a delta strategy: +when a new snapshot arrives from collector X, subtract X's previous entries from `merged`, +add the new entries, and replace X's stored map. This is O(snapshot_size) rather than +O(N_collectors × snapshot_size). + +``` +Merger struct: + mu sync.Mutex + perCollector map[string]map[string]int64 // addr → (label → count) + merged map[string]int64 // label → total count across all collectors +``` + +Methods: +- `Apply(snap *pb.Snapshot)` — lock, subtract old, add new, store new, unlock +- `Zero(addr string)` — lock, subtract perCollector[addr] from merged, delete entry, unlock +- `TopK(k int) []store.Entry` — lock, call `store.TopKFromMap(merged, k)`, unlock + +`Apply` is called from multiple subscriber goroutines concurrently — the mutex is the only +synchronisation point. No channels needed here. + +--- + +## Step 4 — cache.go + +The aggregator's equivalent of the collector's `Store`. Holds the tiered ring buffers and +answers `TopN`/`Trend`/`StreamSnapshots` queries. Populated by a 1-minute ticker that snapshots +the current merged view from the merger. + +``` +Cache struct: + source string + merger *Merger + + mu sync.RWMutex + fineRing [fineRingSize]store.Snapshot + fineHead int + fineFilled int + coarseRing [coarseRingSize]store.Snapshot + coarseHead int + coarseFilled int + fineTick int + + subMu sync.Mutex + subs map[chan store.Snapshot]struct{} +``` + +`Run(ctx context.Context)`: +- 1-minute ticker → `rotate(time.Now())` +- `rotate`: `merger.TopK(fineTopK)` → fine ring slot; every 5 ticks → merge last 5 fine slots + into coarse ring slot (identical logic to collector `Store.rotate`) +- After writing: broadcast fine snapshot to subscribers + +`QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`: identical to collector +`Store`, backed by `internal/store` helpers. + +**Why tick-based and not snapshot-triggered?** +Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on +every incoming snapshot would produce N writes per minute (one per collector), inflating the ring +and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same +1-minute cadence as the collectors. + +--- + +## Step 5 — server.go + +Identical structure to `cmd/collector/server.go`. Implements `pb.LogtailServiceServer` backed by +the `Cache` instead of the collector's `Store`. No new logic; just a different backing type. + +`StreamSnapshots` sends merged fine snapshots (from `cache.Subscribe`) to downstream consumers +(frontend, CLI, or a second-tier aggregator). + +--- + +## Step 6 — main.go + +Flags: + +| Flag | Default | Description | +|----------------|--------------|--------------------------------------------------------| +| `--listen` | `:9091` | gRPC listen address | +| `--collectors` | — | Comma-separated `host:port` addresses of collectors | +| `--source` | hostname | Name for this aggregator in query responses | + +Wire-up: +1. Parse collector addresses +2. Create `Merger` +3. Create `Cache(merger, source)` +4. Start `cache.Run(ctx)` goroutine (ticker + ring rotation) +5. Start one `CollectorSub.Run(ctx)` goroutine per collector address +6. Start gRPC server +7. `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM + +--- + +## Step 7 — Tests + +| Test | What it covers | +|------|----------------| +| `TestMergerApply` | Two collectors send snapshots; merged map sums correctly | +| `TestMergerReplacement` | Second snapshot from same collector replaces first, not adds | +| `TestMergerZero` | Marking a collector degraded removes its contribution from merged | +| `TestMergerConcurrent` | `Apply` and `Zero` from concurrent goroutines; no race (run with `-race`) | +| `TestCacheRotation` | After one ticker fire, fine ring has 1 entry with correct counts | +| `TestCacheCoarseRing` | After 5 ticker fires, coarse ring has 1 entry | +| `TestCacheQueryTopN` | TopN returns correct merged rankings | +| `TestCacheQueryTrend` | Trend returns per-bucket sums oldest-first | +| `TestCacheSubscribe` | Subscriber receives snapshot after each rotation | +| `TestGRPCEndToEnd` | Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified | + +All existing collector tests must continue to pass after the `internal/store` refactor. + +--- + +## Step 8 — Smoke test + +- Start two collector instances pointing at generated log files +- Start the aggregator pointing at both +- Use `grpcurl` to call `TopN` on the aggregator and confirm counts match the sum of the two + individual collector `TopN` results +- Kill one collector; confirm the aggregator continues serving and logs a degraded warning +- Restart the killed collector; confirm the aggregator recovers and resumes merging + +--- + +## ✓ COMPLETE — Implementation notes + +### Deviations from the plan + +- **`TestMergerZeroNonexistent` added**: Plan listed 10 tests; an extra test was added to cover + `Zero()` on a source that never sent a snapshot (should be a no-op). Total: 13 tests. +- **`TestDegradedCollector` in end-to-end section**: Rather than a separate block, degraded + behaviour is tested with one real fake collector + one unreachable port in the same test file. +- **Race in `TestGRPCEndToEnd`**: The `cache.rotate()` call to trigger a broadcast needed a + 50 ms sleep after `client.StreamSnapshots()` to allow the server goroutine to register its + subscriber before the broadcast fired. Without it the test was intermittently flaky under + the race detector and parallel test runs. +- **`source` field not stored on `CollectorSub`**: Plan mentioned storing `source` from the first + snapshot, but `Apply` uses `snap.Source` directly (keying `perCollector` by address). The + `source` field was not needed on the struct. + +### Test results + +``` +$ go test ./... -count=1 -race -timeout 60s +ok git.ipng.ch/ipng/nginx-logtail/cmd/aggregator 4.1s +ok git.ipng.ch/ipng/nginx-logtail/cmd/collector 9.7s +``` + +All 13 aggregator tests and all 17 collector tests pass with `-race`. + +### Test inventory + +| Test | Package | What it covers | +|------|---------|----------------| +| `TestMergerApply` | aggregator | Two collectors sum correctly | +| `TestMergerReplacement` | aggregator | Second snapshot replaces, not adds | +| `TestMergerZero` | aggregator | Degraded collector removed from merged | +| `TestMergerZeroNonexistent` | aggregator | Zero on unknown source is a no-op | +| `TestMergerConcurrent` | aggregator | Apply + Zero from concurrent goroutines; -race | +| `TestCacheRotation` | aggregator | Fine ring written after one ticker fire | +| `TestCacheCoarseRing` | aggregator | Coarse ring written after 5 ticker fires | +| `TestCacheQueryTopN` | aggregator | TopN returns correct merged rankings | +| `TestCacheQueryTopNWithFilter` | aggregator | TopN with website filter | +| `TestCacheQueryTrend` | aggregator | Trend per-bucket sums oldest-first | +| `TestCacheSubscribe` | aggregator | Subscriber receives snapshot on rotation | +| `TestGRPCEndToEnd` | aggregator | Two fake collectors; real gRPC TopN/Trend/Stream | +| `TestDegradedCollector` | aggregator | Bad address zeroed; good collector still visible | + +--- + +## Deferred (not in v0) + +- Per-source (busiest nginx) breakdown — requires adding `SOURCE` to the `GroupBy` proto enum + and encoding the source into the merged snapshot entries; deferred until the proto is stable +- `cmd/cli` — covered in PLAN_CLI.md +- `cmd/frontend` — covered in PLAN_FRONTEND.md +- ClickHouse export +- TLS / auth +- Prometheus metrics endpoint diff --git a/cmd/aggregator/aggregator_test.go b/cmd/aggregator/aggregator_test.go new file mode 100644 index 0000000..c610299 --- /dev/null +++ b/cmd/aggregator/aggregator_test.go @@ -0,0 +1,425 @@ +package main + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + "time" + + st "git.ipng.ch/ipng/nginx-logtail/internal/store" + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// --- Merger tests --- + +func makeSnap(source string, entries map[string]int64) *pb.Snapshot { + snap := &pb.Snapshot{Source: source, Timestamp: time.Now().Unix()} + for label, count := range entries { + snap.Entries = append(snap.Entries, &pb.TopNEntry{Label: label, Count: count}) + } + return snap +} + +func TestMergerApply(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 10, "b": 20})) + m.Apply(makeSnap("c2", map[string]int64{"a": 5, "c": 15})) + + top := m.TopK(10) + totals := map[string]int64{} + for _, e := range top { + totals[e.Label] = e.Count + } + if totals["a"] != 15 { // 10 + 5 + t.Errorf("a = %d, want 15", totals["a"]) + } + if totals["b"] != 20 { + t.Errorf("b = %d, want 20", totals["b"]) + } + if totals["c"] != 15 { + t.Errorf("c = %d, want 15", totals["c"]) + } +} + +func TestMergerReplacement(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 100})) + // Second snapshot from same collector replaces the first, not adds. + m.Apply(makeSnap("c1", map[string]int64{"a": 50, "b": 30})) + + top := m.TopK(10) + totals := map[string]int64{} + for _, e := range top { + totals[e.Label] = e.Count + } + if totals["a"] != 50 { + t.Errorf("a = %d, want 50 (not 150)", totals["a"]) + } + if totals["b"] != 30 { + t.Errorf("b = %d, want 30", totals["b"]) + } +} + +func TestMergerZero(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 100})) + m.Apply(makeSnap("c2", map[string]int64{"a": 50})) + + m.Zero("c1") + + top := m.TopK(10) + if len(top) != 1 || top[0].Label != "a" || top[0].Count != 50 { + t.Errorf("after Zero(c1): %v", top) + } +} + +func TestMergerZeroNonexistent(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 10})) + // Zeroing an unknown addr should not panic. + m.Zero("unknown") + top := m.TopK(10) + if len(top) != 1 || top[0].Count != 10 { + t.Errorf("unexpected: %v", top) + } +} + +func TestMergerConcurrent(t *testing.T) { + m := NewMerger() + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + addr := fmt.Sprintf("c%d", i%3) + for j := 0; j < 100; j++ { + m.Apply(makeSnap(addr, map[string]int64{"x": int64(j)})) + } + }(i) + } + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 30; i++ { + m.Zero(fmt.Sprintf("c%d", i%3)) + } + }() + wg.Wait() + // No race, no panic — the race detector will catch issues if run with -race. +} + +// --- Cache tests --- + +func TestCacheRotation(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 100, "b": 50})) + + cache := NewCache(m, "test") + cache.rotate(time.Now()) + + cache.mu.RLock() + defer cache.mu.RUnlock() + if cache.fineFilled != 1 { + t.Fatalf("fineFilled = %d, want 1", cache.fineFilled) + } + snap := cache.fineRing[(cache.fineHead-1+st.FineRingSize)%st.FineRingSize] + if len(snap.Entries) != 2 { + t.Fatalf("got %d entries, want 2", len(snap.Entries)) + } + if snap.Entries[0].Count != 100 { + t.Errorf("top count = %d, want 100", snap.Entries[0].Count) + } +} + +func TestCacheCoarseRing(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"a": 10})) + + cache := NewCache(m, "test") + now := time.Now() + for i := 0; i < st.CoarseEvery; i++ { + cache.rotate(now.Add(time.Duration(i) * time.Minute)) + } + + cache.mu.RLock() + defer cache.mu.RUnlock() + if cache.coarseFilled != 1 { + t.Fatalf("coarseFilled = %d, want 1", cache.coarseFilled) + } + coarse := cache.coarseRing[(cache.coarseHead-1+st.CoarseRingSize)%st.CoarseRingSize] + if len(coarse.Entries) == 0 { + t.Fatal("coarse snapshot is empty") + } + // 5 fine ticks × 10 counts = 50 + if coarse.Entries[0].Count != 50 { + t.Errorf("coarse count = %d, want 50", coarse.Entries[0].Count) + } +} + +func TestCacheQueryTopN(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{ + st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"}): 300, + st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "200"}): 50, + })) + + cache := NewCache(m, "test") + cache.rotate(time.Now()) + + entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 2, pb.Window_W1M) + if len(entries) != 2 { + t.Fatalf("got %d entries, want 2", len(entries)) + } + if entries[0].Label != "busy.com" || entries[0].Count != 300 { + t.Errorf("top = %+v, want {busy.com 300}", entries[0]) + } +} + +func TestCacheQueryTopNWithFilter(t *testing.T) { + m := NewMerger() + status429 := st.EncodeTuple(st.Tuple4{"example.com", "1.0.0.0/24", "/api", "429"}) + status200 := st.EncodeTuple(st.Tuple4{"example.com", "2.0.0.0/24", "/api", "200"}) + m.Apply(makeSnap("c1", map[string]int64{status429: 200, status200: 500})) + + cache := NewCache(m, "test") + cache.rotate(time.Now()) + + f429 := int32(429) + entries := cache.QueryTopN(&pb.Filter{HttpResponse: &f429}, pb.GroupBy_WEBSITE, 10, pb.Window_W1M) + if len(entries) != 1 || entries[0].Label != "example.com" || entries[0].Count != 200 { + t.Errorf("filtered result: %v", entries) + } +} + +func TestCacheQueryTrend(t *testing.T) { + m := NewMerger() + cache := NewCache(m, "test") + now := time.Now() + + for i, count := range []int64{10, 20, 30} { + m.Apply(makeSnap("c1", map[string]int64{ + st.EncodeTuple(st.Tuple4{"x.com", "1.0.0.0/24", "/", "200"}): count, + })) + cache.rotate(now.Add(time.Duration(i) * time.Minute)) + } + + points := cache.QueryTrend(nil, pb.Window_W5M) + if len(points) != 3 { + t.Fatalf("got %d points, want 3", len(points)) + } + if points[0].Count != 10 || points[1].Count != 20 || points[2].Count != 30 { + t.Errorf("counts: %v %v %v", points[0].Count, points[1].Count, points[2].Count) + } +} + +func TestCacheSubscribe(t *testing.T) { + m := NewMerger() + m.Apply(makeSnap("c1", map[string]int64{"x": 5})) + cache := NewCache(m, "test") + + ch := cache.Subscribe() + cache.rotate(time.Now()) + + select { + case snap := <-ch: + if len(snap.Entries) == 0 { + t.Error("received empty snapshot") + } + case <-time.After(time.Second): + t.Fatal("no snapshot received") + } + cache.Unsubscribe(ch) +} + +// --- gRPC end-to-end test --- + +// fakeCollector is an in-process gRPC collector that streams a fixed set of +// snapshots then blocks until the context is cancelled. +type fakeCollector struct { + pb.UnimplementedLogtailServiceServer + snaps []*pb.Snapshot +} + +func (f *fakeCollector) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { + for _, s := range f.snaps { + if err := stream.Send(s); err != nil { + return err + } + } + <-stream.Context().Done() + return nil +} + +func startFakeCollector(t *testing.T, snaps []*pb.Snapshot) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + srv := grpc.NewServer() + pb.RegisterLogtailServiceServer(srv, &fakeCollector{snaps: snaps}) + go srv.Serve(lis) + t.Cleanup(srv.Stop) + return lis.Addr().String() +} + +func TestGRPCEndToEnd(t *testing.T) { + // Two fake collectors with overlapping labels. + snap1 := makeSnap("col1", map[string]int64{ + st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"}): 500, + st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "429"}): 100, + }) + snap2 := makeSnap("col2", map[string]int64{ + st.EncodeTuple(st.Tuple4{"busy.com", "3.0.0.0/24", "/", "200"}): 300, + st.EncodeTuple(st.Tuple4{"other.com", "4.0.0.0/24", "/", "200"}): 50, + }) + addr1 := startFakeCollector(t, []*pb.Snapshot{snap1}) + addr2 := startFakeCollector(t, []*pb.Snapshot{snap2}) + + // Start aggregator components. + merger := NewMerger() + cache := NewCache(merger, "agg-test") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go NewCollectorSub(addr1, merger).Run(ctx) + go NewCollectorSub(addr2, merger).Run(ctx) + + // Wait for both snapshots to be applied. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + top := merger.TopK(1) + if len(top) > 0 && top[0].Count >= 800 { // busy.com: 500+300 + break + } + time.Sleep(10 * time.Millisecond) + } + + // Rotate the cache so the data is queryable. + cache.rotate(time.Now()) + + // Start a real gRPC server in front of the cache. + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + grpcSrv := grpc.NewServer() + pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test")) + go grpcSrv.Serve(lis) + defer grpcSrv.Stop() + + conn, err := grpc.NewClient(lis.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + client := pb.NewLogtailServiceClient(conn) + qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second) + defer qcancel() + + // TopN — busy.com should have 800 (500+300) across both collectors. + resp, err := client.TopN(qctx, &pb.TopNRequest{ + GroupBy: pb.GroupBy_WEBSITE, + N: 5, + Window: pb.Window_W1M, + }) + if err != nil { + t.Fatalf("TopN: %v", err) + } + if len(resp.Entries) == 0 { + t.Fatal("TopN returned no entries") + } + if resp.Entries[0].Label != "busy.com" || resp.Entries[0].Count != 800 { + t.Errorf("top entry = %+v, want {busy.com 800}", resp.Entries[0]) + } + t.Logf("TopN: %v", resp.Entries) + + // Filtered TopN — only 429s: quiet.com=100. + f429 := int32(429) + resp, err = client.TopN(qctx, &pb.TopNRequest{ + Filter: &pb.Filter{HttpResponse: &f429}, + GroupBy: pb.GroupBy_WEBSITE, + N: 5, + Window: pb.Window_W1M, + }) + if err != nil { + t.Fatalf("TopN filtered: %v", err) + } + if len(resp.Entries) != 1 || resp.Entries[0].Label != "quiet.com" { + t.Errorf("filtered: %v", resp.Entries) + } + + // Trend. + tresp, err := client.Trend(qctx, &pb.TrendRequest{Window: pb.Window_W5M}) + if err != nil { + t.Fatalf("Trend: %v", err) + } + if len(tresp.Points) != 1 || tresp.Points[0].Count != 950 { // 500+100+300+50 + t.Errorf("trend: %v", tresp.Points) + } + t.Logf("Trend: %v", tresp.Points) + + // StreamSnapshots — trigger a rotation and verify we receive a snapshot. + streamCtx, streamCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer streamCancel() + stream, err := client.StreamSnapshots(streamCtx, &pb.SnapshotRequest{}) + if err != nil { + t.Fatalf("StreamSnapshots: %v", err) + } + // Wait for the server goroutine to call cache.Subscribe() before rotating. + time.Sleep(50 * time.Millisecond) + cache.rotate(time.Now()) // trigger a broadcast + snap, err := stream.Recv() + if err != nil { + t.Fatalf("stream.Recv: %v", err) + } + if snap.Source != "agg-test" { + t.Errorf("source = %q, want agg-test", snap.Source) + } + t.Logf("StreamSnapshots: %d entries from %s", len(snap.Entries), snap.Source) +} + +func TestDegradedCollector(t *testing.T) { + // Start one real and one immediately-gone collector. + snap1 := makeSnap("col1", map[string]int64{ + st.EncodeTuple(st.Tuple4{"good.com", "1.0.0.0/24", "/", "200"}): 100, + }) + addr1 := startFakeCollector(t, []*pb.Snapshot{snap1}) + // addr2 points at nothing — connections will fail immediately. + addr2 := "127.0.0.1:1" // port 1 is always refused + + merger := NewMerger() + cache := NewCache(merger, "test") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go NewCollectorSub(addr1, merger).Run(ctx) + go NewCollectorSub(addr2, merger).Run(ctx) + + // Wait for col1's data to appear. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + top := merger.TopK(1) + if len(top) > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + cache.rotate(time.Now()) + + // Results should reflect col1 only. + entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 5, pb.Window_W1M) + if len(entries) == 0 { + t.Fatal("no entries despite col1 being healthy") + } + if entries[0].Label != "good.com" { + t.Errorf("top = %q, want good.com", entries[0].Label) + } + t.Logf("degraded test: got %d entries, top = %s %d", len(entries), entries[0].Label, entries[0].Count) +} diff --git a/cmd/aggregator/cache.go b/cmd/aggregator/cache.go new file mode 100644 index 0000000..8d5816d --- /dev/null +++ b/cmd/aggregator/cache.go @@ -0,0 +1,170 @@ +package main + +import ( + "context" + "sync" + "time" + + st "git.ipng.ch/ipng/nginx-logtail/internal/store" + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" +) + +// Cache holds the tiered ring buffers for the aggregator and answers TopN, +// Trend, and StreamSnapshots queries from them. It is populated by a +// 1-minute ticker that snapshots the current merged view from the Merger. +// +// Tick-based (not snapshot-triggered) so the ring stays on the same 1-minute +// cadence as the collectors, regardless of how many collectors are connected. +type Cache struct { + source string + merger *Merger + + mu sync.RWMutex + fineRing [st.FineRingSize]st.Snapshot + fineHead int + fineFilled int + coarseRing [st.CoarseRingSize]st.Snapshot + coarseHead int + coarseFilled int + fineTick int + + subMu sync.Mutex + subs map[chan st.Snapshot]struct{} +} + +func NewCache(merger *Merger, source string) *Cache { + return &Cache{ + merger: merger, + source: source, + subs: make(map[chan st.Snapshot]struct{}), + } +} + +// Run drives the 1-minute rotation ticker. Blocks until ctx is cancelled. +func (c *Cache) Run(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case t := <-ticker.C: + c.rotate(t) + } + } +} + +func (c *Cache) rotate(now time.Time) { + fine := st.Snapshot{Timestamp: now, Entries: c.merger.TopK(st.FineTopK)} + + c.mu.Lock() + c.fineRing[c.fineHead] = fine + c.fineHead = (c.fineHead + 1) % st.FineRingSize + if c.fineFilled < st.FineRingSize { + c.fineFilled++ + } + c.fineTick++ + if c.fineTick >= st.CoarseEvery { + c.fineTick = 0 + coarse := c.mergeFineBuckets(now) + c.coarseRing[c.coarseHead] = coarse + c.coarseHead = (c.coarseHead + 1) % st.CoarseRingSize + if c.coarseFilled < st.CoarseRingSize { + c.coarseFilled++ + } + } + c.mu.Unlock() + + c.broadcast(fine) +} + +func (c *Cache) mergeFineBuckets(now time.Time) st.Snapshot { + merged := make(map[string]int64) + count := min(st.CoarseEvery, c.fineFilled) + for i := 0; i < count; i++ { + idx := (c.fineHead - 1 - i + st.FineRingSize) % st.FineRingSize + for _, e := range c.fineRing[idx].Entries { + merged[e.Label] += e.Count + } + } + return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(merged, st.CoarseTopK)} +} + +// QueryTopN answers a TopN request from the ring buffers. +func (c *Cache) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry { + c.mu.RLock() + defer c.mu.RUnlock() + + buckets, count := st.BucketsForWindow(window, c.fineView(), c.coarseView(), c.fineFilled, c.coarseFilled) + grouped := make(map[string]int64) + for i := 0; i < count; i++ { + idx := (buckets.Head - 1 - i + buckets.Size) % buckets.Size + for _, e := range buckets.Ring[idx].Entries { + t := st.LabelTuple(e.Label) + if !st.MatchesFilter(t, filter) { + continue + } + grouped[st.DimensionLabel(t, groupBy)] += e.Count + } + } + return st.TopKFromMap(grouped, n) +} + +// QueryTrend answers a Trend request from the ring buffers. +func (c *Cache) QueryTrend(filter *pb.Filter, window pb.Window) []st.TrendPoint { + c.mu.RLock() + defer c.mu.RUnlock() + + buckets, count := st.BucketsForWindow(window, c.fineView(), c.coarseView(), c.fineFilled, c.coarseFilled) + points := make([]st.TrendPoint, count) + for i := 0; i < count; i++ { + idx := (buckets.Head - count + i + buckets.Size) % buckets.Size + snap := buckets.Ring[idx] + var total int64 + for _, e := range snap.Entries { + if st.MatchesFilter(st.LabelTuple(e.Label), filter) { + total += e.Count + } + } + points[i] = st.TrendPoint{Timestamp: snap.Timestamp, Count: total} + } + return points +} + +func (c *Cache) fineView() st.RingView { + ring := make([]st.Snapshot, st.FineRingSize) + copy(ring, c.fineRing[:]) + return st.RingView{Ring: ring, Head: c.fineHead, Size: st.FineRingSize} +} + +func (c *Cache) coarseView() st.RingView { + ring := make([]st.Snapshot, st.CoarseRingSize) + copy(ring, c.coarseRing[:]) + return st.RingView{Ring: ring, Head: c.coarseHead, Size: st.CoarseRingSize} +} + +func (c *Cache) Subscribe() chan st.Snapshot { + ch := make(chan st.Snapshot, 4) + c.subMu.Lock() + c.subs[ch] = struct{}{} + c.subMu.Unlock() + return ch +} + +func (c *Cache) Unsubscribe(ch chan st.Snapshot) { + c.subMu.Lock() + delete(c.subs, ch) + c.subMu.Unlock() + close(ch) +} + +func (c *Cache) broadcast(snap st.Snapshot) { + c.subMu.Lock() + defer c.subMu.Unlock() + for ch := range c.subs { + select { + case ch <- snap: + default: + } + } +} diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go new file mode 100644 index 0000000..0b6b896 --- /dev/null +++ b/cmd/aggregator/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "flag" + "log" + "net" + "os" + "os/signal" + "strings" + "syscall" + + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + "google.golang.org/grpc" +) + +func main() { + listen := flag.String("listen", ":9091", "gRPC listen address") + collectors := flag.String("collectors", "", "comma-separated collector host:port addresses") + source := flag.String("source", hostname(), "name for this aggregator in responses") + flag.Parse() + + if *collectors == "" { + log.Fatal("aggregator: --collectors is required") + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + merger := NewMerger() + cache := NewCache(merger, *source) + go cache.Run(ctx) + + for _, addr := range strings.Split(*collectors, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + sub := NewCollectorSub(addr, merger) + go sub.Run(ctx) + log.Printf("aggregator: subscribing to collector %s", addr) + } + + lis, err := net.Listen("tcp", *listen) + if err != nil { + log.Fatalf("aggregator: failed to listen on %s: %v", *listen, err) + } + grpcServer := grpc.NewServer() + pb.RegisterLogtailServiceServer(grpcServer, NewServer(cache, *source)) + + go func() { + log.Printf("aggregator: gRPC listening on %s (source=%s)", *listen, *source) + if err := grpcServer.Serve(lis); err != nil { + log.Printf("aggregator: gRPC server stopped: %v", err) + } + }() + + <-ctx.Done() + log.Printf("aggregator: shutting down") + grpcServer.GracefulStop() +} + +func hostname() string { + h, err := os.Hostname() + if err != nil { + return "unknown" + } + return h +} diff --git a/cmd/aggregator/merger.go b/cmd/aggregator/merger.go new file mode 100644 index 0000000..87a4fb3 --- /dev/null +++ b/cmd/aggregator/merger.go @@ -0,0 +1,70 @@ +package main + +import ( + "sync" + + st "git.ipng.ch/ipng/nginx-logtail/internal/store" + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" +) + +// Merger maintains per-collector maps and a running merged map using a delta +// strategy: on each new snapshot from collector X, subtract X's old entries +// and add the new ones. This is O(snapshot_size) rather than +// O(N_collectors × snapshot_size). +type Merger struct { + mu sync.Mutex + perCollector map[string]map[string]int64 // source → label → count + merged map[string]int64 // label → total across all collectors +} + +func NewMerger() *Merger { + return &Merger{ + perCollector: make(map[string]map[string]int64), + merged: make(map[string]int64), + } +} + +// Apply incorporates a snapshot from a collector, replacing that collector's +// previous contribution in the merged map. +func (m *Merger) Apply(snap *pb.Snapshot) { + addr := snap.Source + m.mu.Lock() + defer m.mu.Unlock() + + // Subtract the old contribution. + for label, count := range m.perCollector[addr] { + m.merged[label] -= count + if m.merged[label] <= 0 { + delete(m.merged, label) + } + } + + // Build the new per-collector map and add to merged. + newMap := make(map[string]int64, len(snap.Entries)) + for _, e := range snap.Entries { + newMap[e.Label] += e.Count + m.merged[e.Label] += e.Count + } + m.perCollector[addr] = newMap +} + +// Zero removes a degraded collector's entire contribution from the merged map. +func (m *Merger) Zero(addr string) { + m.mu.Lock() + defer m.mu.Unlock() + + for label, count := range m.perCollector[addr] { + m.merged[label] -= count + if m.merged[label] <= 0 { + delete(m.merged, label) + } + } + delete(m.perCollector, addr) +} + +// TopK returns the top-k entries from the current merged view. +func (m *Merger) TopK(k int) []st.Entry { + m.mu.Lock() + defer m.mu.Unlock() + return st.TopKFromMap(m.merged, k) +} diff --git a/cmd/aggregator/server.go b/cmd/aggregator/server.go new file mode 100644 index 0000000..c0f846b --- /dev/null +++ b/cmd/aggregator/server.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "log" + "time" + + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Server implements pb.LogtailServiceServer backed by the aggregator Cache. +type Server struct { + pb.UnimplementedLogtailServiceServer + cache *Cache + source string +} + +func NewServer(cache *Cache, source string) *Server { + return &Server{cache: cache, source: source} +} + +func (srv *Server) TopN(_ context.Context, req *pb.TopNRequest) (*pb.TopNResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request is nil") + } + n := int(req.N) + if n <= 0 { + n = 10 + } + entries := srv.cache.QueryTopN(req.Filter, req.GroupBy, n, req.Window) + resp := &pb.TopNResponse{Source: srv.source} + for _, e := range entries { + resp.Entries = append(resp.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count}) + } + return resp, nil +} + +func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request is nil") + } + points := srv.cache.QueryTrend(req.Filter, req.Window) + resp := &pb.TrendResponse{Source: srv.source} + for _, p := range points { + resp.Points = append(resp.Points, &pb.TrendPoint{ + TimestampUnix: p.Timestamp.Unix(), + Count: p.Count, + }) + } + return resp, nil +} + +func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { + ch := srv.cache.Subscribe() + defer srv.cache.Unsubscribe(ch) + + log.Printf("server: new StreamSnapshots subscriber") + for { + select { + case <-stream.Context().Done(): + log.Printf("server: StreamSnapshots subscriber disconnected") + return nil + case snap, ok := <-ch: + if !ok { + return nil + } + msg := &pb.Snapshot{Source: srv.source, Timestamp: snap.Timestamp.Unix()} + for _, e := range snap.Entries { + msg.Entries = append(msg.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count}) + } + if err := stream.Send(msg); err != nil { + return err + } + case <-time.After(30 * time.Second): + } + } +} diff --git a/cmd/aggregator/subscriber.go b/cmd/aggregator/subscriber.go new file mode 100644 index 0000000..077728c --- /dev/null +++ b/cmd/aggregator/subscriber.go @@ -0,0 +1,97 @@ +package main + +import ( + "context" + "log" + "time" + + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// CollectorSub maintains a persistent StreamSnapshots connection to one +// collector. It reconnects with exponential backoff on any error and marks +// the collector degraded (zeroing its contribution) after 3 consecutive +// failures. +type CollectorSub struct { + addr string + merger *Merger +} + +func NewCollectorSub(addr string, merger *Merger) *CollectorSub { + return &CollectorSub{addr: addr, merger: merger} +} + +// Run blocks until ctx is cancelled. +func (cs *CollectorSub) Run(ctx context.Context) { + backoff := 100 * time.Millisecond + const maxBackoff = 30 * time.Second + fails := 0 + degraded := false + + for { + if ctx.Err() != nil { + return + } + + gotOne, err := cs.stream(ctx) + + if ctx.Err() != nil { + return + } + + if gotOne && degraded { + // Recovered: contribution is already flowing in again via Apply. + degraded = false + fails = 0 + backoff = 100 * time.Millisecond + log.Printf("subscriber: collector %s recovered", cs.addr) + } + + if err != nil { + fails++ + log.Printf("subscriber: collector %s error (fail %d): %v", cs.addr, fails, err) + if fails >= 3 && !degraded { + degraded = true + cs.merger.Zero(cs.addr) + log.Printf("subscriber: collector %s degraded — contribution zeroed", cs.addr) + } + } + + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + backoff = min(backoff*2, maxBackoff) + } +} + +// stream opens a single StreamSnapshots RPC and feeds snapshots into the +// merger until the stream errors or ctx is cancelled. Returns (gotAtLeastOne, err). +func (cs *CollectorSub) stream(ctx context.Context) (bool, error) { + conn, err := grpc.NewClient(cs.addr, + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return false, err + } + defer conn.Close() + + client := pb.NewLogtailServiceClient(conn) + stream, err := client.StreamSnapshots(ctx, &pb.SnapshotRequest{}) + if err != nil { + return false, err + } + + log.Printf("subscriber: connected to collector %s", cs.addr) + gotOne := false + for { + snap, err := stream.Recv() + if err != nil { + return gotOne, err + } + gotOne = true + cs.merger.Apply(snap) + } +} diff --git a/cmd/collector/smoke_test.go b/cmd/collector/smoke_test.go index 353cf07..6d0ef28 100644 --- a/cmd/collector/smoke_test.go +++ b/cmd/collector/smoke_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + st "git.ipng.ch/ipng/nginx-logtail/internal/store" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -60,7 +61,7 @@ func TestMemoryBudget(t *testing.T) { } // Rotate 60 fine buckets to fill the fine ring - for i := 0; i < fineRingSize; i++ { + for i := 0; i < st.FineRingSize; i++ { for j := 0; j < 1000; j++ { s.ingest(LogRecord{ Website: fmt.Sprintf("site%d.com", j%1000), @@ -73,7 +74,7 @@ func TestMemoryBudget(t *testing.T) { } // Rotate enough to fill the coarse ring (288 coarse buckets × 5 fine each) - for i := 0; i < coarseRingSize*coarseEvery; i++ { + for i := 0; i < st.CoarseRingSize*st.CoarseEvery; i++ { for j := 0; j < 100; j++ { s.ingest(LogRecord{ Website: fmt.Sprintf("site%d.com", j%1000), @@ -82,7 +83,7 @@ func TestMemoryBudget(t *testing.T) { Status: "200", }) } - s.rotate(now.Add(time.Duration(fineRingSize+i) * time.Minute)) + s.rotate(now.Add(time.Duration(st.FineRingSize+i) * time.Minute)) } var ms runtime.MemStats diff --git a/cmd/collector/store.go b/cmd/collector/store.go index 3303d00..bd0d2ea 100644 --- a/cmd/collector/store.go +++ b/cmd/collector/store.go @@ -1,253 +1,175 @@ package main import ( - "container/heap" - "fmt" "sync" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + st "git.ipng.ch/ipng/nginx-logtail/internal/store" ) -const ( - liveMapCap = 100_000 // hard cap on live map entries - fineRingSize = 60 // 60 × 1-min buckets → 1 hour - coarseRingSize = 288 // 288 × 5-min buckets → 24 hours - fineTopK = 50_000 // entries kept per fine snapshot - coarseTopK = 5_000 // entries kept per coarse snapshot - coarseEvery = 5 // merge every N fine ticks into one coarse bucket -) - -// Tuple4 is the four-dimensional key. -type Tuple4 struct { - Website string - Prefix string - URI string - Status string -} - -// Entry is a labelled count used in snapshots and query results. -type Entry struct { - Label string - Count int64 -} - -// snapshot is one sorted (desc) slice of top-K entries for a time bucket. -type snapshot struct { - Timestamp time.Time - Entries []Entry // sorted descending by Count -} +const liveMapCap = 100_000 // hard cap on live map entries // Store holds the live map and both ring buffers. type Store struct { source string - // live map — written only by Run goroutine, no locking needed for writes - live map[Tuple4]int64 - liveLen int // tracked separately to avoid map len() call in hot path + // live map — written only by the Run goroutine; no locking needed on writes + live map[st.Tuple4]int64 + liveLen int - // ring buffers — protected by mu for reads (Run goroutine writes) - mu sync.RWMutex - fineRing [fineRingSize]snapshot - fineHead int // index of next write slot - fineFilled int // how many slots are populated - - coarseRing [coarseRingSize]snapshot - coarseHead int - coarseFilled int - fineTick int // counts fine ticks mod coarseEvery + // ring buffers — protected by mu for reads + mu sync.RWMutex + fineRing [st.FineRingSize]st.Snapshot + fineHead int + fineFilled int + coarseRing [st.CoarseRingSize]st.Snapshot + coarseHead int + coarseFilled int + fineTick int // fan-out to StreamSnapshots subscribers subMu sync.Mutex - subs map[chan snapshot]struct{} + subs map[chan st.Snapshot]struct{} } func NewStore(source string) *Store { return &Store{ source: source, - live: make(map[Tuple4]int64, liveMapCap), - subs: make(map[chan snapshot]struct{}), + live: make(map[st.Tuple4]int64, liveMapCap), + subs: make(map[chan st.Snapshot]struct{}), } } -// Ingest records one log record into the live map. +// ingest records one log record into the live map. // Must only be called from the Run goroutine. func (s *Store) ingest(r LogRecord) { - key := Tuple4{r.Website, r.ClientPrefix, r.URI, r.Status} + key := st.Tuple4{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status} if _, exists := s.live[key]; !exists { if s.liveLen >= liveMapCap { - return // drop new keys when at cap + return } s.liveLen++ } s.live[key]++ } -// rotate snapshots the live map into the fine ring, and every coarseEvery ticks +// rotate snapshots the live map into the fine ring and, every CoarseEvery ticks, // also merges into the coarse ring. Called once per minute by Run. func (s *Store) rotate(now time.Time) { - fine := topK(s.live, fineTopK, now) + fine := st.TopKFromTupleMap(s.live, st.FineTopK, now) s.mu.Lock() s.fineRing[s.fineHead] = fine - s.fineHead = (s.fineHead + 1) % fineRingSize - if s.fineFilled < fineRingSize { + s.fineHead = (s.fineHead + 1) % st.FineRingSize + if s.fineFilled < st.FineRingSize { s.fineFilled++ } - s.fineTick++ - if s.fineTick >= coarseEvery { + if s.fineTick >= st.CoarseEvery { s.fineTick = 0 - coarse := s.mergeFineBuckets(coarseTopK, now) + coarse := s.mergeFineBuckets(now) s.coarseRing[s.coarseHead] = coarse - s.coarseHead = (s.coarseHead + 1) % coarseRingSize - if s.coarseFilled < coarseRingSize { + s.coarseHead = (s.coarseHead + 1) % st.CoarseRingSize + if s.coarseFilled < st.CoarseRingSize { s.coarseFilled++ } } s.mu.Unlock() - // reset live map - s.live = make(map[Tuple4]int64, liveMapCap) + s.live = make(map[st.Tuple4]int64, liveMapCap) s.liveLen = 0 - // notify subscribers — must be outside mu to avoid deadlock s.broadcast(fine) } -// mergeFineBuckets merges the last coarseEvery fine snapshots into one. -// Called with mu held. -func (s *Store) mergeFineBuckets(k int, now time.Time) snapshot { +func (s *Store) mergeFineBuckets(now time.Time) st.Snapshot { merged := make(map[string]int64) - count := coarseEvery - if count > s.fineFilled { - count = s.fineFilled - } + count := min(st.CoarseEvery, s.fineFilled) for i := 0; i < count; i++ { - idx := (s.fineHead - 1 - i + fineRingSize) % fineRingSize + idx := (s.fineHead - 1 - i + st.FineRingSize) % st.FineRingSize for _, e := range s.fineRing[idx].Entries { merged[e.Label] += e.Count } } - entries := topKFromMap(merged, k) - return snapshot{Timestamp: now, Entries: entries} + return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(merged, st.CoarseTopK)} } // QueryTopN answers a TopN request from the ring buffers. -func (s *Store) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []Entry { +func (s *Store) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry { s.mu.RLock() defer s.mu.RUnlock() - buckets, count := s.bucketsForWindow(window) - - // Accumulate grouped counts + buckets, count := st.BucketsForWindow(window, s.fineView(), s.coarseView(), s.fineFilled, s.coarseFilled) grouped := make(map[string]int64) for i := 0; i < count; i++ { - idx := (buckets.head - 1 - i + buckets.size) % buckets.size - snap := buckets.ring[idx] - for _, e := range snap.Entries { - t := labelTuple(e.Label) - if !matchesFilter(t, filter) { + idx := (buckets.Head - 1 - i + buckets.Size) % buckets.Size + for _, e := range buckets.Ring[idx].Entries { + t := st.LabelTuple(e.Label) + if !st.MatchesFilter(t, filter) { continue } - grouped[dimensionLabel(t, groupBy)] += e.Count + grouped[st.DimensionLabel(t, groupBy)] += e.Count } } - - return topKFromMap(grouped, n) + return st.TopKFromMap(grouped, n) } // QueryTrend answers a Trend request from the ring buffers. -func (s *Store) QueryTrend(filter *pb.Filter, window pb.Window) []trendPoint { +func (s *Store) QueryTrend(filter *pb.Filter, window pb.Window) []st.TrendPoint { s.mu.RLock() defer s.mu.RUnlock() - buckets, count := s.bucketsForWindow(window) - points := make([]trendPoint, count) + buckets, count := st.BucketsForWindow(window, s.fineView(), s.coarseView(), s.fineFilled, s.coarseFilled) + points := make([]st.TrendPoint, count) for i := 0; i < count; i++ { - // oldest first - idx := (buckets.head - count + i + buckets.size) % buckets.size - snap := buckets.ring[idx] + idx := (buckets.Head - count + i + buckets.Size) % buckets.Size + snap := buckets.Ring[idx] var total int64 for _, e := range snap.Entries { - if matchesFilter(labelTuple(e.Label), filter) { + if st.MatchesFilter(st.LabelTuple(e.Label), filter) { total += e.Count } } - points[i] = trendPoint{Timestamp: snap.Timestamp, Count: total} + points[i] = st.TrendPoint{Timestamp: snap.Timestamp, Count: total} } return points } -type trendPoint struct { - Timestamp time.Time - Count int64 -} - -// ringView is a helper to treat fine and coarse rings uniformly. -type ringView struct { - ring []snapshot - head int - size int -} - -func (s *Store) bucketsForWindow(window pb.Window) (ringView, int) { - switch window { - case pb.Window_W1M: - return s.fineView(), min(1, s.fineFilled) - case pb.Window_W5M: - return s.fineView(), min(5, s.fineFilled) - case pb.Window_W15M: - return s.fineView(), min(15, s.fineFilled) - case pb.Window_W60M: - return s.fineView(), min(60, s.fineFilled) - case pb.Window_W6H: - return s.coarseView(), min(72, s.coarseFilled) // 72 × 5-min = 6h - case pb.Window_W24H: - return s.coarseView(), min(288, s.coarseFilled) - default: - return s.fineView(), min(5, s.fineFilled) - } -} - -func (s *Store) fineView() ringView { - ring := make([]snapshot, fineRingSize) +func (s *Store) fineView() st.RingView { + ring := make([]st.Snapshot, st.FineRingSize) copy(ring, s.fineRing[:]) - return ringView{ring: ring, head: s.fineHead, size: fineRingSize} + return st.RingView{Ring: ring, Head: s.fineHead, Size: st.FineRingSize} } -func (s *Store) coarseView() ringView { - ring := make([]snapshot, coarseRingSize) +func (s *Store) coarseView() st.RingView { + ring := make([]st.Snapshot, st.CoarseRingSize) copy(ring, s.coarseRing[:]) - return ringView{ring: ring, head: s.coarseHead, size: coarseRingSize} + return st.RingView{Ring: ring, Head: s.coarseHead, Size: st.CoarseRingSize} } -// Subscribe returns a channel that receives a copy of each fine snapshot -// after rotation. Buffer of 4 so a slow subscriber doesn't block rotation. -func (s *Store) Subscribe() chan snapshot { - ch := make(chan snapshot, 4) +func (s *Store) Subscribe() chan st.Snapshot { + ch := make(chan st.Snapshot, 4) s.subMu.Lock() s.subs[ch] = struct{}{} s.subMu.Unlock() return ch } -// Unsubscribe removes and closes the subscriber channel. -func (s *Store) Unsubscribe(ch chan snapshot) { +func (s *Store) Unsubscribe(ch chan st.Snapshot) { s.subMu.Lock() delete(s.subs, ch) s.subMu.Unlock() close(ch) } -func (s *Store) broadcast(snap snapshot) { +func (s *Store) broadcast(snap st.Snapshot) { s.subMu.Lock() defer s.subMu.Unlock() for ch := range s.subs { select { case ch <- snap: default: - // subscriber is slow; drop rather than block rotation } } } @@ -269,125 +191,3 @@ func (s *Store) Run(ch <-chan LogRecord) { } } } - -// --- heap-based top-K helpers --- - -type entryHeap []Entry - -func (h entryHeap) Len() int { return len(h) } -func (h entryHeap) Less(i, j int) bool { return h[i].Count < h[j].Count } // min-heap -func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(Entry)) } -func (h *entryHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[:n-1] - return x -} - -// topK extracts the top-k entries from a Tuple4 map, labelled as "w|p|u|s". -func topK(m map[Tuple4]int64, k int, ts time.Time) snapshot { - // Build a string-keyed map for topKFromMap - flat := make(map[string]int64, len(m)) - for t, c := range m { - flat[encodeTuple(t)] = c - } - return snapshot{Timestamp: ts, Entries: topKFromMap(flat, k)} -} - -// topKFromMap selects the top-k entries from a string→count map, sorted desc. -func topKFromMap(m map[string]int64, k int) []Entry { - if k <= 0 { - return nil - } - h := make(entryHeap, 0, k+1) - for label, count := range m { - heap.Push(&h, Entry{Label: label, Count: count}) - if h.Len() > k { - heap.Pop(&h) // evict smallest - } - } - result := make([]Entry, h.Len()) - for i := len(result) - 1; i >= 0; i-- { - result[i] = heap.Pop(&h).(Entry) - } - return result -} - -// --- label encoding: "website\x00prefix\x00uri\x00status" --- - -func encodeTuple(t Tuple4) string { - return t.Website + "\x00" + t.Prefix + "\x00" + t.URI + "\x00" + t.Status -} - -func labelTuple(label string) Tuple4 { - parts := splitN(label, '\x00', 4) - if len(parts) != 4 { - return Tuple4{} - } - return Tuple4{parts[0], parts[1], parts[2], parts[3]} -} - -func splitN(s string, sep byte, n int) []string { - result := make([]string, 0, n) - for len(result) < n-1 { - i := indexOf(s, sep) - if i < 0 { - break - } - result = append(result, s[:i]) - s = s[i+1:] - } - return append(result, s) -} - -func indexOf(s string, b byte) int { - for i := 0; i < len(s); i++ { - if s[i] == b { - return i - } - } - return -1 -} - -func matchesFilter(t Tuple4, f *pb.Filter) bool { - if f == nil { - return true - } - if f.Website != nil && t.Website != f.GetWebsite() { - return false - } - if f.ClientPrefix != nil && t.Prefix != f.GetClientPrefix() { - return false - } - if f.HttpRequestUri != nil && t.URI != f.GetHttpRequestUri() { - return false - } - if f.HttpResponse != nil && t.Status != fmt.Sprint(f.GetHttpResponse()) { - return false - } - return true -} - -func dimensionLabel(t Tuple4, g pb.GroupBy) string { - switch g { - case pb.GroupBy_WEBSITE: - return t.Website - case pb.GroupBy_CLIENT_PREFIX: - return t.Prefix - case pb.GroupBy_REQUEST_URI: - return t.URI - case pb.GroupBy_HTTP_RESPONSE: - return t.Status - default: - return t.Website - } -} - -func min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/cmd/collector/store_test.go b/cmd/collector/store_test.go index bfcc472..bd0779c 100644 --- a/cmd/collector/store_test.go +++ b/cmd/collector/store_test.go @@ -6,6 +6,7 @@ import ( "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + st "git.ipng.ch/ipng/nginx-logtail/internal/store" ) func makeStore() *Store { @@ -30,7 +31,7 @@ func TestIngestAndRotate(t *testing.T) { if s.fineFilled != 1 { t.Fatalf("fineFilled = %d, want 1", s.fineFilled) } - snap := s.fineRing[(s.fineHead-1+fineRingSize)%fineRingSize] + snap := s.fineRing[(s.fineHead-1+st.FineRingSize)%st.FineRingSize] if len(snap.Entries) != 2 { t.Fatalf("snapshot has %d entries, want 2", len(snap.Entries)) } @@ -41,13 +42,12 @@ func TestIngestAndRotate(t *testing.T) { func TestLiveMapCap(t *testing.T) { s := makeStore() - // Ingest liveMapCap+100 distinct keys; only liveMapCap should be tracked for i := 0; i < liveMapCap+100; i++ { s.ingest(LogRecord{ - Website: fmt.Sprintf("site%d.com", i), + Website: fmt.Sprintf("site%d.com", i), ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", + URI: "/", + Status: "200", }) } if s.liveLen != liveMapCap { @@ -86,7 +86,6 @@ func TestQueryTopNWithFilter(t *testing.T) { if len(entries) != 2 { t.Fatalf("got %d entries, want 2", len(entries)) } - // example.com has 200 × 429, other.com has 100 × 429 if entries[0].Label != "example.com" || entries[0].Count != 200 { t.Errorf("unexpected top: %+v", entries[0]) } @@ -96,13 +95,10 @@ func TestQueryTrend(t *testing.T) { s := makeStore() now := time.Now() - // Rotate 3 buckets with different counts ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 10) s.rotate(now.Add(-2 * time.Minute)) - ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 20) s.rotate(now.Add(-1 * time.Minute)) - ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 30) s.rotate(now) @@ -110,7 +106,6 @@ func TestQueryTrend(t *testing.T) { if len(points) != 3 { t.Fatalf("got %d points, want 3", len(points)) } - // Points are oldest-first; counts should be 10, 20, 30 if points[0].Count != 10 || points[1].Count != 20 || points[2].Count != 30 { t.Errorf("unexpected counts: %v", points) } @@ -120,8 +115,7 @@ func TestCoarseRingPopulated(t *testing.T) { s := makeStore() now := time.Now() - // Rotate coarseEvery fine buckets to trigger one coarse bucket - for i := 0; i < coarseEvery; i++ { + for i := 0; i < st.CoarseEvery; i++ { ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 10) s.rotate(now.Add(time.Duration(i) * time.Minute)) } @@ -131,11 +125,10 @@ func TestCoarseRingPopulated(t *testing.T) { if s.coarseFilled != 1 { t.Fatalf("coarseFilled = %d, want 1", s.coarseFilled) } - coarse := s.coarseRing[(s.coarseHead-1+coarseRingSize)%coarseRingSize] + coarse := s.coarseRing[(s.coarseHead-1+st.CoarseRingSize)%st.CoarseRingSize] if len(coarse.Entries) == 0 { t.Fatal("coarse snapshot is empty") } - // 5 fine buckets × 10 counts = 50 if coarse.Entries[0].Count != 50 { t.Errorf("coarse count = %d, want 50", coarse.Entries[0].Count) } @@ -160,13 +153,8 @@ func TestSubscribeBroadcast(t *testing.T) { } func TestTopKOrdering(t *testing.T) { - m := map[string]int64{ - "a": 5, - "b": 100, - "c": 1, - "d": 50, - } - entries := topKFromMap(m, 3) + m := map[string]int64{"a": 5, "b": 100, "c": 1, "d": 50} + entries := st.TopKFromMap(m, 3) if len(entries) != 3 { t.Fatalf("got %d entries, want 3", len(entries)) } diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..a999399 --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,196 @@ +// Package store provides the shared ring-buffer, label-encoding and query +// helpers used by both the collector and the aggregator. +package store + +import ( + "container/heap" + "fmt" + "time" + + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" +) + +// Ring-buffer dimensions — shared between collector and aggregator. +const ( + FineRingSize = 60 // 60 × 1-min buckets → 1 hour + CoarseRingSize = 288 // 288 × 5-min buckets → 24 hours + FineTopK = 50_000 // entries kept per fine snapshot + CoarseTopK = 5_000 // entries kept per coarse snapshot + CoarseEvery = 5 // fine ticks between coarse writes +) + +// Tuple4 is the four-dimensional aggregation key. +type Tuple4 struct { + Website string + Prefix string + URI string + Status string +} + +// Entry is a labelled count used in snapshots and query results. +type Entry struct { + Label string + Count int64 +} + +// Snapshot is one sorted (desc) slice of top-K entries for a time bucket. +type Snapshot struct { + Timestamp time.Time + Entries []Entry +} + +// TrendPoint is a (timestamp, total-count) pair for sparkline queries. +type TrendPoint struct { + Timestamp time.Time + Count int64 +} + +// RingView is a read-only snapshot of a ring buffer for iteration. +type RingView struct { + Ring []Snapshot + Head int // index of next write slot (one past the latest entry) + Size int +} + +// BucketsForWindow returns the RingView and number of buckets to sum for window. +func BucketsForWindow(window pb.Window, fine, coarse RingView, fineFilled, coarseFilled int) (RingView, int) { + switch window { + case pb.Window_W1M: + return fine, min(1, fineFilled) + case pb.Window_W5M: + return fine, min(5, fineFilled) + case pb.Window_W15M: + return fine, min(15, fineFilled) + case pb.Window_W60M: + return fine, min(60, fineFilled) + case pb.Window_W6H: + return coarse, min(72, coarseFilled) // 72 × 5-min = 6 h + case pb.Window_W24H: + return coarse, min(288, coarseFilled) + default: + return fine, min(5, fineFilled) + } +} + +// --- label encoding: "website\x00prefix\x00uri\x00status" --- + +// EncodeTuple encodes a Tuple4 as a NUL-separated string suitable for use +// as a map key in snapshots. +func EncodeTuple(t Tuple4) string { + return t.Website + "\x00" + t.Prefix + "\x00" + t.URI + "\x00" + t.Status +} + +// LabelTuple decodes a NUL-separated snapshot label back into a Tuple4. +func LabelTuple(label string) Tuple4 { + parts := splitN(label, '\x00', 4) + if len(parts) != 4 { + return Tuple4{} + } + return Tuple4{parts[0], parts[1], parts[2], parts[3]} +} + +func splitN(s string, sep byte, n int) []string { + result := make([]string, 0, n) + for len(result) < n-1 { + i := indexOf(s, sep) + if i < 0 { + break + } + result = append(result, s[:i]) + s = s[i+1:] + } + return append(result, s) +} + +func indexOf(s string, b byte) int { + for i := 0; i < len(s); i++ { + if s[i] == b { + return i + } + } + return -1 +} + +// --- filtering and grouping --- + +// MatchesFilter returns true if t satisfies all constraints in f. +// A nil filter matches everything. +func MatchesFilter(t Tuple4, f *pb.Filter) bool { + if f == nil { + return true + } + if f.Website != nil && t.Website != f.GetWebsite() { + return false + } + if f.ClientPrefix != nil && t.Prefix != f.GetClientPrefix() { + return false + } + if f.HttpRequestUri != nil && t.URI != f.GetHttpRequestUri() { + return false + } + if f.HttpResponse != nil && t.Status != fmt.Sprint(f.GetHttpResponse()) { + return false + } + return true +} + +// DimensionLabel returns the string value of t for the given group-by dimension. +func DimensionLabel(t Tuple4, g pb.GroupBy) string { + switch g { + case pb.GroupBy_WEBSITE: + return t.Website + case pb.GroupBy_CLIENT_PREFIX: + return t.Prefix + case pb.GroupBy_REQUEST_URI: + return t.URI + case pb.GroupBy_HTTP_RESPONSE: + return t.Status + default: + return t.Website + } +} + +// --- heap-based top-K selection --- + +type entryHeap []Entry + +func (h entryHeap) Len() int { return len(h) } +func (h entryHeap) Less(i, j int) bool { return h[i].Count < h[j].Count } // min-heap +func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(Entry)) } +func (h *entryHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +// TopKFromMap selects the top-k entries from a label→count map, sorted desc. +func TopKFromMap(m map[string]int64, k int) []Entry { + if k <= 0 { + return nil + } + h := make(entryHeap, 0, k+1) + for label, count := range m { + heap.Push(&h, Entry{Label: label, Count: count}) + if h.Len() > k { + heap.Pop(&h) + } + } + result := make([]Entry, h.Len()) + for i := len(result) - 1; i >= 0; i-- { + result[i] = heap.Pop(&h).(Entry) + } + return result +} + +// TopKFromTupleMap encodes a Tuple4 map and returns the top-k as a Snapshot. +// Used by the collector to snapshot its live map. +func TopKFromTupleMap(m map[Tuple4]int64, k int, ts time.Time) Snapshot { + flat := make(map[string]int64, len(m)) + for t, c := range m { + flat[EncodeTuple(t)] = c + } + return Snapshot{Timestamp: ts, Entries: TopKFromMap(flat, k)} +}