# Aggregator v0 — Implementation Plan

Module path: `git.ipng.ch/ipng/nginx-logtail`

**Scope:** A working aggregator that subscribes to `StreamSnapshots` from all configured collectors, maintains a merged in-memory cache, and serves the same `LogtailService` gRPC interface as the collector. Tolerates partial collector failures.

---

## Step 1 — Extract shared logic to `internal/store`

The aggregator's cache is structurally identical to the collector's store: same `Entry` and `snapshot` types, same tiered ring buffers, same heap-based top-K, same label encoding (`encodeTuple`, `labelTuple`), same `matchesFilter` and `dimensionLabel` functions. Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal package before writing any aggregator code. Then refactor the collector to import it.

**New package: `internal/store`**

Move from `cmd/collector/store.go` into `internal/store/store.go`:

- `Tuple4` struct
- `Entry` struct
- `snapshot` struct (unexported → exported: `Snapshot`)
- `entryHeap` + heap interface methods
- `encodeTuple`, `labelTuple`, `splitN`, `indexOf`
- `matchesFilter`, `dimensionLabel`
- `topKFromMap`, `topK` (exported as `TopKFromMap`, `TopK`, since Step 3 calls them cross-package)
- `trendPoint`
- `ringView`, `bucketsForWindow`
- All ring-buffer constants (`fineRingSize`, `coarseRingSize`, `fineTopK`, `coarseTopK`, `coarseEvery`)

Keep in `cmd/collector/store.go` (collector-specific):

- `liveMapCap`
- `Store` struct (live map + ring buffers + subscriber fan-out + `Run` goroutine)
- `ingest`, `rotate`, `mergeFineBuckets`
- `QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`
- The `Store` embeds the ring buffers using the types from `internal/store`

Collector tests must continue to pass unchanged after the refactor.

---

## Step 2 — subscriber.go

One goroutine per collector. Dials the collector, calls `StreamSnapshots`, and forwards each received `pb.Snapshot` to the merger. Reconnects with exponential backoff on any stream error.
```
CollectorSub struct:
    addr   string
    merger *Merger
    source string // filled from first snapshot received
    fails  int    // consecutive failures
```

Lifecycle:

1. `Dial(addr)` → `client.StreamSnapshots(ctx, &pb.SnapshotRequest{})`
2. Loop: `stream.Recv()` → `merger.Apply(snap)`; on error: close, `fails++`
3. If `fails >= 3`: call `merger.Zero(addr)`, log degraded warning
4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1
5. On successful `Recv()` after degraded: `fails = 0`, log recovery

Context cancellation exits the goroutine cleanly.

---

## Step 3 — merger.go

Maintains the per-collector maps and a single running merged map. Uses a delta strategy: when a new snapshot arrives from collector X, subtract X's previous entries from `merged`, add the new entries, and replace X's stored map. This is O(snapshot_size) rather than O(N_collectors × snapshot_size).

```
Merger struct:
    mu           sync.Mutex
    perCollector map[string]map[string]int64 // addr → (label → count)
    merged       map[string]int64            // label → total count across all collectors
```

Methods:

- `Apply(snap *pb.Snapshot)` — lock, subtract old, add new, store new, unlock
- `Zero(addr string)` — lock, subtract `perCollector[addr]` from merged, delete entry, unlock
- `TopK(k int) []store.Entry` — lock, call `store.TopKFromMap(merged, k)`, unlock

`Apply` is called from multiple subscriber goroutines concurrently — the mutex is the only synchronisation point. No channels needed here.

---

## Step 4 — cache.go

The aggregator's equivalent of the collector's `Store`. Holds the tiered ring buffers and answers `TopN`/`Trend`/`StreamSnapshots` queries. Populated by a 1-minute ticker that snapshots the current merged view from the merger.
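The merged view that the cache snapshots is maintained by the Step 3 delta strategy (subtract the collector's previous map, then add its new one). A minimal standalone sketch, in which `pb.Snapshot` is replaced for illustration by a plain source string plus a label→count map:

```go
package main

import (
	"fmt"
	"sync"
)

// Merger keeps one map per collector plus a running merged total.
// Sketch of the Step 3 delta strategy; the real Apply would take a
// *pb.Snapshot rather than a (source, counts) pair.
type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // source → (label → count)
	merged       map[string]int64            // label → total across collectors
}

func NewMerger() *Merger {
	return &Merger{
		perCollector: make(map[string]map[string]int64),
		merged:       make(map[string]int64),
	}
}

// Apply replaces src's contribution: subtract its previous entries
// from merged, add the new ones, then store the new map.
func (m *Merger) Apply(src string, counts map[string]int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for label, n := range m.perCollector[src] {
		m.merged[label] -= n
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
	for label, n := range counts {
		m.merged[label] += n
	}
	m.perCollector[src] = counts
}

// Zero drops a degraded collector's contribution entirely.
func (m *Merger) Zero(src string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for label, n := range m.perCollector[src] {
		m.merged[label] -= n
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
	delete(m.perCollector, src)
}

func main() {
	m := NewMerger()
	m.Apply("c1", map[string]int64{"/index": 10})
	m.Apply("c2", map[string]int64{"/index": 5})
	m.Apply("c1", map[string]int64{"/index": 3}) // replaces c1's 10, not adds
	fmt.Println(m.merged["/index"])              // 8 (3 from c1 + 5 from c2)
	m.Zero("c2")
	fmt.Println(m.merged["/index"]) // 3
}
```

Each `Apply` touches only the old and new maps of one collector, which is where the O(snapshot_size) cost claim comes from.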
```
Cache struct:
    source string
    merger *Merger

    mu           sync.RWMutex
    fineRing     [fineRingSize]store.Snapshot
    fineHead     int
    fineFilled   int
    coarseRing   [coarseRingSize]store.Snapshot
    coarseHead   int
    coarseFilled int
    fineTick     int

    subMu sync.Mutex
    subs  map[chan store.Snapshot]struct{}
```

`Run(ctx context.Context)`:

- 1-minute ticker → `rotate(time.Now())`
- `rotate`: `merger.TopK(fineTopK)` → fine ring slot; every 5 ticks → merge last 5 fine slots into coarse ring slot (identical logic to collector `Store.rotate`)
- After writing: broadcast fine snapshot to subscribers

`QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`: identical to collector `Store`, backed by `internal/store` helpers.

**Why tick-based and not snapshot-triggered?** Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on every incoming snapshot would produce N writes per minute (one per collector), inflating the ring and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same 1-minute cadence as the collectors.

---

## Step 5 — server.go

Identical structure to `cmd/collector/server.go`. Implements `pb.LogtailServiceServer` backed by the `Cache` instead of the collector's `Store`. No new logic; just a different backing type.

`StreamSnapshots` sends merged fine snapshots (from `cache.Subscribe`) to downstream consumers (frontend, CLI, or a second-tier aggregator).

---

## Step 6 — main.go

Flags:

| Flag           | Default  | Description                                         |
|----------------|----------|-----------------------------------------------------|
| `--listen`     | `:9091`  | gRPC listen address                                 |
| `--collectors` | —        | Comma-separated `host:port` addresses of collectors |
| `--source`     | hostname | Name for this aggregator in query responses         |

Wire-up:

1. Parse collector addresses
2. Create `Merger`
3. Create `Cache(merger, source)`
4. Start `cache.Run(ctx)` goroutine (ticker + ring rotation)
5. Start one `CollectorSub.Run(ctx)` goroutine per collector address
6.
   Start gRPC server
7. `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM

---

## Step 7 — Tests

| Test | What it covers |
|------|----------------|
| `TestMergerApply` | Two collectors send snapshots; merged map sums correctly |
| `TestMergerReplacement` | Second snapshot from same collector replaces first, not adds |
| `TestMergerZero` | Marking a collector degraded removes its contribution from merged |
| `TestMergerConcurrent` | `Apply` and `Zero` from concurrent goroutines; no race (run with `-race`) |
| `TestCacheRotation` | After one ticker fire, fine ring has 1 entry with correct counts |
| `TestCacheCoarseRing` | After 5 ticker fires, coarse ring has 1 entry |
| `TestCacheQueryTopN` | TopN returns correct merged rankings |
| `TestCacheQueryTrend` | Trend returns per-bucket sums oldest-first |
| `TestCacheSubscribe` | Subscriber receives snapshot after each rotation |
| `TestGRPCEndToEnd` | Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified |

All existing collector tests must continue to pass after the `internal/store` refactor.

---

## Step 8 — Smoke test

- Start two collector instances pointing at generated log files
- Start the aggregator pointing at both
- Use `grpcurl` to call `TopN` on the aggregator and confirm counts match the sum of the two individual collector `TopN` results
- Kill one collector; confirm the aggregator continues serving and logs a degraded warning
- Restart the killed collector; confirm the aggregator recovers and resumes merging

---

## ✓ COMPLETE — Implementation notes

### Deviations from the plan

- **`TestMergerZeroNonexistent` added**: Plan listed 10 tests; this extra test covers `Zero()` on a source that never sent a snapshot (should be a no-op). With the other additions in the inventory below, the total is 13 tests.
- **`TestDegradedCollector` in end-to-end section**: Rather than a separate block, degraded behaviour is tested with one real fake collector + one unreachable port in the same test file.
- **Race in `TestGRPCEndToEnd`**: The `cache.rotate()` call to trigger a broadcast needed a 50 ms sleep after `client.StreamSnapshots()` to allow the server goroutine to register its subscriber before the broadcast fired. Without it the test was intermittently flaky under the race detector and parallel test runs.
- **`source` field not stored on `CollectorSub`**: Plan mentioned storing `source` from the first snapshot, but `Apply` uses `snap.Source` directly (keying `perCollector` by address). The `source` field was not needed on the struct.

### Test results

```
$ go test ./... -count=1 -race -timeout 60s
ok      git.ipng.ch/ipng/nginx-logtail/cmd/aggregator   4.1s
ok      git.ipng.ch/ipng/nginx-logtail/cmd/collector    9.7s
```

All 13 aggregator tests and all 17 collector tests pass with `-race`.

### Test inventory

| Test | Package | What it covers |
|------|---------|----------------|
| `TestMergerApply` | aggregator | Two collectors sum correctly |
| `TestMergerReplacement` | aggregator | Second snapshot replaces, not adds |
| `TestMergerZero` | aggregator | Degraded collector removed from merged |
| `TestMergerZeroNonexistent` | aggregator | Zero on unknown source is a no-op |
| `TestMergerConcurrent` | aggregator | Apply + Zero from concurrent goroutines; `-race` |
| `TestCacheRotation` | aggregator | Fine ring written after one ticker fire |
| `TestCacheCoarseRing` | aggregator | Coarse ring written after 5 ticker fires |
| `TestCacheQueryTopN` | aggregator | TopN returns correct merged rankings |
| `TestCacheQueryTopNWithFilter` | aggregator | TopN with website filter |
| `TestCacheQueryTrend` | aggregator | Trend per-bucket sums oldest-first |
| `TestCacheSubscribe` | aggregator | Subscriber receives snapshot on rotation |
| `TestGRPCEndToEnd` | aggregator | Two fake collectors; real gRPC TopN/Trend/Stream |
| `TestDegradedCollector` | aggregator | Bad address zeroed; good collector still visible |

---

## Deferred (not in v0)

- Per-source (busiest nginx) breakdown —
  requires adding `SOURCE` to the `GroupBy` proto enum and encoding the source into the merged snapshot entries; deferred until the proto is stable
- `cmd/cli` — covered in PLAN_CLI.md
- `cmd/frontend` — covered in PLAN_FRONTEND.md
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint