# Collector v0 — Implementation Plan ✓ COMPLETE

Module path: `git.ipng.ch/ipng/nginx-logtail`

Scope: A working collector that tails files, aggregates into memory, and serves TopN,
Trend, and StreamSnapshots over gRPC. Full vertical slice, no optimisation passes yet.
## Step 1 — Repo scaffolding

- `go mod init git.ipng.ch/ipng/nginx-logtail`
- Add a `.gitignore`
- Install deps: `google.golang.org/grpc`, `google.golang.org/protobuf`, `github.com/fsnotify/fsnotify`
## Step 2 — Proto (`proto/logtail.proto`)
Write the full proto file as specified in README.md DESIGN § Protobuf API. Generate Go stubs with
protoc. Commit generated files. This defines the contract everything else builds on.
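The full message definitions live in DESIGN § Protobuf API; as a reminder of the contract, the service surface named in this plan looks roughly like the sketch below (message names are placeholders, not the actual definitions):

```proto
syntax = "proto3";
package logtail;

service LogtailService {
  // One-shot ranked query over a time window.
  rpc TopN(TopNRequest) returns (TopNResponse);
  // Per-bucket counts over a time window.
  rpc Trend(TrendRequest) returns (TrendResponse);
  // Server-side stream: one Snapshot per rotation.
  rpc StreamSnapshots(StreamRequest) returns (stream Snapshot);
}
```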
## Step 3 — Parser (`cmd/collector/parser.go`)

- `LogRecord` struct: `Website`, `ClientPrefix`, `URI`, `Status` (all `string`)
- `ParseLine(line string) (LogRecord, bool)` — split on tab, discard the query string at `?`, return `false` for lines with fewer than 8 fields
- `TruncateIP(addr string, v4bits, v6bits int) string` — handle IPv4 and IPv6
- Unit-tested with table-driven tests: normal line, short line, IPv6, query string stripping, /24 and /48 truncation
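A minimal sketch of the parser contract above, using only the standard library. The tab-field positions (0: website, 1: client address, 4: URI, 5: status) are assumptions for illustration; the real layout comes from the nginx `log_format`:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// LogRecord mirrors the fields named in the plan.
type LogRecord struct {
	Website, ClientPrefix, URI, Status string
}

// ParseLine splits on tab, rejects lines with fewer than 8 fields,
// and strips the query string from the URI.
func ParseLine(line string) (LogRecord, bool) {
	fields := strings.Split(line, "\t")
	if len(fields) < 8 {
		return LogRecord{}, false
	}
	uri, _, _ := strings.Cut(fields[4], "?") // discard everything after '?'
	return LogRecord{
		Website:      fields[0],
		ClientPrefix: fields[1], // caller truncates via TruncateIP
		URI:          uri,
		Status:       fields[5],
	}, true
}

// TruncateIP masks an address to /v4bits (IPv4) or /v6bits (IPv6).
func TruncateIP(addr string, v4bits, v6bits int) string {
	ip := net.ParseIP(addr)
	if ip == nil {
		return addr // unparseable: pass through unchanged
	}
	if v4 := ip.To4(); v4 != nil {
		return fmt.Sprintf("%s/%d", v4.Mask(net.CIDRMask(v4bits, 32)), v4bits)
	}
	return fmt.Sprintf("%s/%d", ip.Mask(net.CIDRMask(v6bits, 128)), v6bits)
}

func main() {
	fmt.Println(TruncateIP("192.0.2.55", 24, 48))       // 192.0.2.0/24
	fmt.Println(TruncateIP("2001:db8:cafe::1", 24, 48)) // 2001:db8:cafe::/48
	rec, ok := ParseLine("example.com\t192.0.2.55\t-\t-\t/index.html?q=1\t200\t-\t-")
	fmt.Println(ok, rec.URI) // true /index.html
}
```

Note that `net.IP.Mask` operates on whole bytes internally, so a /48 mask on `2001:db8:cafe::1` cleanly keeps the first three 16-bit groups, matching the /48 semantics discussed in the implementation notes.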
## Step 4 — Store (`cmd/collector/store.go`)

Implement in order, each piece testable independently:

- `Tuple4` and live map — `map[Tuple4]int64`, cap enforcement at 100 K, `Ingest(r LogRecord)`
- Fine ring buffer — `[60]Snapshot` circular array; `rotate()` heap-selects the top 50 K from the live map, appends to the ring, resets the live map
- Coarse ring buffer — `[288]Snapshot`, populated every 5 fine rotations by merging the last 5 fine snapshots into a top-5K snapshot
- `QueryTopN(filter, groupBy, n, window)` — RLock, sum the bucket range, group by dimension, apply the filter, heap-select the top N
- `QueryTrend(filter, window)` — per-bucket count sum, returns one point per bucket
- `Store.Run(ch <-chan LogRecord)` — single goroutine: read channel → `Ingest`, minute ticker → `rotate()`
- Snapshot broadcast — per-subscriber buffered channel fan-out; `Subscribe() <-chan Snapshot` / `Unsubscribe(ch)`
## Step 5 — Tailer (`cmd/collector/tailer.go`)

- `Tailer` struct: path, fsnotify watcher, output channel
- On start: open the file, seek to EOF, register the fsnotify watch
- On `fsnotify.Write`: a `bufio.Scanner` reads all new lines, sends each `LogRecord` to the channel
- On `fsnotify.Rename`/`Remove`: drain to EOF, close the fd, retry the open with 100 ms backoff (up to 5 s), resume from position 0 — no lines lost between drain and reopen
- `Tailer.Run(ctx context.Context)` — blocks until the context is cancelled
## Step 6 — gRPC server (`cmd/collector/server.go`)

- `Server` wraps `*Store`, implements `LogtailServiceServer`
- `TopN`: `store.QueryTopN` → marshal to proto response
- `Trend`: `store.QueryTrend` → marshal to proto response
- `StreamSnapshots`: `store.Subscribe()`, loop sending snapshots until the client disconnects or the context is done, then `store.Unsubscribe(ch)`
## Step 7 — Main (`cmd/collector/main.go`)

Flags:

- `--listen` — default `:9090`
- `--logs` — comma-separated log file paths
- `--source` — name for this collector instance (default: hostname)
- `--v4prefix` — default `24`
- `--v6prefix` — default `48`

Wire-up: create channel → start the `store.Run` goroutine → start one Tailer goroutine per log
path → start the gRPC server → `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM.
## Step 8 — Smoke test

- Generate fake log lines at 10 K/s (small Go script or shell one-liner)
- Run the collector against them
- Use `grpcurl` to call `TopN` and verify the results
- Check `runtime.MemStats` to confirm memory stays well under 1 GB
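A shell one-liner for the fake-line generator could look like the sketch below. The tab-separated field layout is an assumption for illustration (adjust to the real nginx `log_format`); to approximate 10 K lines/s, run a batch like this in a loop with a short sleep:

```shell
# Generate N fake tab-separated access-log lines into a file.
gen_lines() {
  n=$1
  i=1
  while [ "$i" -le "$n" ]; do
    printf 'example.com\t192.0.2.%d\t-\t-\t/page/%d?x=1\t200\t-\t-\n' \
      $((i % 256)) $((i % 100))
    i=$((i + 1))
  done
}

gen_lines 1000 > /tmp/fake-access.log
wc -l < /tmp/fake-access.log
```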
## Deferred (not in v0)

- `cmd/cli`, `cmd/aggregator`, `cmd/frontend`
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint
## Implementation notes

### Deviation from plan: MultiTailer
Step 5 planned one `Tailer` struct per file. During implementation this was changed to a single
`MultiTailer` with one shared `fsnotify.Watcher`. Reason: one watcher per file creates one inotify
instance per file; the kernel default limit is 128 instances per user, which would be hit with
hundreds of log files. The `MultiTailer` uses a single instance and routes events by path via a
`map[string]*fileState`.
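The routing idea can be sketched as below. To stay self-contained this uses a stand-in `event` type instead of `fsnotify.Event`, and the `fileState` fields are illustrative, not the project's actual struct:

```go
package main

import "fmt"

// fileState is per-file tail state; offset stands in for the read position.
type fileState struct {
	offset int64
	writes int
}

// event is a stand-in for fsnotify.Event (name = path, op = kind).
type event struct {
	name string
	op   string
}

// MultiTailer routes events from one shared watcher to per-file state,
// so only a single inotify instance is consumed however many files we tail.
type MultiTailer struct {
	files map[string]*fileState
}

func (m *MultiTailer) handle(ev event) {
	st, ok := m.files[ev.name]
	if !ok {
		return // event for a path we do not tail
	}
	switch ev.op {
	case "WRITE":
		st.writes++ // real code: scan new bytes from st.offset
	case "RENAME", "REMOVE":
		st.offset = 0 // real code: drain, close, reopen with backoff
	}
}

func main() {
	m := &MultiTailer{files: map[string]*fileState{
		"/var/log/nginx/a.log": {},
		"/var/log/nginx/b.log": {},
	}}
	m.handle(event{"/var/log/nginx/a.log", "WRITE"})
	m.handle(event{"/var/log/nginx/a.log", "WRITE"})
	m.handle(event{"/var/log/nginx/b.log", "RENAME"})
	fmt.Println(m.files["/var/log/nginx/a.log"].writes) // 2
}
```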
### Deviation from plan: IPv6 /48 semantics

The design doc said "truncate to /48". A /48 keeps the first three full 16-bit groups intact
(e.g. `2001:db8:cafe::1` → `2001:db8:cafe::/48`). An early test expected `2001:db8:ca00::/48`
(truncating mid-group), which was wrong. The code is correct; the test was fixed.
## Test results

Run with: `go test ./cmd/collector/ -v -count=1 -timeout 120s`
| Test | What it covers |
|---|---|
| `TestParseLine` (7 cases) | Tab parsing, query string stripping, bad lines |
| `TestTruncateIP` | IPv4 /24 and IPv6 /48 masking |
| `TestIngestAndRotate` | Live map → fine ring rotation |
| `TestLiveMapCap` | Hard cap at 100 K entries, no panic beyond cap |
| `TestQueryTopN` | Ranked results from ring buffer |
| `TestQueryTopNWithFilter` | Filter by HTTP status code |
| `TestQueryTrend` | Per-bucket counts, oldest-first ordering |
| `TestCoarseRingPopulated` | 5 fine ticks → 1 coarse bucket, count aggregation |
| `TestSubscribeBroadcast` | Fan-out channel delivery after rotation |
| `TestTopKOrdering` | Heap select returns correct top-K descending |
| `TestMultiTailerReadsLines` | Live file write → `LogRecord` received on channel |
| `TestMultiTailerMultipleFiles` | 5 files, one watcher, all lines received |
| `TestMultiTailerLogRotation` | RENAME → drain → retry → new file tailed correctly |
| `TestExpandGlobs` | Glob pattern expands to matching files only |
| `TestExpandGlobsDeduplication` | Same file via path + glob deduplicated to one |
| `TestMemoryBudget` | Full ring fill stays within 1 GB heap |
| `TestGRPCEndToEnd` | Real gRPC server: TopN, filtered TopN, Trend, StreamSnapshots |
Total: 17 tests, all passing.
## Benchmark results

Run with: `go test ./cmd/collector/ -bench=. -benchtime=3s`
Hardware: 12th Gen Intel Core i7-12700T

| Benchmark | ns/op | Throughput | Headroom vs 10 K/s |
|---|---|---|---|
| `BenchmarkParseLine` | 418 | ~2.4 M lines/s | 240× |
| `BenchmarkIngest` | 152 | ~6.5 M records/s | 650× |
Both the parser and the store ingestion goroutine have several hundred times more capacity than the 10 000 lines/second peak requirement. The bottleneck at scale will be fsnotify event delivery and kernel I/O, not the Go code.