nginx-logtail/PLAN_AGGREGATOR.md


Aggregator v0 — Implementation Plan

Module path: git.ipng.ch/ipng/nginx-logtail

Scope: A working aggregator that subscribes to StreamSnapshots from all configured collectors, maintains a merged in-memory cache, and serves the same LogtailService gRPC interface as the collector. Tolerates partial collector failures.


Step 1 — Extract shared logic to internal/store

The aggregator's cache is structurally identical to the collector's store: same Entry and snapshot types, same tiered ring buffers, same heap-based top-K, same label encoding (encodeTuple, labelTuple), same matchesFilter and dimensionLabel functions.

Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal package before writing any aggregator code. Then refactor the collector to import it.

New package: internal/store

Move from cmd/collector/store.go into internal/store/store.go:

  • Tuple4 struct
  • Entry struct
  • snapshot struct (unexported → exported: Snapshot)
  • entryHeap + heap interface methods
  • encodeTuple, labelTuple, splitN, indexOf
  • matchesFilter, dimensionLabel
  • topKFromMap, topK (unexported → exported: TopKFromMap, TopK)
  • trendPoint
  • ringView, bucketsForWindow
  • All ring-buffer constants (fineRingSize, coarseRingSize, fineTopK, coarseTopK, coarseEvery)

Keep in cmd/collector/store.go (collector-specific):

  • liveMapCap
  • Store struct (live map + ring buffers + subscriber fan-out + Run goroutine)
  • ingest, rotate, mergeFineBuckets
  • QueryTopN, QueryTrend, Subscribe, Unsubscribe, broadcast
  • The Store embeds the ring buffers using the types from internal/store

Collector tests must continue to pass unchanged after the refactor.
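To make the shared contract concrete, here is a sketch of the top-K helper that both the collector's Store and the aggregator's Merger would call after the extraction. The real implementation is heap-based (entryHeap); this sort-based version only illustrates the exported signature and ordering contract, with a deterministic label tiebreak as an assumption.

```go
package main

import "sort"

// Entry mirrors the shared store.Entry shape: a label and its count.
type Entry struct {
	Label string
	Count int64
}

// TopKFromMap returns the k highest-count entries, descending by count.
// The real internal/store version uses a heap; this sketch sorts the
// whole map, which is fine for illustration but O(n log n).
func TopKFromMap(m map[string]int64, k int) []Entry {
	entries := make([]Entry, 0, len(m))
	for label, n := range m {
		entries = append(entries, Entry{label, n})
	}
	sort.Slice(entries, func(i, j int) bool {
		if entries[i].Count != entries[j].Count {
			return entries[i].Count > entries[j].Count
		}
		return entries[i].Label < entries[j].Label // deterministic tiebreak (assumption)
	})
	if k < len(entries) {
		entries = entries[:k]
	}
	return entries
}
```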


Step 2 — subscriber.go

One goroutine per collector. Dials the collector, calls StreamSnapshots, and forwards each received pb.Snapshot to the merger. Reconnects with exponential backoff on any stream error.

CollectorSub struct:
  addr    string
  merger  *Merger
  source  string          // filled from first snapshot received
  fails   int             // consecutive failures

Lifecycle:

  1. Dial(addr) → client.StreamSnapshots(ctx, &pb.SnapshotRequest{})
  2. Loop: stream.Recv() → merger.Apply(snap); on error: close, fails++
  3. If fails >= 3: call merger.Zero(addr), log degraded warning
  4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1
  5. On successful Recv() after degraded: fails = 0, log recovery

Context cancellation exits the goroutine cleanly.


Step 3 — merger.go

Maintains the per-collector maps and a single running merged map. Uses a delta strategy: when a new snapshot arrives from collector X, subtract X's previous entries from merged, add the new entries, and replace X's stored map. This is O(snapshot_size) rather than O(N_collectors × snapshot_size).

Merger struct:
  mu           sync.Mutex
  perCollector map[string]map[string]int64  // addr → (label → count)
  merged       map[string]int64             // label → total count across all collectors

Methods:

  • Apply(snap *pb.Snapshot) — lock, subtract old, add new, store new, unlock
  • Zero(addr string) — lock, subtract perCollector[addr] from merged, delete entry, unlock
  • TopK(k int) []store.Entry — lock, call store.TopKFromMap(merged, k), unlock

Apply is called from multiple subscriber goroutines concurrently — the mutex is the only synchronisation point. No channels needed here.
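The delta strategy is small enough to sketch in full. This version takes the source address and counts directly rather than a *pb.Snapshot (the real Apply would unpack the proto), and it prunes zeroed labels so the merged map does not accumulate dead keys:

```go
package main

import "sync"

// Merger keeps one map per collector plus a running merged total.
type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // addr → (label → count)
	merged       map[string]int64            // label → total across collectors
}

func NewMerger() *Merger {
	return &Merger{
		perCollector: make(map[string]map[string]int64),
		merged:       make(map[string]int64),
	}
}

// subtractLocked removes addr's previous contribution; caller holds mu.
func (m *Merger) subtractLocked(addr string) {
	for label, n := range m.perCollector[addr] {
		m.merged[label] -= n
		if m.merged[label] == 0 {
			delete(m.merged, label) // prune dead labels
		}
	}
}

// Apply replaces addr's contribution with counts: subtract the old map,
// add the new one — O(snapshot size), independent of collector count.
func (m *Merger) Apply(addr string, counts map[string]int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subtractLocked(addr)
	for label, n := range counts {
		m.merged[label] += n
	}
	m.perCollector[addr] = counts
}

// Zero removes a degraded collector's contribution entirely.
// A no-op if addr never sent a snapshot.
func (m *Merger) Zero(addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subtractLocked(addr)
	delete(m.perCollector, addr)
}
```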


Step 4 — cache.go

The aggregator's equivalent of the collector's Store. Holds the tiered ring buffers and answers TopN/Trend/StreamSnapshots queries. Populated by a 1-minute ticker that snapshots the current merged view from the merger.

Cache struct:
  source  string
  merger  *Merger

  mu            sync.RWMutex
  fineRing      [fineRingSize]store.Snapshot
  fineHead      int
  fineFilled    int
  coarseRing    [coarseRingSize]store.Snapshot
  coarseHead    int
  coarseFilled  int
  fineTick      int

  subMu  sync.Mutex
  subs   map[chan store.Snapshot]struct{}

Run(ctx context.Context):

  • 1-minute ticker → rotate(time.Now())
  • rotate: merger.TopK(fineTopK) → fine ring slot; every 5 ticks → merge last 5 fine slots into coarse ring slot (identical logic to collector Store.rotate)
  • After writing: broadcast fine snapshot to subscribers

QueryTopN, QueryTrend, Subscribe, Unsubscribe, broadcast: identical to collector Store, backed by internal/store helpers.

Why tick-based and not snapshot-triggered? Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on every incoming snapshot would produce N writes per minute (one per collector), inflating the ring and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same 1-minute cadence as the collectors.
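The fine/coarse rotation can be sketched without the ticker or subscriber plumbing. Ring sizes here are illustrative placeholders, not the real fineRingSize/coarseRingSize constants, and snapshots are reduced to bare count maps:

```go
package main

// Illustrative sizes — the real constants live in internal/store.
const (
	fineRingSize   = 60  // e.g. one hour of 1-minute buckets
	coarseRingSize = 288 // e.g. one day of 5-minute buckets
	coarseEvery    = 5   // merge every 5 fine ticks into one coarse slot
)

type snapshot map[string]int64 // label → count

type rings struct {
	fine         [fineRingSize]snapshot
	fineHead     int
	fineFilled   int
	coarse       [coarseRingSize]snapshot
	coarseHead   int
	coarseFilled int
	fineTick     int
}

// rotate writes one fine slot per tick; every coarseEvery ticks it sums
// the last coarseEvery fine slots into one coarse slot.
func (r *rings) rotate(top snapshot) {
	r.fine[r.fineHead] = top
	r.fineHead = (r.fineHead + 1) % fineRingSize
	if r.fineFilled < fineRingSize {
		r.fineFilled++
	}
	r.fineTick++
	if r.fineTick%coarseEvery != 0 {
		return
	}
	merged := snapshot{}
	for i := 1; i <= coarseEvery; i++ {
		// Walk backwards from the slot just written, wrapping around.
		idx := (r.fineHead - i + fineRingSize) % fineRingSize
		for label, n := range r.fine[idx] {
			merged[label] += n
		}
	}
	r.coarse[r.coarseHead] = merged
	r.coarseHead = (r.coarseHead + 1) % coarseRingSize
	if r.coarseFilled < coarseRingSize {
		r.coarseFilled++
	}
}
```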


Step 5 — server.go

Identical structure to cmd/collector/server.go. Implements pb.LogtailServiceServer backed by the Cache instead of the collector's Store. No new logic; just a different backing type.

StreamSnapshots sends merged fine snapshots (from cache.Subscribe) to downstream consumers (frontend, CLI, or a second-tier aggregator).


Step 6 — main.go

Flags:

  Flag          Default     Description
  --listen      :9091       gRPC listen address
  --collectors              Comma-separated host:port addresses of collectors
  --source      hostname    Name for this aggregator in query responses

Wire-up:

  1. Parse collector addresses
  2. Create Merger
  3. Create Cache(merger, source)
  4. Start cache.Run(ctx) goroutine (ticker + ring rotation)
  5. Start one CollectorSub.Run(ctx) goroutine per collector address
  6. Start gRPC server
  7. signal.NotifyContext for clean shutdown on SIGINT/SIGTERM

Step 7 — Tests

  Test                   What it covers
  TestMergerApply        Two collectors send snapshots; merged map sums correctly
  TestMergerReplacement  Second snapshot from same collector replaces first, not adds
  TestMergerZero         Marking a collector degraded removes its contribution from merged
  TestMergerConcurrent   Apply and Zero from concurrent goroutines; no race (run with -race)
  TestCacheRotation      After one ticker fire, fine ring has 1 entry with correct counts
  TestCacheCoarseRing    After 5 ticker fires, coarse ring has 1 entry
  TestCacheQueryTopN     TopN returns correct merged rankings
  TestCacheQueryTrend    Trend returns per-bucket sums oldest-first
  TestCacheSubscribe     Subscriber receives snapshot after each rotation
  TestGRPCEndToEnd       Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified

All existing collector tests must continue to pass after the internal/store refactor.


Step 8 — Smoke test

  • Start two collector instances pointing at generated log files
  • Start the aggregator pointing at both
  • Use grpcurl to call TopN on the aggregator and confirm counts match the sum of the two individual collector TopN results
  • Kill one collector; confirm the aggregator continues serving and logs a degraded warning
  • Restart the killed collector; confirm the aggregator recovers and resumes merging

✓ COMPLETE — Implementation notes

Deviations from the plan

  • TestMergerZeroNonexistent added: Covers Zero() on a source that never sent a snapshot (should be a no-op). The plan listed 10 tests; this test, TestCacheQueryTopNWithFilter, and TestDegradedCollector bring the total to 13.
  • TestDegradedCollector in end-to-end section: Rather than a separate block, degraded behaviour is tested with one real fake collector + one unreachable port in the same test file.
  • Race in TestGRPCEndToEnd: The cache.rotate() call to trigger a broadcast needed a 50 ms sleep after client.StreamSnapshots() to allow the server goroutine to register its subscriber before the broadcast fired. Without it the test was intermittently flaky under the race detector and parallel test runs.
  • source field not stored on CollectorSub: Plan mentioned storing source from the first snapshot, but Apply uses snap.Source directly (keying perCollector by address). The source field was not needed on the struct.

Test results

$ go test ./... -count=1 -race -timeout 60s
ok  git.ipng.ch/ipng/nginx-logtail/cmd/aggregator  4.1s
ok  git.ipng.ch/ipng/nginx-logtail/cmd/collector   9.7s

All 13 aggregator tests and all 17 collector tests pass with -race.

Test inventory

  Test                          Package     What it covers
  TestMergerApply               aggregator  Two collectors sum correctly
  TestMergerReplacement         aggregator  Second snapshot replaces, not adds
  TestMergerZero                aggregator  Degraded collector removed from merged
  TestMergerZeroNonexistent     aggregator  Zero on unknown source is a no-op
  TestMergerConcurrent          aggregator  Apply + Zero from concurrent goroutines; -race
  TestCacheRotation             aggregator  Fine ring written after one ticker fire
  TestCacheCoarseRing           aggregator  Coarse ring written after 5 ticker fires
  TestCacheQueryTopN            aggregator  TopN returns correct merged rankings
  TestCacheQueryTopNWithFilter  aggregator  TopN with website filter
  TestCacheQueryTrend           aggregator  Trend per-bucket sums oldest-first
  TestCacheSubscribe            aggregator  Subscriber receives snapshot on rotation
  TestGRPCEndToEnd              aggregator  Two fake collectors; real gRPC TopN/Trend/Stream
  TestDegradedCollector         aggregator  Bad address zeroed; good collector still visible

Deferred (not in v0)

  • Per-source (busiest nginx) breakdown — requires adding SOURCE to the GroupBy proto enum and encoding the source into the merged snapshot entries; deferred until the proto is stable
  • cmd/cli — covered in PLAN_CLI.md
  • cmd/frontend — covered in PLAN_FRONTEND.md
  • ClickHouse export
  • TLS / auth
  • Prometheus metrics endpoint