nginx-logtail/PLAN_AGGREGATOR.md
# Aggregator v0 — Implementation Plan
Module path: `git.ipng.ch/ipng/nginx-logtail`
**Scope:** A working aggregator that subscribes to `StreamSnapshots` from all configured
collectors, maintains a merged in-memory cache, and serves the same `LogtailService` gRPC
interface as the collector. Tolerates partial collector failures.
---
## Step 1 — Extract shared logic to `internal/store`
The aggregator's cache is structurally identical to the collector's store: same `Entry` and
`snapshot` types, same tiered ring buffers, same heap-based top-K, same label encoding
(`encodeTuple`, `labelTuple`), same `matchesFilter` and `dimensionLabel` functions.
Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal
package before writing any aggregator code. Then refactor the collector to import it.
**New package: `internal/store`**
Move from `cmd/collector/store.go` into `internal/store/store.go`:
- `Tuple4` struct
- `Entry` struct
- `snapshot` struct (unexported → exported: `Snapshot`)
- `entryHeap` + heap interface methods
- `encodeTuple`, `labelTuple`, `splitN`, `indexOf`
- `matchesFilter`, `dimensionLabel`
- `topKFromMap`, `topK` (exported as `TopKFromMap`, `TopK`, since the aggregator calls them cross-package)
- `trendPoint`
- `ringView`, `bucketsForWindow`
- All ring-buffer constants (`fineRingSize`, `coarseRingSize`, `fineTopK`, `coarseTopK`,
`coarseEvery`)
Keep in `cmd/collector/store.go` (collector-specific):
- `liveMapCap`
- `Store` struct (live map + ring buffers + subscriber fan-out + `Run` goroutine)
- `ingest`, `rotate`, `mergeFineBuckets`
- `QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`
- The `Store` embeds the ring buffers using the types from `internal/store`
Collector tests must continue to pass unchanged after the refactor.
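To make the extraction concrete, here is a minimal sketch of what the exported surface of `internal/store` could look like after the move. Only the names come from the plan; the field names, signatures, and the heap-based top-K implementation shown are assumptions about code not included here.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// Entry is one ranked row in a snapshot: an encoded label and its count.
// (Field names are assumed; the plan only names the type.)
type Entry struct {
	Label string
	Count int64
}

// Snapshot is the exported form of the collector's former `snapshot` type:
// the top-K entries for one one-minute bucket.
type Snapshot struct {
	Entries []Entry
}

// entryHeap is a min-heap on Count, used to retain the K largest entries.
type entryHeap []Entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].Count < h[j].Count }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(Entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// TopKFromMap returns the k highest-count labels, largest first.
func TopKFromMap(m map[string]int64, k int) []Entry {
	h := &entryHeap{}
	for label, c := range m {
		heap.Push(h, Entry{Label: label, Count: c})
		if h.Len() > k {
			heap.Pop(h) // evict the current minimum
		}
	}
	out := make([]Entry, h.Len())
	copy(out, *h)
	sort.Slice(out, func(i, j int) bool { return out[i].Count > out[j].Count })
	return out
}

func main() {
	m := map[string]int64{"/a": 5, "/b": 12, "/c": 3, "/d": 9}
	fmt.Println(TopKFromMap(m, 2)) // → [{/b 12} {/d 9}]
}
```

Because both the collector's `Store` and the aggregator's cache will rank from a `map[string]int64`, exporting `TopKFromMap` in this shape lets both sides share one implementation.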
---
## Step 2 — subscriber.go
One goroutine per collector. Dials the collector, calls `StreamSnapshots`, and forwards each
received `pb.Snapshot` to the merger. Reconnects with exponential backoff on any stream error.
```
CollectorSub struct:
addr string
merger *Merger
source string // filled from first snapshot received
fails int // consecutive failures
```
Lifecycle:
1. `Dial(addr)` → `client.StreamSnapshots(ctx, &pb.SnapshotRequest{})`
2. Loop: `stream.Recv()` → `merger.Apply(snap)`; on error: close, `fails++`
3. If `fails >= 3`: call `merger.Zero(addr)`, log degraded warning
4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1
5. On successful `Recv()` after degraded: `fails = 0`, log recovery
Context cancellation exits the goroutine cleanly.
---
## Step 3 — merger.go
Maintains the per-collector maps and a single running merged map. Uses a delta strategy:
when a new snapshot arrives from collector X, subtract X's previous entries from `merged`,
add the new entries, and replace X's stored map. This is O(snapshot_size) rather than
O(N_collectors × snapshot_size).
```
Merger struct:
mu sync.Mutex
perCollector map[string]map[string]int64 // addr → (label → count)
merged map[string]int64 // label → total count across all collectors
```
Methods:
- `Apply(snap *pb.Snapshot)` — lock, subtract old, add new, store new, unlock
- `Zero(addr string)` — lock, subtract perCollector[addr] from merged, delete entry, unlock
- `TopK(k int) []store.Entry` — lock, call `store.TopKFromMap(merged, k)`, unlock
`Apply` is called from multiple subscriber goroutines concurrently — the mutex is the only
synchronisation point. No channels needed here.
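A minimal sketch of the delta strategy, with `pb.Snapshot` replaced by a plain struct and the map keyed by source name for brevity (the plan keys `perCollector` by address; either works as long as the key is stable per collector):

```go
package main

import (
	"fmt"
	"sync"
)

// Snap is a stand-in for pb.Snapshot: a source name plus label→count pairs.
type Snap struct {
	Source string
	Counts map[string]int64
}

type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // source → (label → count)
	merged       map[string]int64            // label → total across collectors
}

func NewMerger() *Merger {
	return &Merger{
		perCollector: make(map[string]map[string]int64),
		merged:       make(map[string]int64),
	}
}

// Apply replaces one collector's contribution: subtract its previous
// entries, add the new ones. O(snapshot size), not O(collectors × size).
func (m *Merger) Apply(s Snap) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for label, c := range m.perCollector[s.Source] {
		m.merged[label] -= c
		if m.merged[label] == 0 {
			delete(m.merged, label) // keep the merged map from accumulating zeros
		}
	}
	for label, c := range s.Counts {
		m.merged[label] += c
	}
	m.perCollector[s.Source] = s.Counts
}

// Zero removes a degraded collector's contribution entirely.
// Ranging over a missing key is a no-op, so unknown sources are safe.
func (m *Merger) Zero(source string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for label, c := range m.perCollector[source] {
		m.merged[label] -= c
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
	delete(m.perCollector, source)
}

// Total is a small inspection helper (not in the plan).
func (m *Merger) Total(label string) int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.merged[label]
}

func main() {
	m := NewMerger()
	m.Apply(Snap{Source: "c1", Counts: map[string]int64{"/a": 10}})
	m.Apply(Snap{Source: "c2", Counts: map[string]int64{"/a": 5}})
	fmt.Println(m.Total("/a")) // → 15
	m.Apply(Snap{Source: "c1", Counts: map[string]int64{"/a": 2}})
	fmt.Println(m.Total("/a")) // → 7 (replaced, not added)
	m.Zero("c2")
	fmt.Println(m.Total("/a")) // → 2
}
```

Note how subtract-then-add makes replacement semantics fall out naturally: a second snapshot from the same collector first cancels its previous contribution, which is exactly what `TestMergerReplacement` in Step 7 verifies.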
---
## Step 4 — cache.go
The aggregator's equivalent of the collector's `Store`. Holds the tiered ring buffers and
answers `TopN`/`Trend`/`StreamSnapshots` queries. Populated by a 1-minute ticker that snapshots
the current merged view from the merger.
```
Cache struct:
source string
merger *Merger
mu sync.RWMutex
fineRing [fineRingSize]store.Snapshot
fineHead int
fineFilled int
coarseRing [coarseRingSize]store.Snapshot
coarseHead int
coarseFilled int
fineTick int
subMu sync.Mutex
subs map[chan store.Snapshot]struct{}
```
`Run(ctx context.Context)`:
- 1-minute ticker → `rotate(time.Now())`
- `rotate`: `merger.TopK(fineTopK)` → fine ring slot; every 5 ticks → merge last 5 fine slots
into coarse ring slot (identical logic to collector `Store.rotate`)
- After writing: broadcast fine snapshot to subscribers
`QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`: identical to collector
`Store`, backed by `internal/store` helpers.
**Why tick-based and not snapshot-triggered?**
Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on
every incoming snapshot would produce N writes per minute (one per collector), inflating the ring
and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same
1-minute cadence as the collectors.
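The tiered rotation can be sketched like this. It is deliberately simplified: snapshots are bare `map[string]int64` values rather than ranked `store.Snapshot` entries, the top-K step is omitted, and the ring sizes are assumed values (the real constants live in `internal/store`).

```go
package main

import "fmt"

const (
	fineRingSize   = 60 // assumed: one hour of 1-minute buckets
	coarseRingSize = 24 // assumed
	coarseEvery    = 5  // every 5 fine ticks, write one coarse slot
)

// Cache keeps tiered rings of per-minute merged totals (simplified).
type Cache struct {
	fineRing     [fineRingSize]map[string]int64
	fineHead     int
	fineFilled   int
	coarseRing   [coarseRingSize]map[string]int64
	coarseHead   int
	coarseFilled int
	fineTick     int
}

// rotate writes one fine slot from the current merged view; every
// coarseEvery ticks it also merges the last coarseEvery fine slots
// into one coarse slot, mirroring the collector's Store.rotate.
func (c *Cache) rotate(merged map[string]int64) {
	c.fineRing[c.fineHead] = merged
	c.fineHead = (c.fineHead + 1) % fineRingSize
	if c.fineFilled < fineRingSize {
		c.fineFilled++
	}
	c.fineTick++
	if c.fineTick%coarseEvery != 0 {
		return
	}
	// Sum the coarseEvery most recent fine slots into one coarse slot.
	sum := make(map[string]int64)
	for i := 1; i <= coarseEvery; i++ {
		idx := (c.fineHead - i + fineRingSize) % fineRingSize
		for label, n := range c.fineRing[idx] {
			sum[label] += n
		}
	}
	c.coarseRing[c.coarseHead] = sum
	c.coarseHead = (c.coarseHead + 1) % coarseRingSize
	if c.coarseFilled < coarseRingSize {
		c.coarseFilled++
	}
}

func main() {
	c := &Cache{}
	for i := 0; i < 5; i++ {
		c.rotate(map[string]int64{"/a": 1})
	}
	fmt.Println(c.fineFilled, c.coarseFilled, c.coarseRing[0]["/a"]) // → 5 1 5
}
```

In the real cache, `rotate` is driven by the 1-minute ticker in `Run` and takes `merger.TopK(fineTopK)` as its input, so each fine slot is already a ranked snapshot rather than a raw map.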
---
## Step 5 — server.go
Identical structure to `cmd/collector/server.go`. Implements `pb.LogtailServiceServer` backed by
the `Cache` instead of the collector's `Store`. No new logic; just a different backing type.
`StreamSnapshots` sends merged fine snapshots (from `cache.Subscribe`) to downstream consumers
(frontend, CLI, or a second-tier aggregator).
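The plan says the `Subscribe`/`broadcast` machinery is identical to the collector's; one plausible shape of that fan-out is sketched below. The buffered channel and drop-on-full behaviour are assumptions, chosen so a stalled downstream stream cannot block the rotation goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// Snapshot stands in for a merged fine snapshot.
type Snapshot struct{ Total int64 }

// Hub is the subscriber fan-out StreamSnapshots leans on: each open
// stream registers a channel; broadcast does a non-blocking send.
type Hub struct {
	mu   sync.Mutex
	subs map[chan Snapshot]struct{}
}

func NewHub() *Hub { return &Hub{subs: make(map[chan Snapshot]struct{})} }

func (h *Hub) Subscribe() chan Snapshot {
	ch := make(chan Snapshot, 1) // buffered so broadcast never blocks
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

func (h *Hub) Unsubscribe(ch chan Snapshot) {
	h.mu.Lock()
	delete(h.subs, ch)
	h.mu.Unlock()
}

// Broadcast delivers a snapshot to every subscriber, dropping it for
// any subscriber whose buffer is still full from the previous round.
func (h *Hub) Broadcast(s Snapshot) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- s:
		default: // drop rather than stall rotation on a slow stream
		}
	}
}

func main() {
	h := NewHub()
	ch := h.Subscribe()
	h.Broadcast(Snapshot{Total: 42})
	fmt.Println((<-ch).Total) // → 42
}
```

The server's `StreamSnapshots` handler would then be a loop over this channel, sending each received snapshot on the gRPC stream until the stream's context is cancelled.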
---
## Step 6 — main.go
Flags:
| Flag | Default | Description |
|----------------|--------------|--------------------------------------------------------|
| `--listen` | `:9091` | gRPC listen address |
| `--collectors` | — | Comma-separated `host:port` addresses of collectors |
| `--source` | hostname | Name for this aggregator in query responses |
Wire-up:
1. Parse collector addresses
2. Create `Merger`
3. Create `Cache(merger, source)`
4. Start `cache.Run(ctx)` goroutine (ticker + ring rotation)
5. Start one `CollectorSub.Run(ctx)` goroutine per collector address
6. Start gRPC server
7. `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM
---
## Step 7 — Tests
| Test | What it covers |
|------|----------------|
| `TestMergerApply` | Two collectors send snapshots; merged map sums correctly |
| `TestMergerReplacement` | Second snapshot from same collector replaces first, not adds |
| `TestMergerZero` | Marking a collector degraded removes its contribution from merged |
| `TestMergerConcurrent` | `Apply` and `Zero` from concurrent goroutines; no race (run with `-race`) |
| `TestCacheRotation` | After one ticker fire, fine ring has 1 entry with correct counts |
| `TestCacheCoarseRing` | After 5 ticker fires, coarse ring has 1 entry |
| `TestCacheQueryTopN` | TopN returns correct merged rankings |
| `TestCacheQueryTrend` | Trend returns per-bucket sums oldest-first |
| `TestCacheSubscribe` | Subscriber receives snapshot after each rotation |
| `TestGRPCEndToEnd` | Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified |
All existing collector tests must continue to pass after the `internal/store` refactor.
---
## Step 8 — Smoke test
- Start two collector instances pointing at generated log files
- Start the aggregator pointing at both
- Use `grpcurl` to call `TopN` on the aggregator and confirm counts match the sum of the two
individual collector `TopN` results
- Kill one collector; confirm the aggregator continues serving and logs a degraded warning
- Restart the killed collector; confirm the aggregator recovers and resumes merging
---
## ✓ COMPLETE — Implementation notes
### Deviations from the plan
- **Extra tests added**: Plan listed 10 tests. `TestMergerZeroNonexistent` was added to cover
  `Zero()` on a source that never sent a snapshot (should be a no-op), and
  `TestCacheQueryTopNWithFilter` to cover TopN with a website filter. Together with
  `TestDegradedCollector` (next item), the total is 13 tests.
- **`TestDegradedCollector` in end-to-end section**: Rather than a separate block, degraded
behaviour is tested with one real fake collector + one unreachable port in the same test file.
- **Race in `TestGRPCEndToEnd`**: The `cache.rotate()` call to trigger a broadcast needed a
50 ms sleep after `client.StreamSnapshots()` to allow the server goroutine to register its
subscriber before the broadcast fired. Without it the test was intermittently flaky under
the race detector and parallel test runs.
- **`source` field not stored on `CollectorSub`**: Plan mentioned storing `source` from the first
snapshot, but `Apply` uses `snap.Source` directly (keying `perCollector` by address). The
`source` field was not needed on the struct.
### Test results
```
$ go test ./... -count=1 -race -timeout 60s
ok git.ipng.ch/ipng/nginx-logtail/cmd/aggregator 4.1s
ok git.ipng.ch/ipng/nginx-logtail/cmd/collector 9.7s
```
All 13 aggregator tests and all 17 collector tests pass with `-race`.
### Test inventory
| Test | Package | What it covers |
|------|---------|----------------|
| `TestMergerApply` | aggregator | Two collectors sum correctly |
| `TestMergerReplacement` | aggregator | Second snapshot replaces, not adds |
| `TestMergerZero` | aggregator | Degraded collector removed from merged |
| `TestMergerZeroNonexistent` | aggregator | Zero on unknown source is a no-op |
| `TestMergerConcurrent` | aggregator | Apply + Zero from concurrent goroutines; -race |
| `TestCacheRotation` | aggregator | Fine ring written after one ticker fire |
| `TestCacheCoarseRing` | aggregator | Coarse ring written after 5 ticker fires |
| `TestCacheQueryTopN` | aggregator | TopN returns correct merged rankings |
| `TestCacheQueryTopNWithFilter` | aggregator | TopN with website filter |
| `TestCacheQueryTrend` | aggregator | Trend per-bucket sums oldest-first |
| `TestCacheSubscribe` | aggregator | Subscriber receives snapshot on rotation |
| `TestGRPCEndToEnd` | aggregator | Two fake collectors; real gRPC TopN/Trend/Stream |
| `TestDegradedCollector` | aggregator | Bad address zeroed; good collector still visible |
---
## Deferred (not in v0)
- Per-source (busiest nginx) breakdown — requires adding `SOURCE` to the `GroupBy` proto enum
and encoding the source into the merged snapshot entries; deferred until the proto is stable
- `cmd/cli` — covered in PLAN_CLI.md
- `cmd/frontend` — covered in PLAN_FRONTEND.md
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint