Aggregator v0 — Implementation Plan
Module path: git.ipng.ch/ipng/nginx-logtail
Scope: A working aggregator that subscribes to StreamSnapshots from all configured
collectors, maintains a merged in-memory cache, and serves the same LogtailService gRPC
interface as the collector. Tolerates partial collector failures.
Step 1 — Extract shared logic to internal/store
The aggregator's cache is structurally identical to the collector's store: same Entry and
snapshot types, same tiered ring buffers, same heap-based top-K, same label encoding
(encodeTuple, labelTuple), same matchesFilter and dimensionLabel functions.
Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal package before writing any aggregator code. Then refactor the collector to import it.
New package: internal/store
Move from cmd/collector/store.go into internal/store/store.go:
- `Tuple4` struct
- `Entry` struct
- `snapshot` struct (unexported → exported: `Snapshot`)
- `entryHeap` + heap interface methods
- `encodeTuple`, `labelTuple`, `splitN`, `indexOf`
- `matchesFilter`, `dimensionLabel`
- `topKFromMap`, `topK`
- `trendPoint`
- `ringView`, `bucketsForWindow`
- All ring-buffer constants (`fineRingSize`, `coarseRingSize`, `fineTopK`, `coarseTopK`, `coarseEvery`)
Keep in cmd/collector/store.go (collector-specific):
- `liveMapCap`
- `Store` struct (live map + ring buffers + subscriber fan-out + `Run` goroutine)
- `ingest`, `rotate`, `mergeFineBuckets`
- `QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`
- The `Store` embeds the ring buffers using the types from `internal/store`
Collector tests must continue to pass unchanged after the refactor.
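As a concrete illustration of the heap-based top-K helper being extracted, here is a minimal self-contained sketch. The `Entry` field names and the exact `TopKFromMap` signature are assumptions for this sketch; the real `internal/store` also deals with label tuples, filters, and trend buckets.

```go
package main

import "container/heap"

// Entry is one (label, count) pair in a top-K result.
type Entry struct {
	Label string
	Count int64
}

// entryHeap is a min-heap on Count, so the smallest of the K kept
// entries is evicted first when a larger one arrives.
type entryHeap []Entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].Count < h[j].Count }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(Entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

// TopKFromMap returns the k highest-count entries, descending by count.
func TopKFromMap(m map[string]int64, k int) []Entry {
	h := &entryHeap{}
	for label, n := range m {
		heap.Push(h, Entry{label, n})
		if h.Len() > k {
			heap.Pop(h) // drop the current minimum
		}
	}
	// Drain the min-heap back-to-front to get descending order.
	out := make([]Entry, h.Len())
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = heap.Pop(h).(Entry)
	}
	return out
}
```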
Step 2 — subscriber.go
One goroutine per collector. Dials the collector, calls StreamSnapshots, and forwards each
received pb.Snapshot to the merger. Reconnects with exponential backoff on any stream error.
CollectorSub struct:
```go
type CollectorSub struct {
	addr   string
	merger *Merger
	source string // filled from first snapshot received
	fails  int    // consecutive failures
}
```
Lifecycle:
1. `Dial(addr)` → `client.StreamSnapshots(ctx, &pb.SnapshotRequest{})`
2. Loop: `stream.Recv()` → `merger.Apply(snap)`; on error: close, `fails++`
3. If `fails >= 3`: call `merger.Zero(addr)`, log degraded warning
4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1

On a successful `Recv()` after being degraded: `fails = 0`, log recovery.
Context cancellation exits the goroutine cleanly.
Step 3 — merger.go
Maintains the per-collector maps and a single running merged map. Uses a delta strategy:
when a new snapshot arrives from collector X, subtract X's previous entries from merged,
add the new entries, and replace X's stored map. This is O(snapshot_size) rather than
O(N_collectors × snapshot_size).
Merger struct:
```go
type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // addr → (label → count)
	merged       map[string]int64            // label → total count across all collectors
}
```
Methods:
- `Apply(snap *pb.Snapshot)` — lock, subtract old, add new, store new, unlock
- `Zero(addr string)` — lock, subtract `perCollector[addr]` from merged, delete the entry, unlock
- `TopK(k int) []store.Entry` — lock, call `store.TopKFromMap(merged, k)`, unlock
Apply is called from multiple subscriber goroutines concurrently — the mutex is the only
synchronisation point. No channels needed here.
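A minimal runnable sketch of the delta strategy, under two simplifications: snapshots are plain label→count maps rather than `*pb.Snapshot`, and a `Total` accessor stands in for the real `TopK`.

```go
package main

import "sync"

// Merger keeps each collector's last snapshot plus one running merged map.
type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // addr → (label → count)
	merged       map[string]int64            // label → total across collectors
}

func NewMerger() *Merger {
	return &Merger{
		perCollector: make(map[string]map[string]int64),
		merged:       make(map[string]int64),
	}
}

// subtractLocked removes addr's previous contribution. Caller holds mu.
func (m *Merger) subtractLocked(addr string) {
	for label, n := range m.perCollector[addr] {
		m.merged[label] -= n
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
}

// Apply replaces collector addr's contribution: subtract its previous
// entries, add the new ones, store the new map. O(snapshot_size),
// independent of the number of collectors.
func (m *Merger) Apply(addr string, snap map[string]int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subtractLocked(addr)
	for label, n := range snap {
		m.merged[label] += n
	}
	m.perCollector[addr] = snap
}

// Zero removes a degraded collector's contribution entirely.
func (m *Merger) Zero(addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subtractLocked(addr)
	delete(m.perCollector, addr)
}

// Total returns the merged count for one label.
func (m *Merger) Total(label string) int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.merged[label]
}
```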
Step 4 — cache.go
The aggregator's equivalent of the collector's Store. Holds the tiered ring buffers and
answers TopN/Trend/StreamSnapshots queries. Populated by a 1-minute ticker that snapshots
the current merged view from the merger.
Cache struct:
```go
type Cache struct {
	source string
	merger *Merger

	mu           sync.RWMutex
	fineRing     [fineRingSize]store.Snapshot
	fineHead     int
	fineFilled   int
	coarseRing   [coarseRingSize]store.Snapshot
	coarseHead   int
	coarseFilled int
	fineTick     int

	subMu sync.Mutex
	subs  map[chan store.Snapshot]struct{}
}
```
`Run(ctx context.Context)`:
- 1-minute ticker → `rotate(time.Now())`
- `rotate`: `merger.TopK(fineTopK)` → fine ring slot; every 5 ticks, merge the last 5 fine slots into a coarse ring slot (identical logic to the collector `Store.rotate`)
- After writing: broadcast the fine snapshot to subscribers
QueryTopN, QueryTrend, Subscribe, Unsubscribe, broadcast: identical to collector
Store, backed by internal/store helpers.
Why tick-based and not snapshot-triggered? Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on every incoming snapshot would produce N writes per minute (one per collector), inflating the ring and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same 1-minute cadence as the collectors.
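The rotate logic can be sketched as below. The ring sizes and the `coarseEvery = 5` constant are illustrative assumptions (the real values live in `internal/store`), and snapshots are simplified to label→count maps.

```go
package main

// Illustrative constants; the real values come from internal/store.
const (
	fineRingSize   = 60
	coarseRingSize = 48
	coarseEvery    = 5
)

// Cache holds the two tiers of ring buffers (simplified sketch).
type Cache struct {
	fineRing     [fineRingSize]map[string]int64
	fineHead     int
	fineFilled   int
	coarseRing   [coarseRingSize]map[string]int64
	coarseHead   int
	coarseFilled int
	fineTick     int
}

// rotate writes one fine slot per ticker fire; every coarseEvery ticks
// the last coarseEvery fine slots are summed into one coarse slot.
func (c *Cache) rotate(snap map[string]int64) {
	c.fineRing[c.fineHead] = snap
	c.fineHead = (c.fineHead + 1) % fineRingSize
	if c.fineFilled < fineRingSize {
		c.fineFilled++
	}
	c.fineTick++
	if c.fineTick%coarseEvery != 0 {
		return
	}
	// Merge the coarseEvery most recently written fine slots.
	merged := make(map[string]int64)
	for i := 1; i <= coarseEvery; i++ {
		slot := c.fineRing[(c.fineHead-i+fineRingSize)%fineRingSize]
		for label, n := range slot {
			merged[label] += n
		}
	}
	c.coarseRing[c.coarseHead] = merged
	c.coarseHead = (c.coarseHead + 1) % coarseRingSize
	if c.coarseFilled < coarseRingSize {
		c.coarseFilled++
	}
}
```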
Step 5 — server.go
Identical structure to cmd/collector/server.go. Implements pb.LogtailServiceServer backed by
the Cache instead of the collector's Store. No new logic; just a different backing type.
StreamSnapshots sends merged fine snapshots (from cache.Subscribe) to downstream consumers
(frontend, CLI, or a second-tier aggregator).
Step 6 — main.go
Flags:
| Flag | Default | Description |
|---|---|---|
| `--listen` | `:9091` | gRPC listen address |
| `--collectors` | — | Comma-separated host:port addresses of collectors |
| `--source` | hostname | Name for this aggregator in query responses |
Wire-up:
- Parse collector addresses
- Create `Merger`
- Create `Cache(merger, source)`
- Start `cache.Run(ctx)` goroutine (ticker + ring rotation)
- Start one `CollectorSub.Run(ctx)` goroutine per collector address
- Start gRPC server
- `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM
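The flag wiring can be sketched as follows; `parseCollectors` and `defaultSource` are hypothetical helpers invented for this sketch, matching the flags table above.

```go
package main

import (
	"flag"
	"os"
	"strings"
)

// Flag names and defaults follow the plan's table.
var (
	listen     = flag.String("listen", ":9091", "gRPC listen address")
	collectors = flag.String("collectors", "", "comma-separated host:port addresses of collectors")
	source     = flag.String("source", defaultSource(), "name for this aggregator in query responses")
)

// defaultSource returns the machine hostname, with a fallback so the
// flag default is never empty. Hypothetical helper.
func defaultSource() string {
	h, err := os.Hostname()
	if err != nil || h == "" {
		return "aggregator"
	}
	return h
}

// parseCollectors splits the --collectors value into clean addresses,
// tolerating stray whitespace and trailing commas. Hypothetical helper.
func parseCollectors(s string) []string {
	var out []string
	for _, a := range strings.Split(s, ",") {
		if a = strings.TrimSpace(a); a != "" {
			out = append(out, a)
		}
	}
	return out
}
```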
Step 7 — Tests
| Test | What it covers |
|---|---|
| `TestMergerApply` | Two collectors send snapshots; merged map sums correctly |
| `TestMergerReplacement` | Second snapshot from same collector replaces first, not adds |
| `TestMergerZero` | Marking a collector degraded removes its contribution from merged |
| `TestMergerConcurrent` | Apply and Zero from concurrent goroutines; no race (run with `-race`) |
| `TestCacheRotation` | After one ticker fire, fine ring has 1 entry with correct counts |
| `TestCacheCoarseRing` | After 5 ticker fires, coarse ring has 1 entry |
| `TestCacheQueryTopN` | TopN returns correct merged rankings |
| `TestCacheQueryTrend` | Trend returns per-bucket sums oldest-first |
| `TestCacheSubscribe` | Subscriber receives snapshot after each rotation |
| `TestGRPCEndToEnd` | Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified |
All existing collector tests must continue to pass after the internal/store refactor.
Step 8 — Smoke test
- Start two collector instances pointing at generated log files
- Start the aggregator pointing at both
- Use `grpcurl` to call `TopN` on the aggregator and confirm counts match the sum of the two individual collector `TopN` results
- Kill one collector; confirm the aggregator continues serving and logs a degraded warning
- Restart the killed collector; confirm the aggregator recovers and resumes merging
✓ COMPLETE — Implementation notes
Deviations from the plan
- `TestMergerZeroNonexistent` added: the plan listed 10 tests; an extra test was added to cover `Zero()` on a source that never sent a snapshot (should be a no-op). Total: 13 tests.
- `TestDegradedCollector` in the end-to-end section: rather than a separate block, degraded behaviour is tested with one real fake collector + one unreachable port in the same test file.
- Race in `TestGRPCEndToEnd`: the `cache.rotate()` call that triggers a broadcast needed a 50 ms sleep after `client.StreamSnapshots()` to allow the server goroutine to register its subscriber before the broadcast fired. Without it the test was intermittently flaky under the race detector and parallel test runs.
- `source` field not stored on `CollectorSub`: the plan mentioned storing `source` from the first snapshot, but `Apply` uses `snap.Source` directly (keying `perCollector` by address), so the field was not needed on the struct.
Test results
```
$ go test ./... -count=1 -race -timeout 60s
ok      git.ipng.ch/ipng/nginx-logtail/cmd/aggregator   4.1s
ok      git.ipng.ch/ipng/nginx-logtail/cmd/collector    9.7s
```
All 13 aggregator tests and all 17 collector tests pass with -race.
Test inventory
| Test | Package | What it covers |
|---|---|---|
| `TestMergerApply` | aggregator | Two collectors sum correctly |
| `TestMergerReplacement` | aggregator | Second snapshot replaces, not adds |
| `TestMergerZero` | aggregator | Degraded collector removed from merged |
| `TestMergerZeroNonexistent` | aggregator | Zero on unknown source is a no-op |
| `TestMergerConcurrent` | aggregator | Apply + Zero from concurrent goroutines; `-race` |
| `TestCacheRotation` | aggregator | Fine ring written after one ticker fire |
| `TestCacheCoarseRing` | aggregator | Coarse ring written after 5 ticker fires |
| `TestCacheQueryTopN` | aggregator | TopN returns correct merged rankings |
| `TestCacheQueryTopNWithFilter` | aggregator | TopN with website filter |
| `TestCacheQueryTrend` | aggregator | Trend per-bucket sums oldest-first |
| `TestCacheSubscribe` | aggregator | Subscriber receives snapshot on rotation |
| `TestGRPCEndToEnd` | aggregator | Two fake collectors; real gRPC TopN/Trend/Stream |
| `TestDegradedCollector` | aggregator | Bad address zeroed; good collector still visible |
Deferred (not in v0)
- Per-source (busiest nginx) breakdown — requires adding `SOURCE` to the `GroupBy` proto enum and encoding the source into the merged snapshot entries; deferred until the proto is stable
- `cmd/cli` — covered in PLAN_CLI.md
- `cmd/frontend` — covered in PLAN_FRONTEND.md
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint