From 862d043376cba535c66c27d75f56e583c85df8e7 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Fri, 27 Mar 2026 01:20:19 +0100 Subject: [PATCH] Refactor docs --- README.md | 529 +----------------- docs/DETAILS.md | 528 +++++++++++++++++ PLAN_AGGREGATOR.md => docs/PLAN_AGGREGATOR.md | 0 PLAN_CLI.md => docs/PLAN_CLI.md | 0 PLAN_COLLECTOR.md => docs/PLAN_COLLECTOR.md | 0 PLAN_FRONTEND.md => docs/PLAN_FRONTEND.md | 0 6 files changed, 544 insertions(+), 513 deletions(-) create mode 100644 docs/DETAILS.md rename PLAN_AGGREGATOR.md => docs/PLAN_AGGREGATOR.md (100%) rename PLAN_CLI.md => docs/PLAN_CLI.md (100%) rename PLAN_COLLECTOR.md => docs/PLAN_COLLECTOR.md (100%) rename PLAN_FRONTEND.md => docs/PLAN_FRONTEND.md (100%) diff --git a/README.md b/README.md index 11ff58e..c94bdfa 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -PREAMBLE +## PREAMBLE Although this computer program has a permissive license (AP2.0), if you came here looking to ask questions, you're better off just moving on :) This program is shared AS-IS and really without any @@ -8,521 +8,24 @@ Code. You have been warned :) -SPECIFICATION - -This project contains four programs: - -1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of -`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files. -It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via -server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host -request counters and response-body/request-time histograms. -Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only. - -2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data -into a unified in-memory cache, and exposes the same gRPC interface. 
Answers questions like "what
-is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows
-trending information useful for DDoS detection. Runs on a central machine.
-
-3) An **HTTP frontend** companion to the aggregator that renders a drilldown dashboard. Operators
-can restrict by `http_response=429`, then by `website=www.example.com`, and so on. Works with
-either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline
-SVG sparklines and meta-refresh.
-
-4) A **CLI** for shell-based debugging. Sends `topn`, `trend`, and `stream` queries to any
-collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable
-tables or newline-delimited JSON.
-
-Programs are written in Go. No CGO, no external runtime dependencies.
-
----
-
 ![nginx-logtail frontend](docs/frontend.png)
 
----
+## What is this?
 
-DEPLOYMENT
+This project consists of four components:
 
+1. A log collector that tails NGINX (or Apache) logs of a certain format, and aggregates
+   information per website, client address, status, and so on. It buckets these into windows
+   of 1min, 5min, 15min, 60min, 6hrs and 24hrs. It exposes this on a gRPC endpoint.
+1. An aggregator that can scrape any number of collectors into a merged regional (or global)
+   view. The aggregator exposes the same gRPC endpoint as the collectors.
+1. A Frontend that lets operators query this data structure very quickly.
+1. A CLI that can also query this data, returning JSON for further processing.
 
-## Docker
+It's written in Go; the intended deployment runs collectors on any number of webservers, with
+aggregation and frontend logic centralized. It's released under the [[APACHE](LICENSE)] license.
+It can be run either as `systemd` units, in Docker, or in any combination of the two.
 
-All four binaries are published in a single image: `git.ipng.ch/ipng/nginx-logtail`.
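As a sketch of the two-stage pattern used for such an all-binaries image (stage layout, output paths, and the `./cmd/...` package glob are assumptions for illustration, not the repo's actual Dockerfile):

```Dockerfile
# Builder stage: statically-linked, stripped binaries.
FROM golang:1.24-alpine AS build
WORKDIR /src
COPY . .
RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/ ./cmd/...

# Final stage: scratch, so no OS, no shell, no runtime dependencies.
FROM scratch
COPY --from=build /out/ /
# Each binary is selected explicitly via the container command, e.g.
# command: ["/aggregator", "-collectors", "nginx1:9090"]
```

With a layout like this, one image carries all four binaries and the container `command` decides which one runs.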
+See [[User Guide](docs/USERGUIDE.md)] or [[DETAILS](docs/DETAILS.md)] for more information. -The image is built with a two-stage Dockerfile: a `golang:1.24-alpine` builder produces -statically-linked, stripped binaries (`CGO_ENABLED=0`, `-trimpath -ldflags="-s -w"`); the final -stage is `scratch` — no OS, no shell, no runtime dependencies. Each binary is invoked explicitly -via the container `command`. - -### Build and push - -``` -docker compose build --push -``` - -### Running aggregator + frontend - -The `docker-compose.yml` in the repo root runs the aggregator and frontend together. At minimum, -set `AGGREGATOR_COLLECTORS` to the comma-separated `host:port` list of your collector(s): - -```sh -AGGREGATOR_COLLECTORS=nginx1:9090,nginx2:9090 docker compose up -d -``` - -The frontend reaches the aggregator at `aggregator:9091` via Docker's internal DNS. The frontend -UI is available on port `8080`. - -### Environment variables - -All flags have environment variable equivalents. CLI flags take precedence over env vars. 
- -**collector** (runs on each nginx host, not in Docker): - -| Env var | Flag | Default | -|--------------------------|-------------------|-------------| -| `COLLECTOR_LISTEN` | `-listen` | `:9090` | -| `COLLECTOR_PROM_LISTEN` | `-prom-listen` | `:9100` | -| `COLLECTOR_LOGS` | `-logs` | — | -| `COLLECTOR_LOGS_FILE` | `-logs-file` | — | -| `COLLECTOR_SOURCE` | `-source` | hostname | -| `COLLECTOR_V4PREFIX` | `-v4prefix` | `24` | -| `COLLECTOR_V6PREFIX` | `-v6prefix` | `48` | -| `COLLECTOR_SCAN_INTERVAL`| `-scan-interval` | `10s` | - -**aggregator**: - -| Env var | Flag | Default | -|--------------------------|---------------|-------------| -| `AGGREGATOR_LISTEN` | `-listen` | `:9091` | -| `AGGREGATOR_COLLECTORS` | `-collectors` | — (required)| -| `AGGREGATOR_SOURCE` | `-source` | hostname | - -**frontend**: - -| Env var | Flag | Default | -|------------------|------------|-------------------| -| `FRONTEND_LISTEN`| `-listen` | `:8080` | -| `FRONTEND_TARGET`| `-target` | `localhost:9091` | -| `FRONTEND_N` | `-n` | `25` | -| `FRONTEND_REFRESH`| `-refresh`| `30` | - ---- - -DESIGN - -## Directory Layout - -``` -nginx-logtail/ -├── proto/ -│ ├── logtail.proto # shared protobuf definitions -│ └── logtailpb/ -│ ├── logtail.pb.go # generated: messages, enums -│ └── logtail_grpc.pb.go # generated: service stubs -├── internal/ -│ └── store/ -│ └── store.go # shared types: Tuple6, Entry, Snapshot, ring helpers -└── cmd/ - ├── collector/ - │ ├── main.go - │ ├── tailer.go # MultiTailer: tail N files via one shared fsnotify watcher - │ ├── parser.go # tab-separated logtail log_format parser (~50 ns/line) - │ ├── store.go # bounded top-K in-memory store + tiered ring buffers - │ └── server.go # gRPC server: TopN, Trend, StreamSnapshots - ├── aggregator/ - │ ├── main.go - │ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff - │ ├── merger.go # delta-merge: O(snapshot_size) per update - │ ├── cache.go # tick-based ring buffer cache served to clients - │ 
├── registry.go # TargetRegistry: addr→name map updated from snapshot sources - │ └── server.go # gRPC server (same surface as collector) - ├── frontend/ - │ ├── main.go - │ ├── handler.go # URL param parsing, concurrent TopN+Trend, template exec - │ ├── filter.go # ParseFilterExpr / FilterExprString mini filter language - │ ├── client.go # gRPC dial helper - │ ├── sparkline.go # TrendPoints → inline SVG polyline - │ ├── format.go # fmtCount (space thousands separator) - │ └── templates/ - │ ├── base.html # outer HTML shell, inline CSS, meta-refresh - │ └── index.html # window tabs, group-by tabs, breadcrumb, table, footer - └── cli/ - ├── main.go # subcommand dispatch and usage - ├── flags.go # shared flags, parseTargets, buildFilter, parseWindow - ├── client.go # gRPC dial helper - ├── format.go # printTable, fmtCount, fmtTime, targetHeader - ├── cmd_topn.go # topn: concurrent fan-out, table + JSON output - ├── cmd_trend.go # trend: concurrent fan-out, table + JSON output - ├── cmd_stream.go # stream: multiplexed streams, auto-reconnect - └── cmd_targets.go # targets: list collectors known to the endpoint -``` - -## Data Model - -The core unit is a **count keyed by six dimensions**: - -| Field | Description | Example | -|-------------------|------------------------------------------------------|-------------------| -| `website` | nginx `$host` | `www.example.com` | -| `client_prefix` | client IP truncated to /24 IPv4 or /48 IPv6 | `1.2.3.0/24` | -| `http_request_uri`| `$request_uri` path only — query string stripped | `/api/v1/search` | -| `http_response` | HTTP status code | `429` | -| `is_tor` | whether the client IP is a TOR exit node | `1` | -| `asn` | client AS number (MaxMind GeoIP2, 32-bit int) | `8298` | - -## Time Windows & Tiered Ring Buffers - -Two ring buffers at different resolutions cover all query windows up to 24 hours: - -| Tier | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger | 
-|--------|-------------|---------|--------------|--------|---------------------| -| Fine | 1 min | 60 | 50 000 | 1 h | every minute | -| Coarse | 5 min | 288 | 5 000 | 24 h | every 5 fine ticks | - -Supported query windows and which tier they read from: - -| Window | Tier | Buckets summed | -|--------|--------|----------------| -| 1 min | fine | last 1 | -| 5 min | fine | last 5 | -| 15 min | fine | last 15 | -| 60 min | fine | all 60 | -| 6 h | coarse | last 72 | -| 24 h | coarse | all 288 | - -Every minute: snapshot live map → top-50K → append to fine ring, reset live map. -Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring. - -## Memory Budget (Collector, target ≤ 1 GB) - -Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 1 B is_tor + 4 B asn + 8 B count + ~80 B Go map -overhead ≈ **~191 bytes per entry**. - -| Structure | Entries | Size | -|-------------------------|-------------|-------------| -| Live map (capped) | 100 000 | ~19 MB | -| Fine ring (60 × 1-min) | 60 × 50 000 | ~558 MB | -| Coarse ring (288 × 5-min)| 288 × 5 000| ~268 MB | -| **Total** | | **~845 MB** | - -The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are -accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded -regardless of attack cardinality. - -## Future Work — ClickHouse Export (post-MVP) - -> **Do not implement until the end-to-end MVP is running.** - -The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d -historical views. 
Schema sketch: - -```sql -CREATE TABLE logtail ( - ts DateTime, - website LowCardinality(String), - client_prefix String, - request_uri LowCardinality(String), - status UInt16, - count UInt64 -) ENGINE = SummingMergeTree(count) -PARTITION BY toYYYYMMDD(ts) -ORDER BY (ts, website, status, client_prefix, request_uri); -``` - -The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use -the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive -and does not change any existing interface. - -## Protobuf API (`proto/logtail.proto`) - -```protobuf -enum TorFilter { TOR_ANY = 0; TOR_YES = 1; TOR_NO = 2; } -enum StatusOp { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; } - -message Filter { - optional string website = 1; - optional string client_prefix = 2; - optional string http_request_uri = 3; - optional int32 http_response = 4; - StatusOp status_op = 5; // comparison operator for http_response - optional string website_regex = 6; // RE2 regex against website - optional string uri_regex = 7; // RE2 regex against http_request_uri - TorFilter tor = 8; // TOR_ANY (default) / TOR_YES / TOR_NO - optional int32 asn_number = 9; // filter by client ASN - StatusOp asn_op = 10; // comparison operator for asn_number -} - -enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; ASN_NUMBER = 4; } -enum Window { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; } - -message TopNRequest { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; } -message TopNEntry { string label = 1; int64 count = 2; } -message TopNResponse { repeated TopNEntry entries = 1; string source = 2; } - -// Trend: one total count per minute (or 5-min) bucket, for sparklines -message TrendRequest { Filter filter = 1; Window window = 4; } -message TrendPoint { int64 timestamp_unix = 1; int64 count = 2; } -message TrendResponse { repeated TrendPoint points = 1; string source = 2; } - 
-// Streaming: collector pushes a fine snapshot after every minute rotation -message SnapshotRequest {} -message Snapshot { - string source = 1; - int64 timestamp = 2; - repeated TopNEntry entries = 3; // full top-50K for this bucket - bool is_coarse = 4; // true for 5-min coarse buckets (DumpSnapshots only) -} - -// Target discovery: list the collectors behind the queried endpoint -message ListTargetsRequest {} -message TargetInfo { - string name = 1; // display name (--source value from the collector) - string addr = 2; // gRPC address; empty string means "this endpoint itself" -} -message ListTargetsResponse { repeated TargetInfo targets = 1; } - -// Backfill: dump full ring buffer contents for aggregator restart recovery -message DumpSnapshotsRequest {} -// Response reuses Snapshot; is_coarse distinguishes fine (1-min) from coarse (5-min) buckets. -// Stream closes after all historical data is sent (unlike StreamSnapshots which stays open). - -service LogtailService { - rpc TopN(TopNRequest) returns (TopNResponse); - rpc Trend(TrendRequest) returns (TrendResponse); - rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot); - rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse); - rpc DumpSnapshots(DumpSnapshotsRequest) returns (stream Snapshot); -} -// Both collector and aggregator implement LogtailService. -// The aggregator's StreamSnapshots re-streams the merged view. -// ListTargets: aggregator returns all configured collectors; collector returns itself. -// DumpSnapshots: collector only; aggregator calls this on startup to backfill its ring. -``` - -## Program 1 — Collector - -### tailer.go -- **`MultiTailer`**: one shared `fsnotify.Watcher` for all files regardless of count — avoids - the inotify instance limit when tailing hundreds of files. -- On `WRITE` event: read all new lines from that file's `bufio.Reader`. -- On `RENAME`/`REMOVE` (logrotate): drain old fd to EOF, close, start retry-open goroutine with - exponential backoff. 
Sends the new `*os.File` back via a channel to keep map access single-threaded. -- Emits `LogRecord` structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak). -- Accepts paths via `--logs` (comma-separated or glob) and `--logs-file` (one path/glob per line). - -### parser.go -- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting: - - ```nginx - log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn'; - ``` - - | # | Field | Used for | - |---|-------------------|------------------| - | 0 | `$host` | website | - | 1 | `$remote_addr` | client_prefix | - | 2 | `$msec` | (discarded) | - | 3 | `$request_method` | (discarded) | - | 4 | `$request_uri` | http_request_uri | - | 5 | `$status` | http_response | - | 6 | `$body_bytes_sent`| (discarded) | - | 7 | `$request_time` | (discarded) | - | 8 | `$is_tor` | is_tor | - | 9 | `$asn` | asn | - -- `strings.SplitN(line, "\t", 10)` — ~50 ns/line. No regex. -- `$request_uri`: query string discarded at first `?`. -- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags. -- `$is_tor`: `1` if the client IP is a TOR exit node, `0` otherwise. Field is optional — lines - with exactly 8 fields (old format) are accepted and default to `is_tor=false`. -- `$asn`: client AS number as a decimal integer (from MaxMind GeoIP2). Field is optional — - lines without it default to `asn=0`. -- Lines with fewer than 8 fields are silently skipped. - -### store.go -- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on - the hot path. At 10 K lines/s the goroutine uses <1% CPU. -- Live map: `map[Tuple6]int64`, hard-capped at 100 K entries (new keys dropped when full). -- **Minute ticker**: heap-selects top-50K entries, writes snapshot to fine ring, resets live map. 
-- Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring. -- **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N. -- **Trend query**: per-bucket filtered sum, returns one `TrendPoint` per bucket. -- **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming. -- **`DumpRings()`**: acquires `RLock`, copies both ring arrays and their head/filled pointers - (just slice headers — microseconds), releases lock, then returns chronologically-ordered fine - and coarse snapshot slices. The lock is never held during serialisation or network I/O. - -### server.go -- gRPC server on configurable port (default `:9090`). -- `TopN` and `Trend`: unary, answered from the ring buffer under RLock. -- `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker. -- `DumpSnapshots`: calls `DumpRings()`, streams all fine buckets (`is_coarse=false`) then all - coarse buckets (`is_coarse=true`), then closes the stream. No lock held during streaming. - -## Program 2 — Aggregator - -### subscriber.go -- One goroutine per collector. Dials, calls `StreamSnapshots`, forwards each `Snapshot` to the - merger. -- Reconnects with exponential backoff (100 ms → doubles → cap 30 s). -- After 3 consecutive failures: calls `merger.Zero(addr)` to remove that collector's contribution - from the merged view (prevents stale counts accumulating during outages). -- Resets failure count on first successful `Recv`; logs recovery. - -### merger.go -- **Delta strategy**: on each new snapshot from collector X, subtract X's previous entries from - `merged`, add the new entries, store new map. O(snapshot_size) per update — not - O(N_collectors × snapshot_size). -- `Zero(addr)`: subtracts the collector's last-known contribution and deletes its entry — called - when a collector is marked degraded. 
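The delta-merge bookkeeping above can be sketched in Go. This is an illustrative shape, not the repo's actual `merger.go`: keys are plain string labels here rather than the six-dimension tuple, and the type and method names are assumed.

```go
package main

import "fmt"

// Merger keeps the merged counts plus each collector's last contribution,
// so applying a new snapshot costs O(len(snapshot)) regardless of how
// many collectors are connected.
type Merger struct {
	merged map[string]int64            // label -> merged count
	last   map[string]map[string]int64 // collector addr -> its last snapshot
}

func NewMerger() *Merger {
	return &Merger{merged: map[string]int64{}, last: map[string]map[string]int64{}}
}

// Update subtracts collector addr's previous contribution, adds the new
// snapshot, and remembers it for the next delta.
func (m *Merger) Update(addr string, snap map[string]int64) {
	for label, c := range m.last[addr] {
		m.merged[label] -= c
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
	for label, c := range snap {
		m.merged[label] += c
	}
	m.last[addr] = snap
}

// Zero removes a degraded collector's contribution entirely.
func (m *Merger) Zero(addr string) {
	m.Update(addr, nil)
	delete(m.last, addr)
}

func main() {
	m := NewMerger()
	m.Update("nginx1:9090", map[string]int64{"www.example.com": 10})
	m.Update("nginx2:9090", map[string]int64{"www.example.com": 5})
	// A fresh snapshot replaces nginx1's old contribution; it is not double-counted.
	m.Update("nginx1:9090", map[string]int64{"www.example.com": 7})
	fmt.Println(m.merged["www.example.com"]) // 7 + 5
	m.Zero("nginx2:9090")
	fmt.Println(m.merged["www.example.com"])
}
```

The same shape makes `Zero` a special case of `Update` with an empty snapshot, which is why degraded-collector removal is cheap.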
- -### cache.go -- **Tick-based rotation** (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned - to the same 1-minute cadence as collectors regardless of how many collectors are connected. -- Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick. -- `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store. -- **`LoadHistorical(fine, coarse []Snapshot)`**: writes pre-merged backfill snapshots directly into - the ring arrays under `mu.Lock()`, sets head and filled counters, then returns. Safe to call - concurrently with queries. The live ticker continues from the updated head after this returns. - -### backfill.go -- **`Backfill(ctx, collectorAddrs, cache)`**: called once at aggregator startup (in a goroutine, - after the gRPC server is already listening so the frontend is never blocked). -- Dials all collectors concurrently and calls `DumpSnapshots` on each. -- Accumulates entries per timestamp in `map[unix-second]map[label]count`; multiple collectors' - contributions for the same bucket are summed — the same delta-merge semantics as the live path. -- Sorts timestamps chronologically, runs `TopKFromMap` per bucket, caps to ring size. -- Calls `cache.LoadHistorical` once with the merged results. -- **Graceful degradation**: if a collector returns `Unimplemented` (old binary without - `DumpSnapshots`), logs an informational message and skips it — live streaming still starts - normally. Any other error is logged with timing and also skipped. Partial backfill (some - collectors succeed, some fail) is supported. -- Logs per-collector stats: bucket counts, total entry counts, and wall-clock duration. - -### registry.go -- **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the - configured collector addresses; display names are updated from the `source` field of the first - snapshot received from each collector. 
-- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses. - -### server.go -- Implements `LogtailService` backed by the cache (not live fan-out). -- `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or - monitoring system. -- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with - their display names and gRPC addresses. - -## Program 3 — Frontend - -### handler.go -- All filter state in the **URL query string**: `w` (window), `by` (group_by), `f_website`, - `f_prefix`, `f_uri`, `f_status`, `f_website_re`, `f_uri_re`, `f_is_tor`, `f_asn`, `n`, `target`. No - server-side session — URLs are shareable and bookmarkable; multiple operators see independent views. -- **Filter expression box**: a `q=` parameter carries a mini filter language - (`status>=400 AND website~=gouda.* AND uri~=^/api/`). On submission the handler parses it - via `ParseFilterExpr` and redirects to the canonical URL with individual `f_*` params; `q=` - never appears in the final URL. Parse errors re-render the current page with an inline message. -- **Status expressions**: `f_status` accepts `200`, `!=200`, `>=400`, `<500`, etc. — parsed by - `store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf. -- **ASN expressions**: `f_asn` accepts the same expression syntax (`12345`, `!=65000`, `>=1000`, - `<64512`, etc.) — also parsed by `store.ParseStatusExpr`, stored as `(asn_number, AsnOp)` in the - filter protobuf. -- **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request - into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing. -- `TopN`, `Trend`, and `ListTargets` RPCs issued **concurrently** (all with a 5 s deadline); page - renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets` failure - hides the source picker — both are non-fatal. 
-- **Source picker**: `ListTargets` result drives a `source:` tab row. Clicking a collector tab - sets `target=` to that collector's address, querying it directly. The "all" tab resets to the - default aggregator. Picker is hidden when `ListTargets` returns ≤0 collectors (direct collector - mode). -- **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through - `website → prefix → uri → status → asn → website` (cycles). -- **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting. -- **`target=` override**: per-request gRPC endpoint override for comparing sources. -- Error pages render at HTTP 502 with the window/group-by tabs still functional. - -### sparkline.go -- `renderSparkline([]*pb.TrendPoint) template.HTML` — fixed `viewBox="0 0 300 60"` SVG, - Y-scaled to max count, rendered as ``. Returns `""` for fewer than 2 points or - all-zero data. - -### templates/ -- `base.html`: outer shell, inline CSS (~40 lines), conditional ``. -- `index.html`: window tabs, group-by tabs, filter breadcrumb with `×` remove links, sparkline, - TopN table with `` bars (% relative to rank-1), footer with source and refresh info. -- No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx. 
- -## Program 4 — CLI - -### Subcommands - -``` -logtail-cli topn [flags] ranked label → count table (exits after one response) -logtail-cli trend [flags] per-bucket time series (exits after one response) -logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C, auto-reconnects) -logtail-cli targets [flags] list targets known to the queried endpoint -``` - -### Flags - -**Shared** (all subcommands): - -| Flag | Default | Description | -|---------------|------------------|----------------------------------------------------------| -| `--target` | `localhost:9090` | Comma-separated `host:port` list; fan-out to all | -| `--json` | false | Emit newline-delimited JSON instead of a table | -| `--website` | — | Filter: website | -| `--prefix` | — | Filter: client prefix | -| `--uri` | — | Filter: request URI | -| `--status` | — | Filter: HTTP status expression (`200`, `!=200`, `>=400`, `<500`, …) | -| `--website-re`| — | Filter: RE2 regex against website | -| `--uri-re` | — | Filter: RE2 regex against request URI | -| `--is-tor` | — | Filter: TOR traffic (`1` or `!=0` = TOR only; `0` or `!=1` = non-TOR only) | -| `--asn` | — | Filter: ASN expression (`12345`, `!=65000`, `>=1000`, `<64512`, …) | - -**`topn` only**: `--n 10`, `--window 5m`, `--group-by website` - -**`trend` only**: `--window 5m` - -### Multi-target fan-out - -`--target` accepts a comma-separated list. All targets are queried concurrently; results are -printed in order with a per-target header. Single-target output omits the header for clean -pipe-to-`jq` use. - -### Output - -Default: human-readable table with space-separated thousands (`18 432`). -`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded). - -`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately -with a non-zero code on gRPC error. 
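The multi-target fan-out described above can be sketched like this. The `query` callback stands in for the real gRPC `TopN` call, and the function name is illustrative; the point is that targets are queried concurrently but results come back in input order, so output stays deterministic and single-target output stays pipe-friendly.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// fanOut queries every target concurrently and returns results in the
// order the targets were given. Each goroutine writes only its own slot,
// so no extra locking is needed.
func fanOut(targets []string, query func(addr string) string) []string {
	results := make([]string, len(targets))
	var wg sync.WaitGroup
	for i, addr := range targets {
		wg.Add(1)
		go func(i int, addr string) {
			defer wg.Done()
			results[i] = query(addr)
		}(i, addr)
	}
	wg.Wait()
	return results
}

func main() {
	targets := strings.Split("nginx1:9090,nginx2:9090", ",")
	results := fanOut(targets, func(addr string) string {
		return "topn result from " + addr // stand-in for the real gRPC call
	})
	for i, r := range results {
		if len(targets) > 1 {
			// Per-target header only in multi-target mode.
			fmt.Printf("=== %s ===\n", targets[i])
		}
		fmt.Println(r)
	}
}
```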
- -## Key Design Decisions - -| Decision | Rationale | -|----------|-----------| -| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path | -| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion | -| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass | -| Push-based streaming (collector → aggregator) | Aggregator cache always fresh; query latency is cache-read only | -| Delta merge in aggregator | O(snapshot_size) per update, not O(N_collectors × size) | -| Tick-based cache rotation in aggregator | Ring stays on the same 1-min cadence regardless of collector count | -| Degraded collector zeroing | Stale counts from failed collectors don't accumulate in the merged view | -| Same `LogtailService` for collector and aggregator | CLI and frontend work with either; no special-casing | -| `internal/store` shared package | ring-buffer, `Tuple6` encoding, and filter logic shared between collector and aggregator | -| Filter state in URL, not session cookie | Multiple concurrent operators; shareable/bookmarkable URLs | -| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack | -| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) | -| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later | -| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl | -| CLI default: human-readable table | Operator-friendly by default; `--json` opt-in for scripting | -| CLI multi-target fan-out | Compare a collector vs. 
aggregator, or two collectors, in one command | -| CLI uses stdlib `flag`, no framework | Four subcommands don't justify a dependency | -| Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at query boundary, encoded as `(int32, StatusOp)` in proto | -| ASN filter reuses `StatusOp` and `ParseStatusExpr` | Same 6-operator grammar as status; no duplicate enum or parser needed | -| Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency | -| Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable | -| `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory | -| Backfill via `DumpSnapshots` on restart | Aggregator recovers full 24h ring from collectors on restart; gRPC server starts first so frontend is never blocked during backfill | -| `DumpRings()` copies under lock, streams without lock | Lock held for microseconds (slice-header copy only); network I/O happens outside the lock so minute rotation is never delayed | -| Backfill merges per-timestamp across collectors | Correct cross-collector sums per bucket, same semantics as live delta-merge; collectors that don't support `DumpSnapshots` are skipped gracefully | +The [[docs/](docs/) directory also contains extensive planning information which shows how Claude +Code single-shot implemented the whole system (in March 2026). 
diff --git a/docs/DETAILS.md b/docs/DETAILS.md new file mode 100644 index 0000000..11ff58e --- /dev/null +++ b/docs/DETAILS.md @@ -0,0 +1,528 @@ +PREAMBLE + +Although this computer program has a permissive license (AP2.0), if you came here looking to ask +questions, you're better off just moving on :) This program is shared AS-IS and really without any +intent for anybody but IPng Networks to use it. Also, in case the structure of the repo and the +style of this README wasn't already clear, this program is 100% written and maintained by Claude +Code. + +You have been warned :) + +SPECIFICATION + +This project contains four programs: + +1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of +`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files. +It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via +server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host +request counters and response-body/request-time histograms. +Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only. + +2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data +into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what +is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows +trending information useful for DDoS detection. Runs on a central machine. + +3) An **HTTP frontend** companion to the aggregator that renders a drilldown dashboard. Operators +can restrict by `http_response=429`, then by `website=www.example.com`, and so on. Works with +either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline +SVG sparklines and meta-refresh. + +4) A **CLI** for shell-based debugging. 
Sends `topn`, `trend`, and `stream` queries to any +collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable +tables or newline-delimited JSON. + +Programs are written in Go. No CGO, no external runtime dependencies. + +--- + +![nginx-logtail frontend](docs/frontend.png) + +--- + +DEPLOYMENT + +## Docker + +All four binaries are published in a single image: `git.ipng.ch/ipng/nginx-logtail`. + +The image is built with a two-stage Dockerfile: a `golang:1.24-alpine` builder produces +statically-linked, stripped binaries (`CGO_ENABLED=0`, `-trimpath -ldflags="-s -w"`); the final +stage is `scratch` — no OS, no shell, no runtime dependencies. Each binary is invoked explicitly +via the container `command`. + +### Build and push + +``` +docker compose build --push +``` + +### Running aggregator + frontend + +The `docker-compose.yml` in the repo root runs the aggregator and frontend together. At minimum, +set `AGGREGATOR_COLLECTORS` to the comma-separated `host:port` list of your collector(s): + +```sh +AGGREGATOR_COLLECTORS=nginx1:9090,nginx2:9090 docker compose up -d +``` + +The frontend reaches the aggregator at `aggregator:9091` via Docker's internal DNS. The frontend +UI is available on port `8080`. + +### Environment variables + +All flags have environment variable equivalents. CLI flags take precedence over env vars. 
+ +**collector** (runs on each nginx host, not in Docker): + +| Env var | Flag | Default | +|--------------------------|-------------------|-------------| +| `COLLECTOR_LISTEN` | `-listen` | `:9090` | +| `COLLECTOR_PROM_LISTEN` | `-prom-listen` | `:9100` | +| `COLLECTOR_LOGS` | `-logs` | — | +| `COLLECTOR_LOGS_FILE` | `-logs-file` | — | +| `COLLECTOR_SOURCE` | `-source` | hostname | +| `COLLECTOR_V4PREFIX` | `-v4prefix` | `24` | +| `COLLECTOR_V6PREFIX` | `-v6prefix` | `48` | +| `COLLECTOR_SCAN_INTERVAL`| `-scan-interval` | `10s` | + +**aggregator**: + +| Env var | Flag | Default | +|--------------------------|---------------|-------------| +| `AGGREGATOR_LISTEN` | `-listen` | `:9091` | +| `AGGREGATOR_COLLECTORS` | `-collectors` | — (required)| +| `AGGREGATOR_SOURCE` | `-source` | hostname | + +**frontend**: + +| Env var | Flag | Default | +|------------------|------------|-------------------| +| `FRONTEND_LISTEN`| `-listen` | `:8080` | +| `FRONTEND_TARGET`| `-target` | `localhost:9091` | +| `FRONTEND_N` | `-n` | `25` | +| `FRONTEND_REFRESH`| `-refresh`| `30` | + +--- + +DESIGN + +## Directory Layout + +``` +nginx-logtail/ +├── proto/ +│ ├── logtail.proto # shared protobuf definitions +│ └── logtailpb/ +│ ├── logtail.pb.go # generated: messages, enums +│ └── logtail_grpc.pb.go # generated: service stubs +├── internal/ +│ └── store/ +│ └── store.go # shared types: Tuple6, Entry, Snapshot, ring helpers +└── cmd/ + ├── collector/ + │ ├── main.go + │ ├── tailer.go # MultiTailer: tail N files via one shared fsnotify watcher + │ ├── parser.go # tab-separated logtail log_format parser (~50 ns/line) + │ ├── store.go # bounded top-K in-memory store + tiered ring buffers + │ └── server.go # gRPC server: TopN, Trend, StreamSnapshots + ├── aggregator/ + │ ├── main.go + │ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff + │ ├── merger.go # delta-merge: O(snapshot_size) per update + │ ├── cache.go # tick-based ring buffer cache served to clients + │ 
    │   ├── registry.go            # TargetRegistry: addr→name map updated from snapshot sources
    │   └── server.go              # gRPC server (same surface as collector)
    ├── frontend/
    │   ├── main.go
    │   ├── handler.go             # URL param parsing, concurrent TopN+Trend, template exec
    │   ├── filter.go              # ParseFilterExpr / FilterExprString mini filter language
    │   ├── client.go              # gRPC dial helper
    │   ├── sparkline.go           # TrendPoints → inline SVG polyline
    │   ├── format.go              # fmtCount (space thousands separator)
    │   └── templates/
    │       ├── base.html          # outer HTML shell, inline CSS, meta-refresh
    │       └── index.html         # window tabs, group-by tabs, breadcrumb, table, footer
    └── cli/
        ├── main.go                # subcommand dispatch and usage
        ├── flags.go               # shared flags, parseTargets, buildFilter, parseWindow
        ├── client.go              # gRPC dial helper
        ├── format.go              # printTable, fmtCount, fmtTime, targetHeader
        ├── cmd_topn.go            # topn: concurrent fan-out, table + JSON output
        ├── cmd_trend.go           # trend: concurrent fan-out, table + JSON output
        ├── cmd_stream.go          # stream: multiplexed streams, auto-reconnect
        └── cmd_targets.go         # targets: list collectors known to the endpoint
```

## Data Model

The core unit is a **count keyed by six dimensions**:

| Field              | Description                                          | Example           |
|--------------------|------------------------------------------------------|-------------------|
| `website`          | nginx `$host`                                        | `www.example.com` |
| `client_prefix`    | client IP truncated to /24 IPv4 or /48 IPv6          | `1.2.3.0/24`      |
| `http_request_uri` | `$request_uri` path only — query string stripped     | `/api/v1/search`  |
| `http_response`    | HTTP status code                                     | `429`             |
| `is_tor`           | whether the client IP is a TOR exit node             | `1`               |
| `asn`              | client AS number (MaxMind GeoIP2, 32-bit int)        | `8298`            |

## Time Windows & Tiered Ring Buffers

Two ring buffers at different resolutions cover all query windows up to 24 hours:

| Tier   | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger     |
|--------|-------------|---------|--------------|--------|---------------------|
| Fine   | 1 min       | 60      | 50 000       | 1 h    | every minute        |
| Coarse | 5 min       | 288     | 5 000        | 24 h   | every 5 fine ticks  |

Supported query windows and which tier they read from:

| Window | Tier   | Buckets summed |
|--------|--------|----------------|
| 1 min  | fine   | last 1         |
| 5 min  | fine   | last 5         |
| 15 min | fine   | last 15        |
| 60 min | fine   | all 60         |
| 6 h    | coarse | last 72        |
| 24 h   | coarse | all 288        |

Every minute: snapshot live map → top-50K → append to fine ring, reset live map.
Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.

## Memory Budget (Collector, target ≤ 1 GB)

Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 1 B is_tor + 4 B asn +
8 B count + ~80 B Go map overhead ≈ **~191 bytes per entry**.

| Structure                 | Entries      | Size        |
|---------------------------|--------------|-------------|
| Live map (capped)         | 100 000      | ~19 MB      |
| Fine ring (60 × 1-min)    | 60 × 50 000  | ~558 MB     |
| Coarse ring (288 × 5-min) | 288 × 5 000  | ~268 MB     |
| **Total**                 |              | **~845 MB** |

The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are
accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded
regardless of attack cardinality.

## Future Work — ClickHouse Export (post-MVP)

> **Do not implement until the end-to-end MVP is running.**

The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d
historical views.
Schema sketch:

```sql
CREATE TABLE logtail (
  ts            DateTime,
  website       LowCardinality(String),
  client_prefix String,
  request_uri   LowCardinality(String),
  status        UInt16,
  count         UInt64
) ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, website, status, client_prefix, request_uri);
```

The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use
the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive
and does not change any existing interface.

## Protobuf API (`proto/logtail.proto`)

```protobuf
enum TorFilter { TOR_ANY = 0; TOR_YES = 1; TOR_NO = 2; }
enum StatusOp  { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; }

message Filter {
  optional string website = 1;
  optional string client_prefix = 2;
  optional string http_request_uri = 3;
  optional int32 http_response = 4;
  StatusOp status_op = 5;            // comparison operator for http_response
  optional string website_regex = 6; // RE2 regex against website
  optional string uri_regex = 7;     // RE2 regex against http_request_uri
  TorFilter tor = 8;                 // TOR_ANY (default) / TOR_YES / TOR_NO
  optional int32 asn_number = 9;     // filter by client ASN
  StatusOp asn_op = 10;              // comparison operator for asn_number
}

enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; ASN_NUMBER = 4; }
enum Window  { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }

message TopNRequest  { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
message TopNEntry    { string label = 1; int64 count = 2; }
message TopNResponse { repeated TopNEntry entries = 1; string source = 2; }

// Trend: one total count per minute (or 5-min) bucket, for sparklines
message TrendRequest  { Filter filter = 1; Window window = 4; }
message TrendPoint    { int64 timestamp_unix = 1; int64 count = 2; }
message TrendResponse { repeated TrendPoint points = 1; string source = 2; }
// Streaming: collector pushes a fine snapshot after every minute rotation
message SnapshotRequest {}
message Snapshot {
  string source = 1;
  int64 timestamp = 2;
  repeated TopNEntry entries = 3; // full top-50K for this bucket
  bool is_coarse = 4;             // true for 5-min coarse buckets (DumpSnapshots only)
}

// Target discovery: list the collectors behind the queried endpoint
message ListTargetsRequest {}
message TargetInfo {
  string name = 1; // display name (--source value from the collector)
  string addr = 2; // gRPC address; empty string means "this endpoint itself"
}
message ListTargetsResponse { repeated TargetInfo targets = 1; }

// Backfill: dump full ring buffer contents for aggregator restart recovery
message DumpSnapshotsRequest {}
// Response reuses Snapshot; is_coarse distinguishes fine (1-min) from coarse (5-min) buckets.
// Stream closes after all historical data is sent (unlike StreamSnapshots, which stays open).

service LogtailService {
  rpc TopN(TopNRequest) returns (TopNResponse);
  rpc Trend(TrendRequest) returns (TrendResponse);
  rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
  rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse);
  rpc DumpSnapshots(DumpSnapshotsRequest) returns (stream Snapshot);
}
// Both collector and aggregator implement LogtailService.
// The aggregator's StreamSnapshots re-streams the merged view.
// ListTargets: aggregator returns all configured collectors; collector returns itself.
// DumpSnapshots: collector only; aggregator calls this on startup to backfill its ring.
```

## Program 1 — Collector

### tailer.go
- **`MultiTailer`**: one shared `fsnotify.Watcher` for all files regardless of count — avoids
  the inotify instance limit when tailing hundreds of files.
- On `WRITE` event: read all new lines from that file's `bufio.Reader`.
- On `RENAME`/`REMOVE` (logrotate): drain old fd to EOF, close, start retry-open goroutine with
  exponential backoff.
  Sends the new `*os.File` back via a channel to keep map access single-threaded.
- Emits `LogRecord` structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak).
- Accepts paths via `--logs` (comma-separated or glob) and `--logs-file` (one path/glob per line).

### parser.go
- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting:

  ```nginx
  log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn';
  ```

  | # | Field              | Used for         |
  |---|--------------------|------------------|
  | 0 | `$host`            | website          |
  | 1 | `$remote_addr`     | client_prefix    |
  | 2 | `$msec`            | (discarded)      |
  | 3 | `$request_method`  | (discarded)      |
  | 4 | `$request_uri`     | http_request_uri |
  | 5 | `$status`          | http_response    |
  | 6 | `$body_bytes_sent` | (discarded)      |
  | 7 | `$request_time`    | (discarded)      |
  | 8 | `$is_tor`          | is_tor           |
  | 9 | `$asn`             | asn              |

- `strings.SplitN(line, "\t", 10)` — ~50 ns/line. No regex.
- `$request_uri`: query string discarded at first `?`.
- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags.
- `$is_tor`: `1` if the client IP is a TOR exit node, `0` otherwise. Field is optional — lines
  with exactly 8 fields (old format) are accepted and default to `is_tor=false`.
- `$asn`: client AS number as a decimal integer (from MaxMind GeoIP2). Field is optional —
  lines without it default to `asn=0`.
- Lines with fewer than 8 fields are silently skipped.

### store.go
- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on
  the hot path. At 10 K lines/s the goroutine uses <1% CPU.
- Live map: `map[Tuple6]int64`, hard-capped at 100 K entries (new keys dropped when full).
- **Minute ticker**: heap-selects top-50K entries, writes snapshot to fine ring, resets live map.
- Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring.
- **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N.
- **Trend query**: per-bucket filtered sum; returns one `TrendPoint` per bucket.
- **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming.
- **`DumpRings()`**: acquires `RLock`, copies both ring arrays and their head/filled pointers
  (just slice headers — microseconds), releases the lock, then returns chronologically-ordered
  fine and coarse snapshot slices. The lock is never held during serialisation or network I/O.

### server.go
- gRPC server on a configurable port (default `:9090`).
- `TopN` and `Trend`: unary, answered from the ring buffer under RLock.
- `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker.
- `DumpSnapshots`: calls `DumpRings()`, streams all fine buckets (`is_coarse=false`) then all
  coarse buckets (`is_coarse=true`), then closes the stream. No lock held during streaming.

## Program 2 — Aggregator

### subscriber.go
- One goroutine per collector. Dials, calls `StreamSnapshots`, forwards each `Snapshot` to the
  merger.
- Reconnects with exponential backoff (100 ms → doubles → cap 30 s).
- After 3 consecutive failures: calls `merger.Zero(addr)` to remove that collector's contribution
  from the merged view (prevents stale counts accumulating during outages).
- Resets the failure count on the first successful `Recv`; logs recovery.

### merger.go
- **Delta strategy**: on each new snapshot from collector X, subtract X's previous entries from
  `merged`, add the new entries, store the new map. O(snapshot_size) per update — not
  O(N_collectors × snapshot_size).
- `Zero(addr)`: subtracts the collector's last-known contribution and deletes its entry — called
  when a collector is marked degraded.
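The delta strategy can be sketched as follows. `Merger`, `Apply`, and the flat `map[string]int64`
snapshot shape are illustrative stand-ins; the real merger.go keys entries on `Tuple6` rather than
a plain label string.

```go
package main

import "fmt"

// Merger keeps one global total plus each collector's last snapshot,
// so replacing a collector's contribution costs O(prev + next) work.
type Merger struct {
	merged map[string]int64            // label → global count
	last   map[string]map[string]int64 // collector addr → its previous snapshot
}

func NewMerger() *Merger {
	return &Merger{
		merged: make(map[string]int64),
		last:   make(map[string]map[string]int64),
	}
}

// Apply replaces collector addr's contribution: subtract its previous
// entries from the merged view, then add the new ones.
func (m *Merger) Apply(addr string, snap map[string]int64) {
	for label, c := range m.last[addr] {
		m.merged[label] -= c
		if m.merged[label] == 0 {
			delete(m.merged, label) // keep the merged map tight
		}
	}
	for label, c := range snap {
		m.merged[label] += c
	}
	m.last[addr] = snap
}

// Zero drops a degraded collector's contribution entirely.
func (m *Merger) Zero(addr string) {
	m.Apply(addr, nil)
	delete(m.last, addr)
}

func main() {
	m := NewMerger()
	m.Apply("nginx1:9090", map[string]int64{"www.example.com": 10})
	m.Apply("nginx2:9090", map[string]int64{"www.example.com": 5})
	m.Apply("nginx1:9090", map[string]int64{"www.example.com": 3}) // replaces the earlier 10
	fmt.Println(m.merged["www.example.com"])
}
```

Note that a snapshot *replaces* the collector's previous contribution rather than adding to it,
which is what keeps failed updates from accumulating stale counts.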
### cache.go
- **Tick-based rotation** (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned
  to the same 1-minute cadence as collectors regardless of how many collectors are connected.
- Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick.
- `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to the collector store.
- **`LoadHistorical(fine, coarse []Snapshot)`**: writes pre-merged backfill snapshots directly into
  the ring arrays under `mu.Lock()`, sets head and filled counters, then returns. Safe to call
  concurrently with queries. The live ticker continues from the updated head after this returns.

### backfill.go
- **`Backfill(ctx, collectorAddrs, cache)`**: called once at aggregator startup (in a goroutine,
  after the gRPC server is already listening, so the frontend is never blocked).
- Dials all collectors concurrently and calls `DumpSnapshots` on each.
- Accumulates entries per timestamp in `map[unix-second]map[label]count`; multiple collectors'
  contributions for the same bucket are summed — the same delta-merge semantics as the live path.
- Sorts timestamps chronologically, runs `TopKFromMap` per bucket, caps to ring size.
- Calls `cache.LoadHistorical` once with the merged results.
- **Graceful degradation**: if a collector returns `Unimplemented` (an old binary without
  `DumpSnapshots`), logs an informational message and skips it — live streaming still starts
  normally. Any other error is logged with timing and also skipped. Partial backfill (some
  collectors succeed, some fail) is supported.
- Logs per-collector stats: bucket counts, total entry counts, and wall-clock duration.

### registry.go
- **`TargetRegistry`**: a `sync.RWMutex`-protected `map[addr → name]`. Initialised with the
  configured collector addresses; display names are updated from the `source` field of the first
  snapshot received from each collector.
- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses.

### server.go
- Implements `LogtailService` backed by the cache (not live fan-out).
- `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or a
  monitoring system.
- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with
  their display names and gRPC addresses.

## Program 3 — Frontend

### handler.go
- All filter state lives in the **URL query string**: `w` (window), `by` (group_by), `f_website`,
  `f_prefix`, `f_uri`, `f_status`, `f_website_re`, `f_uri_re`, `f_is_tor`, `f_asn`, `n`, `target`.
  No server-side session — URLs are shareable and bookmarkable, and multiple operators see
  independent views.
- **Filter expression box**: a `q=` parameter carries a mini filter language
  (`status>=400 AND website~=gouda.* AND uri~=^/api/`). On submission the handler parses it
  via `ParseFilterExpr` and redirects to the canonical URL with individual `f_*` params; `q=`
  never appears in the final URL. Parse errors re-render the current page with an inline message.
- **Status expressions**: `f_status` accepts `200`, `!=200`, `>=400`, `<500`, etc. — parsed by
  `store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf.
- **ASN expressions**: `f_asn` accepts the same expression syntax (`12345`, `!=65000`, `>=1000`,
  `<64512`, etc.) — also parsed by `store.ParseStatusExpr`, stored as `(asn_number, asn_op)` in
  the filter protobuf.
- **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request
  into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing.
- `TopN`, `Trend`, and `ListTargets` RPCs are issued **concurrently** (all with a 5 s deadline);
  the page renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets`
  failure hides the source picker — both are non-fatal.
- **Source picker**: the `ListTargets` result drives a `source:` tab row. Clicking a collector tab
  sets `target=` to that collector's address, querying it directly. The "all" tab resets to the
  default aggregator. The picker is hidden when `ListTargets` returns at most one target (direct
  collector mode).
- **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through
  `website → prefix → uri → status → asn → website` (cycles).
- **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting.
- **`target=` override**: per-request gRPC endpoint override for comparing sources.
- Error pages render at HTTP 502 with the window/group-by tabs still functional.

### sparkline.go
- `renderSparkline([]*pb.TrendPoint) template.HTML` — fixed `viewBox="0 0 300 60"` SVG,
  Y-scaled to the max count, rendered as a single `<polyline>`. Returns `""` for fewer than
  2 points or all-zero data.

### templates/
- `base.html`: outer shell, inline CSS (~40 lines), conditional meta-refresh tag.
- `index.html`: window tabs, group-by tabs, filter breadcrumb with `×` remove links, sparkline,
  TopN table with horizontal bars (% relative to rank 1), footer with source and refresh info.
- No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx.
## Program 4 — CLI

### Subcommands

```
logtail-cli topn [flags]      ranked label → count table (exits after one response)
logtail-cli trend [flags]     per-bucket time series (exits after one response)
logtail-cli stream [flags]    live snapshot feed (runs until Ctrl-C, auto-reconnects)
logtail-cli targets [flags]   list targets known to the queried endpoint
```

### Flags

**Shared** (all subcommands):

| Flag           | Default          | Description                                                                |
|----------------|------------------|----------------------------------------------------------------------------|
| `--target`     | `localhost:9090` | Comma-separated `host:port` list; fan-out to all                           |
| `--json`       | false            | Emit newline-delimited JSON instead of a table                             |
| `--website`    | —                | Filter: website                                                            |
| `--prefix`     | —                | Filter: client prefix                                                      |
| `--uri`        | —                | Filter: request URI                                                        |
| `--status`     | —                | Filter: HTTP status expression (`200`, `!=200`, `>=400`, `<500`, …)        |
| `--website-re` | —                | Filter: RE2 regex against website                                          |
| `--uri-re`     | —                | Filter: RE2 regex against request URI                                      |
| `--is-tor`     | —                | Filter: TOR traffic (`1` or `!=0` = TOR only; `0` or `!=1` = non-TOR only) |
| `--asn`        | —                | Filter: ASN expression (`12345`, `!=65000`, `>=1000`, `<64512`, …)         |

**`topn` only**: `--n 10`, `--window 5m`, `--group-by website`

**`trend` only**: `--window 5m`

### Multi-target fan-out

`--target` accepts a comma-separated list. All targets are queried concurrently; results are
printed in order with a per-target header. Single-target output omits the header for clean
pipe-to-`jq` use.

### Output

Default: a human-readable table with space-separated thousands (`18 432`).
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream`
(unbounded).

`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
with a non-zero code on gRPC error.
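The fan-out used by `topn` and `trend` can be sketched as below; `query` is a stand-in for the
real gRPC call, and writing results into a pre-sized slice is what keeps multi-target output in
the order the targets were given:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut queries every target concurrently but returns results in the
// order the targets were given, so output stays deterministic.
func fanOut(targets []string, query func(addr string) string) []string {
	results := make([]string, len(targets))
	var wg sync.WaitGroup
	for i, addr := range targets {
		wg.Add(1)
		go func(i int, addr string) {
			defer wg.Done()
			results[i] = query(addr) // each goroutine owns one slot: no lock needed
		}(i, addr)
	}
	wg.Wait()
	return results
}

func main() {
	out := fanOut([]string{"nginx1:9090", "nginx2:9090"}, func(addr string) string {
		return "ok from " + addr
	})
	fmt.Println(out)
}
```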
## Key Design Decisions

| Decision | Rationale |
|----------|-----------|
| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path |
| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion |
| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass |
| Push-based streaming (collector → aggregator) | Aggregator cache always fresh; query latency is cache-read only |
| Delta merge in aggregator | O(snapshot_size) per update, not O(N_collectors × size) |
| Tick-based cache rotation in aggregator | Ring stays on the same 1-min cadence regardless of collector count |
| Degraded collector zeroing | Stale counts from failed collectors don't accumulate in the merged view |
| Same `LogtailService` for collector and aggregator | CLI and frontend work with either; no special-casing |
| `internal/store` shared package | Ring buffer, `Tuple6` encoding, and filter logic shared between collector and aggregator |
| Filter state in URL, not session cookie | Multiple concurrent operators; shareable/bookmarkable URLs |
| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack |
| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) |
| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later |
| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl |
| CLI default: human-readable table | Operator-friendly by default; `--json` opt-in for scripting |
| CLI multi-target fan-out | Compare a collector vs.
aggregator, or two collectors, in one command |
| CLI uses stdlib `flag`, no framework | Four subcommands don't justify a dependency |
| Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at the query boundary, encoded as `(int32, StatusOp)` in proto |
| ASN filter reuses `StatusOp` and `ParseStatusExpr` | Same 6-operator grammar as status; no duplicate enum or parser needed |
| Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per entry would dominate query latency |
| Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable |
| `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory |
| Backfill via `DumpSnapshots` on restart | Aggregator recovers the full 24 h ring from collectors on restart; the gRPC server starts first so the frontend is never blocked during backfill |
| `DumpRings()` copies under lock, streams without lock | Lock held for microseconds (slice-header copy only); network I/O happens outside the lock so minute rotation is never delayed |
| Backfill merges per-timestamp across collectors | Correct cross-collector sums per bucket, same semantics as the live delta-merge; collectors that don't support `DumpSnapshots` are skipped gracefully |
diff --git a/PLAN_AGGREGATOR.md b/docs/PLAN_AGGREGATOR.md
similarity index 100%
rename from PLAN_AGGREGATOR.md
rename to docs/PLAN_AGGREGATOR.md
diff --git a/PLAN_CLI.md b/docs/PLAN_CLI.md
similarity index 100%
rename from PLAN_CLI.md
rename to docs/PLAN_CLI.md
diff --git a/PLAN_COLLECTOR.md b/docs/PLAN_COLLECTOR.md
similarity index 100%
rename from PLAN_COLLECTOR.md
rename to docs/PLAN_COLLECTOR.md
diff --git a/PLAN_FRONTEND.md b/docs/PLAN_FRONTEND.md
similarity index 100%
rename from PLAN_FRONTEND.md
rename to docs/PLAN_FRONTEND.md