From 4393ae272681098452644c15632f86f29cb0f60f Mon Sep 17 00:00:00 2001
From: Pim van Pelt
Date: Sat, 14 Mar 2026 20:07:05 +0100
Subject: [PATCH] Initial commit with a spec + plan

---
 .gitignore |  10 ++
 README.md  | 371 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 381 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6b672b3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,10 @@
+# Binaries
+/collector
+/aggregator
+/frontend
+/cli
+
+# Editor
+.idea/
+.vscode/
+*.swp
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a18f4c8
--- /dev/null
+++ b/README.md
@@ -0,0 +1,371 @@
+SPECIFICATION
+
+This project contains three programs:
+
+1) A collector that can tail any number of nginx logfiles and keeps an in-memory data structure of
+{website, client_prefix, http_request_uri, http_response} across all logfiles. It is queryable and
+can give top-N clients by website and by http_request_uri; in other words, I can see 'who is
+causing the most HTTP 429s' or 'what is the busiest website'. This program pre-aggregates the logs
+into a queryable structure. It runs on any number (10 or so) of nginx machines in a cluster. There
+is no UI here, only a gRPC interface.
+
+2) An aggregator that queries the collectors and shows global stats and trending information. It
+aggregates information from all collectors to answer 'what is the busiest nginx' in addition to
+'what is the busiest website' or 'which client_prefix or http_request_uri is causing the most
+HTTP 503s'. It runs on a central machine and can show trending information, useful for DDoS
+detection. The aggregator is a gRPC client of the collectors and itself presents a gRPC interface.
+
+3) An HTTP companion frontend to the aggregator that can query either collector or aggregator and
+answer user queries in a drilldown fashion, e.g. 'restrict to http_response=429', then 'restrict to
+website=www.example.com', and so on. This is an interactive rollup UI that helps operators see
+which websites are performing well and which are performing poorly (e.g. excessive requests,
+excessive HTTP response errors, DDoS).
+
+Programs are written in Go with a modern, responsive interactive interface.
+
+---
+
+DESIGN
+
+## Directory Layout
+
+```
+nginx-logtail/
+├── proto/
+│   └── logtail.proto          # shared protobuf definitions
+└── cmd/
+    ├── collector/
+    │   ├── main.go
+    │   ├── tailer.go          # tail multiple log files via fsnotify, handle logrotate
+    │   ├── parser.go          # tab-separated logtail log_format parser
+    │   ├── store.go           # bounded top-K in-memory store + tiered ring buffers
+    │   └── server.go          # gRPC server with server-streaming StreamSnapshots
+    ├── aggregator/
+    │   ├── main.go
+    │   ├── subscriber.go      # opens streaming RPC to each collector, merges into cache
+    │   ├── merger.go          # merge/sum TopN entries across sources
+    │   ├── cache.go           # merged snapshot + tiered ring buffer served to frontend
+    │   └── server.go          # gRPC server (same surface as collector)
+    ├── frontend/
+    │   ├── main.go
+    │   ├── handler.go         # HTTP handlers, filter state in URL query string
+    │   ├── client.go          # gRPC client to aggregator (or collector)
+    │   └── templates/         # server-rendered HTML + inline SVG sparklines
+    └── cli/
+        └── main.go            # topn / trend / stream subcommands, JSON output
+```
+
+## Data Model
+
+The core unit is a **count keyed by four dimensions**:
+
+| Field              | Description                                      | Example           |
+|--------------------|--------------------------------------------------|-------------------|
+| `website`          | nginx `$host`                                    | `www.example.com` |
+| `client_prefix`    | client IP truncated to /24 IPv4 or /48 IPv6      | `1.2.3.0/24`      |
+| `http_request_uri` | `$request_uri` path only — query string stripped | `/api/v1/search`  |
+| `http_response`    | HTTP status code                                 | `429`             |
+
+## Time Windows & Tiered Ring Buffers
+
+Two ring buffers at different resolutions cover all query windows up to 24 hours:
+
+| Tier   | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger    |
+|--------|-------------|---------|--------------|--------|--------------------|
+| Fine   | 1 min       | 60      | 50 000       | 1 h    | every minute       |
+| Coarse | 5 min       | 288     | 5 000        | 24 h   | every 5 fine ticks |
+
+Supported query windows and which tier they read from:
+
+| Window | Tier   | Buckets summed |
+|--------|--------|----------------|
+| 1 min  | fine   | last 1         |
+| 5 min  | fine   | last 5         |
+| 15 min | fine   | last 15        |
+| 60 min | fine   | all 60         |
+| 6 h    | coarse | last 72        |
+| 24 h   | coarse | all 288        |
+
+Every minute: snapshot live map → top-50K → append to fine ring, reset live map.
+Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.
+
+## Memory Budget (Collector, target ≤ 1 GB)
+
+Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 8 B count + ~80 B Go map
+overhead ≈ **~186 bytes per entry**.
+
+| Structure                 | Entries      | Size        |
+|---------------------------|--------------|-------------|
+| Live map (capped)         | 100 000      | ~19 MB      |
+| Fine ring (60 × 1-min)    | 60 × 50 000  | ~558 MB     |
+| Coarse ring (288 × 5-min) | 288 × 5 000  | ~268 MB     |
+| **Total**                 |              | **~845 MB** |
+
+The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are
+accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded
+regardless of attack cardinality.
+
+## Future Work — ClickHouse Export (post-MVP)
+
+> **Do not implement until the end-to-end MVP is running.**
+
+The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d
+historical views.
+Schema sketch:
+
+```sql
+CREATE TABLE logtail (
+    ts            DateTime,
+    website       LowCardinality(String),
+    client_prefix String,
+    request_uri   LowCardinality(String),
+    status        UInt16,
+    count         UInt64
+) ENGINE = SummingMergeTree(count)
+PARTITION BY toYYYYMMDD(ts)
+ORDER BY (ts, website, status, client_prefix, request_uri);
+```
+
+The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use
+the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive
+and does not change any existing interface.
+
+## Protobuf API (`proto/logtail.proto`)
+
+```protobuf
+message Filter {
+  optional string website = 1;
+  optional string client_prefix = 2;
+  optional string http_request_uri = 3;
+  optional int32 http_response = 4;
+}
+
+enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; }
+enum Window { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }
+
+message TopNRequest { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
+message TopNEntry { string label = 1; int64 count = 2; }
+message TopNResponse { repeated TopNEntry entries = 1; string source = 2; }
+
+// Trend: one total count per minute bucket, for sparklines
+message TrendRequest { Filter filter = 1; Window window = 2; }
+message TrendPoint { int64 timestamp_unix = 1; int64 count = 2; }
+message TrendResponse { repeated TrendPoint points = 1; }
+
+// Streaming: collector pushes a snapshot after every minute rotation
+message SnapshotRequest {}
+message Snapshot {
+  string source = 1;
+  int64 timestamp = 2;
+  repeated TopNEntry entries = 3; // full top-50K for this bucket
+}
+
+service LogtailService {
+  rpc TopN(TopNRequest) returns (TopNResponse);
+  rpc Trend(TrendRequest) returns (TrendResponse);
+  rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
+}
+// Both collector and aggregator implement LogtailService.
+// Aggregator's StreamSnapshots fans out to all collectors and merges.
+```
+
+## Program 1 — Collector
+
+### tailer.go
+
+- One goroutine per log file. Opens the file, seeks to EOF.
+- Uses **fsnotify** (inotify on Linux) to detect writes. On `WRITE` event: read all new lines.
+- On `RENAME`/`REMOVE` event (logrotate): drain to EOF of the old fd, then **re-open** the original
+  path (with retry backoff) and resume from position 0. No lines are lost between drain and reopen.
+- Emits `LogRecord` structs on a shared buffered channel (size 200 K — absorbs ~20 s of peak load).
+
+### parser.go
+
+- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting:
+
+  ```nginx
+  log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time';
+  ```
+
+  Example line:
+  ```
+  www.example.com 1.2.3.4 1741954800.123 GET /api/v1/search 200 1452 0.043
+  ```
+
+  Field positions (0-indexed):
+
+  | # | Field              | Used for         |
+  |---|--------------------|------------------|
+  | 0 | `$host`            | website          |
+  | 1 | `$remote_addr`     | client_prefix    |
+  | 2 | `$msec`            | (discarded)      |
+  | 3 | `$request_method`  | (discarded)      |
+  | 4 | `$request_uri`     | http_request_uri |
+  | 5 | `$status`          | http_response    |
+  | 6 | `$body_bytes_sent` | (discarded)      |
+  | 7 | `$request_time`    | (discarded)      |
+
+- At runtime: `strings.SplitN(line, "\t", 8)` — single call, ~50 ns/line. No regex, no state machine.
+- `$request_uri`: query string discarded at the first `?`.
+- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable.
+- Lines with fewer than 8 fields are silently skipped (malformed / truncated write).
+
+### store.go
+
+- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on
+  the hot path. At 10 K lines/s the goroutine uses <1% CPU.
+- Live map: `map[Tuple4]int64`, hard-capped at 100 K entries (new keys dropped when full).
+- **Minute ticker**: a goroutine heap-selects the top-50K entries from the live map, writes the
+  snapshot into the fine ring buffer slot, clears the live map, and advances the fine ring head.
+- Every 5 fine ticks: merge the last 5 fine snapshots → heap-select top-5K → write to coarse ring.
+- Fine ring: `[60]Snapshot` circular array. Coarse ring: `[288]Snapshot` circular array.
+  Each Snapshot is `[]TopNEntry` sorted desc by count (already sorted, merge is a heap pass).
+- **TopN query path**: RLock the relevant ring, sum the bucket range, group by dimension, apply
+  the filter, heap-select top N. Worst case: 288 × 5 K = 1.4 M iterations — completes in <20 ms.
+- **Trend query path**: for each bucket in range, sum the counts of entries matching the filter,
+  emit one `TrendPoint`. O(buckets × K), but the result is tiny (max 288 points).
+
+### server.go
+
+- gRPC server on a configurable port (default :9090).
+- `TopN` and `Trend`: read-only calls into the store, answered directly.
+- `StreamSnapshots`: on each minute rotation the store signals a broadcast channel; the streaming
+  handler wakes, reads the latest snapshot from the ring, and sends it to all connected aggregators.
+  Uses `sync.Cond` or a fan-out via per-subscriber buffered channels.
+
+## Program 2 — Aggregator
+
+### subscriber.go
+
+- On startup: dials each collector, calls `StreamSnapshots`, receives `Snapshot` messages.
+- Each incoming snapshot is handed to **merger.go**. Reconnects with exponential backoff on
+  stream error. Marks a collector as degraded after 3 failed reconnects; clears on success.
+
+### merger.go
+
+- Maintains one `map[Tuple4]int64` per collector (latest snapshot only — no ring buffer here,
+  the aggregator's cache serves that role).
+- On each new snapshot from a collector: replace that collector's map, then rebuild the merged
+  view by summing across all collector maps. Store the merged result into cache.go's ring buffer.
+
+### cache.go
+
+- Same ring-buffer structure as the collector store (60 slots), populated by the merger.
+- `TopN` and `Trend` queries are answered from this cache — no live fan-out needed at query time,
+  satisfying the 250 ms SLA with headroom.
+- Also tracks per-collector entry counts for "busiest nginx" queries (answered by treating
+  `source` as an additional group-by dimension).
+
+### server.go
+
+- Implements the same `LogtailService` proto as the collector.
+- `StreamSnapshots` on the aggregator re-streams merged snapshots to any downstream consumer
+  (e.g. a second-tier aggregator, or monitoring).
+
+## Program 3 — Frontend
+
+### handler.go
+
+- Filter state lives entirely in the **URL query string** (no server-side session needed; multiple
+  operators see independent views without shared state). Parameters: `w` (window), `by` (group_by),
+  `f_website`, `f_prefix`, `f_uri`, `f_status`.
+- Main page: renders a ranked table. Clicking a row appends that dimension to the URL filter and
+  redirects. A breadcrumb shows active filters; each token is a link that removes it.
+- **Auto-refresh**: via a `<meta http-equiv="refresh">` tag — simple, reliable, no JS required.
+- A `?raw=1` flag returns JSON for scripting/curl use.
+
+### templates/
+
+- Base layout with filter breadcrumb and window selector tabs (1m / 5m / 15m / 60m / 6h / 24h).
+- Table partial: columns are label, count, % of total, bar (inline `<svg>`).
+- Sparkline partial: inline SVG polyline built from `TrendResponse.points` — 60 points, scaled to
+  the bucket's max, rendered server-side. No JS, no external assets.
+
+## Program 4 — CLI
+
+A single binary (`cmd/cli/main.go`) for shell-based debugging and programmatic top-K queries.
+It talks to any collector or aggregator via gRPC. All output is JSON.
+
+### Subcommands
+
+```
+cli topn   --target HOST:PORT [filter flags] [--by DIM] [--window W] [--n N] [--pretty]
+cli trend  --target HOST:PORT [filter flags] [--window W] [--pretty]
+cli stream --target HOST:PORT [--pretty]
+```
+
+### Flags
+
+| Flag        | Default          | Description                                              |
+|-------------|------------------|----------------------------------------------------------|
+| `--target`  | `localhost:9090` | gRPC address of collector or aggregator                  |
+| `--by`      | `website`        | Group-by dimension: `website`, `prefix`, `uri`, `status` |
+| `--window`  | `5m`             | Time window: `1m` `5m` `15m` `60m` `6h` `24h`            |
+| `--n`       | `10`             | Number of top entries to return                          |
+| `--website` | —                | Filter: restrict to this website                         |
+| `--prefix`  | —                | Filter: restrict to this client prefix                   |
+| `--uri`     | —                | Filter: restrict to this request URI                     |
+| `--status`  | —                | Filter: restrict to this HTTP status code                |
+| `--pretty`  | false            | Indent JSON output                                       |
+
+### Output format
+
+**`topn`** — single JSON object, exits after one response:
+```json
+{
+  "target": "agg:9091", "window": "5m", "group_by": "prefix",
+  "filter": {"status": 429, "website": "www.example.com"},
+  "queried_at": "2026-03-14T12:00:00Z",
+  "entries": [
+    {"rank": 1, "label": "1.2.3.0/24", "count": 8471},
+    {"rank": 2, "label": "5.6.7.0/24", "count": 3201}
+  ]
+}
+```
+
+**`trend`** — single JSON object, exits after one response:
+```json
+{
+  "target": "agg:9091", "window": "24h", "filter": {"status": 503},
+  "queried_at": "2026-03-14T12:00:00Z",
+  "points": [
+    {"time": "2026-03-14T11:00:00Z", "count": 45},
+    {"time": "2026-03-14T11:05:00Z", "count": 120}
+  ]
+}
+```
+
+**`stream`** — NDJSON (one JSON object per line, unbounded), suitable for `| jq -c 'select(...)'`:
+```json
+{"source": "nginx3:9090", "bucket_time": "2026-03-14T12:01:00Z", "entry_count": 42318, "top5": [{"label": "www.example.com", "count": 18000}, ...]}
+```
+
+### Example usage
+
+```bash
+# Who is hammering us with 429s right now?
+cli topn --target agg:9091 --window 1m --by prefix --status 429 --n 20 | jq '.entries[]'
+
+# Which website has the most 503s over the last 24h?
+cli topn --target agg:9091 --window 24h --by website --status 503
+
+# Trend of all traffic to one site over 6h (for a quick graph)
+cli trend --target agg:9091 --window 6h --website api.example.com | jq '.points[] | [.time, .count]'
+
+# Watch live snapshots from one collector, filter for high-volume buckets
+cli stream --target nginx3:9090 | jq -c 'select(.entry_count > 10000)'
+```
+
+### Implementation notes
+
+- Single `main.go` using the standard `flag` package with manual subcommand dispatch —
+  no external CLI framework needed for three subcommands.
+- Shares no code with the other binaries; duplicates the gRPC client setup locally (it's three
+  lines). Avoids creating a shared internal package for something this small.
+- Non-zero exit code on any gRPC error so it composes cleanly in shell scripts.
+
+## Key Design Decisions
+
+| Decision | Rationale |
+|----------|-----------|
+| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path |
+| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion |
+| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass |
+| Push-based streaming (collector → aggregator) | Aggregator cache is always fresh; query latency is cache-read only |
+| Same `LogtailService` for collector and aggregator | Frontend works with either; useful for single-box and debugging |
+| Filter state in URL, not session cookie | Supports multiple concurrent operators; shareable/bookmarkable URLs |
+| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack |
+| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) |
+| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later |
+| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl |
+| CLI outputs JSON, NDJSON for streaming | Composable with jq; non-zero exit on error for shell scripts |
+| CLI uses stdlib `flag`, no framework | Three subcommands don't justify a dependency; single file |
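
The collector's central invariant — a hard-capped live map plus a rotation that extracts the top-K — can be illustrated with a minimal sketch. All names here (`Tuple4`, `Store`, `Add`, `Rotate`) are invented for the illustration, not taken from the repo, and where store.go specifies a heap-select this sketch uses a full sort for brevity:

```go
package main

import "sort"

// Tuple4 is the four-dimension key from the data model.
type Tuple4 struct {
	Website, ClientPrefix, RequestURI string
	Status                            int
}

// Store sketches the live map: bounded inserts plus a minute rotation.
type Store struct {
	live    map[Tuple4]int64
	maxLive int // hard cap, e.g. 100 000 in the design
}

func NewStore(maxLive int) *Store {
	return &Store{live: make(map[Tuple4]int64), maxLive: maxLive}
}

// Add counts one parsed log line. Once the map is full, only existing
// keys may still grow; new keys are dropped, bounding memory regardless
// of attack cardinality.
func (s *Store) Add(k Tuple4) {
	if _, ok := s.live[k]; !ok && len(s.live) >= s.maxLive {
		return // cap reached: drop the new key
	}
	s.live[k]++
}

// Entry is one (key, count) pair in a snapshot.
type Entry struct {
	Key   Tuple4
	Count int64
}

// Rotate snapshots the top-k entries sorted descending by count and
// resets the live map, as the minute ticker does before appending the
// snapshot to the fine ring.
func (s *Store) Rotate(k int) []Entry {
	entries := make([]Entry, 0, len(s.live))
	for key, c := range s.live {
		entries = append(entries, Entry{key, c})
	}
	sort.Slice(entries, func(i, j int) bool { return entries[i].Count > entries[j].Count })
	if len(entries) > k {
		entries = entries[:k]
	}
	s.live = make(map[Tuple4]int64) // reset for the next minute
	return entries
}
```

Because `Add` and `Rotate` are only ever called from the single aggregator goroutine and the minute ticker hand-off, the sketch needs no locks on the hot path, matching the first row of the design-decisions table.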