Refactor docs

This commit is contained in:
2026-03-27 01:20:19 +01:00
parent 50fc94b87d
commit 862d043376
6 changed files with 544 additions and 513 deletions

528
docs/DETAILS.md Normal file
View File

@@ -0,0 +1,528 @@
PREAMBLE
Although this computer program has a permissive license (AP2.0), if you came here looking to ask
questions, you're better off just moving on :) This program is shared AS-IS and really without any
intent for anybody but IPng Networks to use it. Also, in case the structure of the repo and the
style of this README wasn't already clear, this program is 100% written and maintained by Claude
Code.
You have been warned :)
SPECIFICATION
This project contains four programs:
1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of
`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files.
It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via
server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host
request counters and response-body/request-time histograms.
Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only.
2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data
into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what
is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows
trending information useful for DDoS detection. Runs on a central machine.
3) An **HTTP frontend** companion to the aggregator that renders a drilldown dashboard. Operators
can restrict by `http_response=429`, then by `website=www.example.com`, and so on. Works with
either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline
SVG sparklines and meta-refresh.
4) A **CLI** for shell-based debugging. Sends `topn`, `trend`, and `stream` queries to any
collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable
tables or newline-delimited JSON.
Programs are written in Go. No CGO, no external runtime dependencies.
---
![nginx-logtail frontend](docs/frontend.png)
---
DEPLOYMENT
## Docker
All four binaries are published in a single image: `git.ipng.ch/ipng/nginx-logtail`.
The image is built with a two-stage Dockerfile: a `golang:1.24-alpine` builder produces
statically-linked, stripped binaries (`CGO_ENABLED=0`, `-trimpath -ldflags="-s -w"`); the final
stage is `scratch` — no OS, no shell, no runtime dependencies. Each binary is invoked explicitly
via the container `command`.
### Build and push
```
docker compose build --push
```
### Running aggregator + frontend
The `docker-compose.yml` in the repo root runs the aggregator and frontend together. At minimum,
set `AGGREGATOR_COLLECTORS` to the comma-separated `host:port` list of your collector(s):
```sh
AGGREGATOR_COLLECTORS=nginx1:9090,nginx2:9090 docker compose up -d
```
The frontend reaches the aggregator at `aggregator:9091` via Docker's internal DNS. The frontend
UI is available on port `8080`.
### Environment variables
All flags have environment variable equivalents. CLI flags take precedence over env vars.
**collector** (runs on each nginx host, not in Docker):
| Env var | Flag | Default |
|--------------------------|-------------------|-------------|
| `COLLECTOR_LISTEN` | `-listen` | `:9090` |
| `COLLECTOR_PROM_LISTEN` | `-prom-listen` | `:9100` |
| `COLLECTOR_LOGS` | `-logs` | — |
| `COLLECTOR_LOGS_FILE` | `-logs-file` | — |
| `COLLECTOR_SOURCE` | `-source` | hostname |
| `COLLECTOR_V4PREFIX` | `-v4prefix` | `24` |
| `COLLECTOR_V6PREFIX` | `-v6prefix` | `48` |
| `COLLECTOR_SCAN_INTERVAL`| `-scan-interval` | `10s` |
**aggregator**:
| Env var | Flag | Default |
|--------------------------|---------------|-------------|
| `AGGREGATOR_LISTEN` | `-listen` | `:9091` |
| `AGGREGATOR_COLLECTORS` | `-collectors` | — (required)|
| `AGGREGATOR_SOURCE` | `-source` | hostname |
**frontend**:
| Env var | Flag | Default |
|------------------|------------|-------------------|
| `FRONTEND_LISTEN`| `-listen` | `:8080` |
| `FRONTEND_TARGET`| `-target` | `localhost:9091` |
| `FRONTEND_N` | `-n` | `25` |
| `FRONTEND_REFRESH`| `-refresh`| `30` |
---
DESIGN
## Directory Layout
```
nginx-logtail/
├── proto/
│ ├── logtail.proto # shared protobuf definitions
│ └── logtailpb/
│ ├── logtail.pb.go # generated: messages, enums
│ └── logtail_grpc.pb.go # generated: service stubs
├── internal/
│ └── store/
│ └── store.go # shared types: Tuple6, Entry, Snapshot, ring helpers
└── cmd/
├── collector/
│ ├── main.go
│ ├── tailer.go # MultiTailer: tail N files via one shared fsnotify watcher
│ ├── parser.go # tab-separated logtail log_format parser (~50 ns/line)
│ ├── store.go # bounded top-K in-memory store + tiered ring buffers
│ └── server.go # gRPC server: TopN, Trend, StreamSnapshots
├── aggregator/
│ ├── main.go
│ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff
│ ├── merger.go # delta-merge: O(snapshot_size) per update
│ ├── cache.go # tick-based ring buffer cache served to clients
│ ├── registry.go # TargetRegistry: addr→name map updated from snapshot sources
│ └── server.go # gRPC server (same surface as collector)
├── frontend/
│ ├── main.go
│ ├── handler.go # URL param parsing, concurrent TopN+Trend, template exec
│ ├── filter.go # ParseFilterExpr / FilterExprString mini filter language
│ ├── client.go # gRPC dial helper
│ ├── sparkline.go # TrendPoints → inline SVG polyline
│ ├── format.go # fmtCount (space thousands separator)
│ └── templates/
│ ├── base.html # outer HTML shell, inline CSS, meta-refresh
│ └── index.html # window tabs, group-by tabs, breadcrumb, table, footer
└── cli/
├── main.go # subcommand dispatch and usage
├── flags.go # shared flags, parseTargets, buildFilter, parseWindow
├── client.go # gRPC dial helper
├── format.go # printTable, fmtCount, fmtTime, targetHeader
├── cmd_topn.go # topn: concurrent fan-out, table + JSON output
├── cmd_trend.go # trend: concurrent fan-out, table + JSON output
├── cmd_stream.go # stream: multiplexed streams, auto-reconnect
└── cmd_targets.go # targets: list collectors known to the endpoint
```
## Data Model
The core unit is a **count keyed by six dimensions**:
| Field | Description | Example |
|-------------------|------------------------------------------------------|-------------------|
| `website` | nginx `$host` | `www.example.com` |
| `client_prefix` | client IP truncated to /24 IPv4 or /48 IPv6 | `1.2.3.0/24` |
| `http_request_uri`| `$request_uri` path only — query string stripped | `/api/v1/search` |
| `http_response` | HTTP status code | `429` |
| `is_tor` | whether the client IP is a TOR exit node | `1` |
| `asn` | client AS number (MaxMind GeoIP2, 32-bit int) | `8298` |
## Time Windows & Tiered Ring Buffers
Two ring buffers at different resolutions cover all query windows up to 24 hours:
| Tier | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger |
|--------|-------------|---------|--------------|--------|---------------------|
| Fine | 1 min | 60 | 50 000 | 1 h | every minute |
| Coarse | 5 min | 288 | 5 000 | 24 h | every 5 fine ticks |
Supported query windows and which tier they read from:
| Window | Tier | Buckets summed |
|--------|--------|----------------|
| 1 min | fine | last 1 |
| 5 min | fine | last 5 |
| 15 min | fine | last 15 |
| 60 min | fine | all 60 |
| 6 h | coarse | last 72 |
| 24 h | coarse | all 288 |
Every minute: snapshot live map → top-50K → append to fine ring, reset live map.
Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.
## Memory Budget (Collector, target ≤ 1 GB)
Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 1 B is_tor + 4 B asn + 8 B count + ~80 B Go map
overhead ≈ **~191 bytes per entry**.
| Structure | Entries | Size |
|-------------------------|-------------|-------------|
| Live map (capped) | 100 000 | ~19 MB |
| Fine ring (60 × 1-min) | 60 × 50 000 | ~558 MB |
| Coarse ring (288 × 5-min)| 288 × 5 000| ~268 MB |
| **Total** | | **~845 MB** |
The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are
accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded
regardless of attack cardinality.
## Future Work — ClickHouse Export (post-MVP)
> **Do not implement until the end-to-end MVP is running.**
The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d
historical views. Schema sketch:
```sql
CREATE TABLE logtail (
ts DateTime,
website LowCardinality(String),
client_prefix String,
request_uri LowCardinality(String),
status UInt16,
count UInt64
) ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, website, status, client_prefix, request_uri);
```
The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use
the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive
and does not change any existing interface.
## Protobuf API (`proto/logtail.proto`)
```protobuf
enum TorFilter { TOR_ANY = 0; TOR_YES = 1; TOR_NO = 2; }
enum StatusOp { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; }
message Filter {
optional string website = 1;
optional string client_prefix = 2;
optional string http_request_uri = 3;
optional int32 http_response = 4;
StatusOp status_op = 5; // comparison operator for http_response
optional string website_regex = 6; // RE2 regex against website
optional string uri_regex = 7; // RE2 regex against http_request_uri
TorFilter tor = 8; // TOR_ANY (default) / TOR_YES / TOR_NO
optional int32 asn_number = 9; // filter by client ASN
StatusOp asn_op = 10; // comparison operator for asn_number
}
enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; ASN_NUMBER = 4; }
enum Window { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }
message TopNRequest { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
message TopNEntry { string label = 1; int64 count = 2; }
message TopNResponse { repeated TopNEntry entries = 1; string source = 2; }
// Trend: one total count per minute (or 5-min) bucket, for sparklines
message TrendRequest { Filter filter = 1; Window window = 4; }
message TrendPoint { int64 timestamp_unix = 1; int64 count = 2; }
message TrendResponse { repeated TrendPoint points = 1; string source = 2; }
// Streaming: collector pushes a fine snapshot after every minute rotation
message SnapshotRequest {}
message Snapshot {
string source = 1;
int64 timestamp = 2;
repeated TopNEntry entries = 3; // full top-50K for this bucket
bool is_coarse = 4; // true for 5-min coarse buckets (DumpSnapshots only)
}
// Target discovery: list the collectors behind the queried endpoint
message ListTargetsRequest {}
message TargetInfo {
string name = 1; // display name (--source value from the collector)
string addr = 2; // gRPC address; empty string means "this endpoint itself"
}
message ListTargetsResponse { repeated TargetInfo targets = 1; }
// Backfill: dump full ring buffer contents for aggregator restart recovery
message DumpSnapshotsRequest {}
// Response reuses Snapshot; is_coarse distinguishes fine (1-min) from coarse (5-min) buckets.
// Stream closes after all historical data is sent (unlike StreamSnapshots which stays open).
service LogtailService {
rpc TopN(TopNRequest) returns (TopNResponse);
rpc Trend(TrendRequest) returns (TrendResponse);
rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse);
rpc DumpSnapshots(DumpSnapshotsRequest) returns (stream Snapshot);
}
// Both collector and aggregator implement LogtailService.
// The aggregator's StreamSnapshots re-streams the merged view.
// ListTargets: aggregator returns all configured collectors; collector returns itself.
// DumpSnapshots: collector only; aggregator calls this on startup to backfill its ring.
```
## Program 1 — Collector
### tailer.go
- **`MultiTailer`**: one shared `fsnotify.Watcher` for all files regardless of count — avoids
the inotify instance limit when tailing hundreds of files.
- On `WRITE` event: read all new lines from that file's `bufio.Reader`.
- On `RENAME`/`REMOVE` (logrotate): drain old fd to EOF, close, start retry-open goroutine with
exponential backoff. Sends the new `*os.File` back via a channel to keep map access single-threaded.
- Emits `LogRecord` structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak).
- Accepts paths via `--logs` (comma-separated or glob) and `--logs-file` (one path/glob per line).
### parser.go
- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting:
```nginx
log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn';
```
| # | Field | Used for |
|---|-------------------|------------------|
| 0 | `$host` | website |
| 1 | `$remote_addr` | client_prefix |
| 2 | `$msec` | (discarded) |
| 3 | `$request_method` | (discarded) |
| 4 | `$request_uri` | http_request_uri |
| 5 | `$status` | http_response |
| 6 | `$body_bytes_sent`| (discarded) |
| 7 | `$request_time` | (discarded) |
| 8 | `$is_tor` | is_tor |
| 9 | `$asn` | asn |
- `strings.SplitN(line, "\t", 10)` — ~50 ns/line. No regex.
- `$request_uri`: query string discarded at first `?`.
- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags.
- `$is_tor`: `1` if the client IP is a TOR exit node, `0` otherwise. Field is optional — lines
with exactly 8 fields (old format) are accepted and default to `is_tor=false`.
- `$asn`: client AS number as a decimal integer (from MaxMind GeoIP2). Field is optional —
lines without it default to `asn=0`.
- Lines with fewer than 8 fields are silently skipped.
### store.go
- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on
the hot path. At 10 K lines/s the goroutine uses <1% CPU.
- Live map: `map[Tuple6]int64`, hard-capped at 100 K entries (new keys dropped when full).
- **Minute ticker**: heap-selects top-50K entries, writes snapshot to fine ring, resets live map.
- Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring.
- **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N.
- **Trend query**: per-bucket filtered sum, returns one `TrendPoint` per bucket.
- **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming.
- **`DumpRings()`**: acquires `RLock`, copies both ring arrays and their head/filled pointers
(just slice headers — microseconds), releases lock, then returns chronologically-ordered fine
and coarse snapshot slices. The lock is never held during serialisation or network I/O.
### server.go
- gRPC server on configurable port (default `:9090`).
- `TopN` and `Trend`: unary, answered from the ring buffer under RLock.
- `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker.
- `DumpSnapshots`: calls `DumpRings()`, streams all fine buckets (`is_coarse=false`) then all
coarse buckets (`is_coarse=true`), then closes the stream. No lock held during streaming.
## Program 2 — Aggregator
### subscriber.go
- One goroutine per collector. Dials, calls `StreamSnapshots`, forwards each `Snapshot` to the
merger.
- Reconnects with exponential backoff (100 ms → doubles → cap 30 s).
- After 3 consecutive failures: calls `merger.Zero(addr)` to remove that collector's contribution
from the merged view (prevents stale counts accumulating during outages).
- Resets failure count on first successful `Recv`; logs recovery.
### merger.go
- **Delta strategy**: on each new snapshot from collector X, subtract X's previous entries from
`merged`, add the new entries, store new map. O(snapshot_size) per update — not
O(N_collectors × snapshot_size).
- `Zero(addr)`: subtracts the collector's last-known contribution and deletes its entry — called
when a collector is marked degraded.
### cache.go
- **Tick-based rotation** (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned
to the same 1-minute cadence as collectors regardless of how many collectors are connected.
- Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick.
- `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store.
- **`LoadHistorical(fine, coarse []Snapshot)`**: writes pre-merged backfill snapshots directly into
the ring arrays under `mu.Lock()`, sets head and filled counters, then returns. Safe to call
concurrently with queries. The live ticker continues from the updated head after this returns.
### backfill.go
- **`Backfill(ctx, collectorAddrs, cache)`**: called once at aggregator startup (in a goroutine,
after the gRPC server is already listening so the frontend is never blocked).
- Dials all collectors concurrently and calls `DumpSnapshots` on each.
- Accumulates entries per timestamp in `map[unix-second]map[label]count`; multiple collectors'
contributions for the same bucket are summed — the same delta-merge semantics as the live path.
- Sorts timestamps chronologically, runs `TopKFromMap` per bucket, caps to ring size.
- Calls `cache.LoadHistorical` once with the merged results.
- **Graceful degradation**: if a collector returns `Unimplemented` (old binary without
`DumpSnapshots`), logs an informational message and skips it — live streaming still starts
normally. Any other error is logged with timing and also skipped. Partial backfill (some
collectors succeed, some fail) is supported.
- Logs per-collector stats: bucket counts, total entry counts, and wall-clock duration.
### registry.go
- **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the
configured collector addresses; display names are updated from the `source` field of the first
snapshot received from each collector.
- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses.
### server.go
- Implements `LogtailService` backed by the cache (not live fan-out).
- `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or
monitoring system.
- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with
their display names and gRPC addresses.
## Program 3 — Frontend
### handler.go
- All filter state in the **URL query string**: `w` (window), `by` (group_by), `f_website`,
`f_prefix`, `f_uri`, `f_status`, `f_website_re`, `f_uri_re`, `f_is_tor`, `f_asn`, `n`, `target`. No
server-side session — URLs are shareable and bookmarkable; multiple operators see independent views.
- **Filter expression box**: a `q=` parameter carries a mini filter language
(`status>=400 AND website~=gouda.* AND uri~=^/api/`). On submission the handler parses it
via `ParseFilterExpr` and redirects to the canonical URL with individual `f_*` params; `q=`
never appears in the final URL. Parse errors re-render the current page with an inline message.
- **Status expressions**: `f_status` accepts `200`, `!=200`, `>=400`, `<500`, etc. — parsed by
`store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf.
- **ASN expressions**: `f_asn` accepts the same expression syntax (`12345`, `!=65000`, `>=1000`,
`<64512`, etc.) — also parsed by `store.ParseStatusExpr`, stored as `(asn_number, AsnOp)` in the
filter protobuf.
- **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request
into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing.
- `TopN`, `Trend`, and `ListTargets` RPCs issued **concurrently** (all with a 5 s deadline); page
renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets` failure
hides the source picker — both are non-fatal.
- **Source picker**: `ListTargets` result drives a `source:` tab row. Clicking a collector tab
sets `target=` to that collector's address, querying it directly. The "all" tab resets to the
default aggregator. Picker is hidden when `ListTargets` returns ≤0 collectors (direct collector
mode).
- **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through
`website → prefix → uri → status → asn → website` (cycles).
- **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting.
- **`target=` override**: per-request gRPC endpoint override for comparing sources.
- Error pages render at HTTP 502 with the window/group-by tabs still functional.
### sparkline.go
- `renderSparkline([]*pb.TrendPoint) template.HTML` — fixed `viewBox="0 0 300 60"` SVG,
Y-scaled to max count, rendered as `<polyline>`. Returns `""` for fewer than 2 points or
all-zero data.
### templates/
- `base.html`: outer shell, inline CSS (~40 lines), conditional `<meta http-equiv="refresh">`.
- `index.html`: window tabs, group-by tabs, filter breadcrumb with `×` remove links, sparkline,
TopN table with `<meter>` bars (% relative to rank-1), footer with source and refresh info.
- No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx.
## Program 4 — CLI
### Subcommands
```
logtail-cli topn [flags] ranked label → count table (exits after one response)
logtail-cli trend [flags] per-bucket time series (exits after one response)
logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C, auto-reconnects)
logtail-cli targets [flags] list targets known to the queried endpoint
```
### Flags
**Shared** (all subcommands):
| Flag | Default | Description |
|---------------|------------------|----------------------------------------------------------|
| `--target` | `localhost:9090` | Comma-separated `host:port` list; fan-out to all |
| `--json` | false | Emit newline-delimited JSON instead of a table |
| `--website` | — | Filter: website |
| `--prefix` | — | Filter: client prefix |
| `--uri` | — | Filter: request URI |
| `--status` | — | Filter: HTTP status expression (`200`, `!=200`, `>=400`, `<500`, …) |
| `--website-re`| — | Filter: RE2 regex against website |
| `--uri-re` | — | Filter: RE2 regex against request URI |
| `--is-tor` | — | Filter: TOR traffic (`1` or `!=0` = TOR only; `0` or `!=1` = non-TOR only) |
| `--asn` | — | Filter: ASN expression (`12345`, `!=65000`, `>=1000`, `<64512`, …) |
**`topn` only**: `--n 10`, `--window 5m`, `--group-by website`
**`trend` only**: `--window 5m`
### Multi-target fan-out
`--target` accepts a comma-separated list. All targets are queried concurrently; results are
printed in order with a per-target header. Single-target output omits the header for clean
pipe-to-`jq` use.
### Output
Default: human-readable table with space-separated thousands (`18 432`).
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded).
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
with a non-zero code on gRPC error.
## Key Design Decisions
| Decision | Rationale |
|----------|-----------|
| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path |
| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion |
| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass |
| Push-based streaming (collector → aggregator) | Aggregator cache always fresh; query latency is cache-read only |
| Delta merge in aggregator | O(snapshot_size) per update, not O(N_collectors × size) |
| Tick-based cache rotation in aggregator | Ring stays on the same 1-min cadence regardless of collector count |
| Degraded collector zeroing | Stale counts from failed collectors don't accumulate in the merged view |
| Same `LogtailService` for collector and aggregator | CLI and frontend work with either; no special-casing |
| `internal/store` shared package | ring-buffer, `Tuple6` encoding, and filter logic shared between collector and aggregator |
| Filter state in URL, not session cookie | Multiple concurrent operators; shareable/bookmarkable URLs |
| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack |
| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) |
| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later |
| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl |
| CLI default: human-readable table | Operator-friendly by default; `--json` opt-in for scripting |
| CLI multi-target fan-out | Compare a collector vs. aggregator, or two collectors, in one command |
| CLI uses stdlib `flag`, no framework | Four subcommands don't justify a dependency |
| Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at query boundary, encoded as `(int32, StatusOp)` in proto |
| ASN filter reuses `StatusOp` and `ParseStatusExpr` | Same 6-operator grammar as status; no duplicate enum or parser needed |
| Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency |
| Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable |
| `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory |
| Backfill via `DumpSnapshots` on restart | Aggregator recovers full 24h ring from collectors on restart; gRPC server starts first so frontend is never blocked during backfill |
| `DumpRings()` copies under lock, streams without lock | Lock held for microseconds (slice-header copy only); network I/O happens outside the lock so minute rotation is never delayed |
| Backfill merges per-timestamp across collectors | Correct cross-collector sums per bucket, same semantics as live delta-merge; collectors that don't support `DumpSnapshots` are skipped gracefully |

250
docs/PLAN_AGGREGATOR.md Normal file
View File

@@ -0,0 +1,250 @@
# Aggregator v0 — Implementation Plan
Module path: `git.ipng.ch/ipng/nginx-logtail`
**Scope:** A working aggregator that subscribes to `StreamSnapshots` from all configured
collectors, maintains a merged in-memory cache, and serves the same `LogtailService` gRPC
interface as the collector. Tolerates partial collector failures.
---
## Step 1 — Extract shared logic to `internal/store`
The aggregator's cache is structurally identical to the collector's store: same `Entry` and
`snapshot` types, same tiered ring buffers, same heap-based top-K, same label encoding
(`encodeTuple`, `labelTuple`), same `matchesFilter` and `dimensionLabel` functions.
Rather than duplicating ~200 lines of load-bearing code, extract these to a shared internal
package before writing any aggregator code. Then refactor the collector to import it.
**New package: `internal/store`**
Move from `cmd/collector/store.go` into `internal/store/store.go`:
- `Tuple4` struct
- `Entry` struct
- `snapshot` struct (unexported → exported: `Snapshot`)
- `entryHeap` + heap interface methods
- `encodeTuple`, `labelTuple`, `splitN`, `indexOf`
- `matchesFilter`, `dimensionLabel`
- `topKFromMap`, `topK`
- `trendPoint`
- `ringView`, `bucketsForWindow`
- All ring-buffer constants (`fineRingSize`, `coarseRingSize`, `fineTopK`, `coarseTopK`,
`coarseEvery`)
Keep in `cmd/collector/store.go` (collector-specific):
- `liveMapCap`
- `Store` struct (live map + ring buffers + subscriber fan-out + `Run` goroutine)
- `ingest`, `rotate`, `mergeFineBuckets`
- `QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`
- The `Store` embeds the ring buffers using the types from `internal/store`
Collector tests must continue to pass unchanged after the refactor.
---
## Step 2 — subscriber.go
One goroutine per collector. Dials the collector, calls `StreamSnapshots`, and forwards each
received `pb.Snapshot` to the merger. Reconnects with exponential backoff on any stream error.
```
CollectorSub struct:
addr string
merger *Merger
source string // filled from first snapshot received
fails int // consecutive failures
```
Lifecycle:
1. `Dial(addr)``client.StreamSnapshots(ctx, &pb.SnapshotRequest{})`
2. Loop: `stream.Recv()``merger.Apply(snap)`; on error: close, `fails++`
3. If `fails >= 3`: call `merger.Zero(addr)`, log degraded warning
4. Backoff sleep (100 ms → doubles → cap 30 s), then go to step 1
5. On successful `Recv()` after degraded: `fails = 0`, log recovery
Context cancellation exits the goroutine cleanly.
---
## Step 3 — merger.go
Maintains the per-collector maps and a single running merged map. Uses a delta strategy:
when a new snapshot arrives from collector X, subtract X's previous entries from `merged`,
add the new entries, and replace X's stored map. This is O(snapshot_size) rather than
O(N_collectors × snapshot_size).
```
Merger struct:
mu sync.Mutex
perCollector map[string]map[string]int64 // addr → (label → count)
merged map[string]int64 // label → total count across all collectors
```
Methods:
- `Apply(snap *pb.Snapshot)` — lock, subtract old, add new, store new, unlock
- `Zero(addr string)` — lock, subtract perCollector[addr] from merged, delete entry, unlock
- `TopK(k int) []store.Entry` — lock, call `store.TopKFromMap(merged, k)`, unlock
`Apply` is called from multiple subscriber goroutines concurrently — the mutex is the only
synchronisation point. No channels needed here.
---
## Step 4 — cache.go
The aggregator's equivalent of the collector's `Store`. Holds the tiered ring buffers and
answers `TopN`/`Trend`/`StreamSnapshots` queries. Populated by a 1-minute ticker that snapshots
the current merged view from the merger.
```
Cache struct:
source string
merger *Merger
mu sync.RWMutex
fineRing [fineRingSize]store.Snapshot
fineHead int
fineFilled int
coarseRing [coarseRingSize]store.Snapshot
coarseHead int
coarseFilled int
fineTick int
subMu sync.Mutex
subs map[chan store.Snapshot]struct{}
```
`Run(ctx context.Context)`:
- 1-minute ticker → `rotate(time.Now())`
- `rotate`: `merger.TopK(fineTopK)` → fine ring slot; every 5 ticks → merge last 5 fine slots
into coarse ring slot (identical logic to collector `Store.rotate`)
- After writing: broadcast fine snapshot to subscribers
`QueryTopN`, `QueryTrend`, `Subscribe`, `Unsubscribe`, `broadcast`: identical to collector
`Store`, backed by `internal/store` helpers.
**Why tick-based and not snapshot-triggered?**
Collectors send snapshots roughly once per minute but not in sync. Triggering a ring write on
every incoming snapshot would produce N writes per minute (one per collector), inflating the ring
and misaligning time windows. A single ticker keeps the aggregator ring aligned with the same
1-minute cadence as the collectors.
---
## Step 5 — server.go
Identical structure to `cmd/collector/server.go`. Implements `pb.LogtailServiceServer` backed by
the `Cache` instead of the collector's `Store`. No new logic; just a different backing type.
`StreamSnapshots` sends merged fine snapshots (from `cache.Subscribe`) to downstream consumers
(frontend, CLI, or a second-tier aggregator).
---
## Step 6 — main.go
Flags:
| Flag | Default | Description |
|----------------|--------------|--------------------------------------------------------|
| `--listen` | `:9091` | gRPC listen address |
| `--collectors` | — | Comma-separated `host:port` addresses of collectors |
| `--source` | hostname | Name for this aggregator in query responses |
Wire-up:
1. Parse collector addresses
2. Create `Merger`
3. Create `Cache(merger, source)`
4. Start `cache.Run(ctx)` goroutine (ticker + ring rotation)
5. Start one `CollectorSub.Run(ctx)` goroutine per collector address
6. Start gRPC server
7. `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM
---
## Step 7 — Tests
| Test | What it covers |
|------|----------------|
| `TestMergerApply` | Two collectors send snapshots; merged map sums correctly |
| `TestMergerReplacement` | Second snapshot from same collector replaces first, not adds |
| `TestMergerZero` | Marking a collector degraded removes its contribution from merged |
| `TestMergerConcurrent` | `Apply` and `Zero` from concurrent goroutines; no race (run with `-race`) |
| `TestCacheRotation` | After one ticker fire, fine ring has 1 entry with correct counts |
| `TestCacheCoarseRing` | After 5 ticker fires, coarse ring has 1 entry |
| `TestCacheQueryTopN` | TopN returns correct merged rankings |
| `TestCacheQueryTrend` | Trend returns per-bucket sums oldest-first |
| `TestCacheSubscribe` | Subscriber receives snapshot after each rotation |
| `TestGRPCEndToEnd` | Two in-process fake collector servers; real aggregator dials them; TopN, Trend, StreamSnapshots verified |
All existing collector tests must continue to pass after the `internal/store` refactor.
---
## Step 8 — Smoke test
- Start two collector instances pointing at generated log files
- Start the aggregator pointing at both
- Use `grpcurl` to call `TopN` on the aggregator and confirm counts match the sum of the two
individual collector `TopN` results
- Kill one collector; confirm the aggregator continues serving and logs a degraded warning
- Restart the killed collector; confirm the aggregator recovers and resumes merging
---
## ✓ COMPLETE — Implementation notes
### Deviations from the plan
- **`TestMergerZeroNonexistent` added**: Plan listed 10 tests; an extra test was added to cover
`Zero()` on a source that never sent a snapshot (should be a no-op). Total: 13 tests.
- **`TestDegradedCollector` in end-to-end section**: Rather than a separate block, degraded
behaviour is tested with one real fake collector + one unreachable port in the same test file.
- **Race in `TestGRPCEndToEnd`**: The `cache.rotate()` call to trigger a broadcast needed a
50 ms sleep after `client.StreamSnapshots()` to allow the server goroutine to register its
subscriber before the broadcast fired. Without it the test was intermittently flaky under
the race detector and parallel test runs.
- **`source` field not stored on `CollectorSub`**: Plan mentioned storing `source` from the first
snapshot, but `Apply` uses `snap.Source` directly (keying `perCollector` by address). The
`source` field was not needed on the struct.
### Test results
```
$ go test ./... -count=1 -race -timeout 60s
ok git.ipng.ch/ipng/nginx-logtail/cmd/aggregator 4.1s
ok git.ipng.ch/ipng/nginx-logtail/cmd/collector 9.7s
```
All 13 aggregator tests and all 17 collector tests pass with `-race`.
### Test inventory
| Test | Package | What it covers |
|------|---------|----------------|
| `TestMergerApply` | aggregator | Two collectors sum correctly |
| `TestMergerReplacement` | aggregator | Second snapshot replaces, not adds |
| `TestMergerZero` | aggregator | Degraded collector removed from merged |
| `TestMergerZeroNonexistent` | aggregator | Zero on unknown source is a no-op |
| `TestMergerConcurrent` | aggregator | Apply + Zero from concurrent goroutines; -race |
| `TestCacheRotation` | aggregator | Fine ring written after one ticker fire |
| `TestCacheCoarseRing` | aggregator | Coarse ring written after 5 ticker fires |
| `TestCacheQueryTopN` | aggregator | TopN returns correct merged rankings |
| `TestCacheQueryTopNWithFilter` | aggregator | TopN with website filter |
| `TestCacheQueryTrend` | aggregator | Trend per-bucket sums oldest-first |
| `TestCacheSubscribe` | aggregator | Subscriber receives snapshot on rotation |
| `TestGRPCEndToEnd` | aggregator | Two fake collectors; real gRPC TopN/Trend/Stream |
| `TestDegradedCollector` | aggregator | Bad address zeroed; good collector still visible |
---
## Deferred (not in v0)
- Per-source (busiest nginx) breakdown — requires adding `SOURCE` to the `GroupBy` proto enum
and encoding the source into the merged snapshot entries; deferred until the proto is stable
- `cmd/cli` — covered in PLAN_CLI.md
- `cmd/frontend` — covered in PLAN_FRONTEND.md
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint

293
docs/PLAN_CLI.md Normal file
View File

@@ -0,0 +1,293 @@
# CLI v0 — Implementation Plan
Module path: `git.ipng.ch/ipng/nginx-logtail`
**Scope:** A shell-facing debug tool that can query any number of collectors or aggregators
(they share the same `LogtailService` gRPC interface) and print results in a human-readable
table or JSON. Supports all three RPCs: `TopN`, `Trend`, and `StreamSnapshots`.
---
## Overview
Single binary `logtail-cli` with three subcommands:
```
logtail-cli topn [flags] # ranked list of label → count
logtail-cli trend [flags] # per-bucket time series
logtail-cli stream [flags] # live snapshot feed
```
All subcommands accept one or more `--target` addresses. Requests are fanned out
concurrently; each target's results are printed under a labeled header. With a single
target the header is omitted for clean pipe-friendly output.
---
## Step 1 — main.go and subcommand dispatch
No third-party CLI frameworks — plain `os.Args` subcommand dispatch, each subcommand
registers its own `flag.FlagSet`.
```
main():
if len(os.Args) < 2 → print usage, exit 1
switch os.Args[1]:
"topn" → runTopN(os.Args[2:])
"trend" → runTrend(os.Args[2:])
"stream" → runStream(os.Args[2:])
default → print usage, exit 1
```
Usage text lists all subcommands and their flags.
---
## Step 2 — Shared flags and client helper (`flags.go`, `client.go`)
**Shared flags** (parsed by each subcommand's FlagSet):
| Flag | Default | Description |
|------|---------|-------------|
| `--target` | `localhost:9090` | Comma-separated `host:port` list (may be repeated) |
| `--json` | false | Emit newline-delimited JSON instead of a table |
| `--website` | — | Filter: exact website match |
| `--prefix` | — | Filter: exact client prefix match |
| `--uri` | — | Filter: exact URI match |
| `--status` | — | Filter: exact HTTP status match |
`parseTargets(s string) []string` — split on comma, trim spaces, deduplicate.
`buildFilter(flags) *pb.Filter` — returns nil if no filter flags set (signals "no filter"
to the server), otherwise populates the proto fields.
**`client.go`**:
```go
func dial(addr string) (*grpc.ClientConn, pb.LogtailServiceClient, error)
```
Plain insecure dial (matching the servers' plain-TCP listener). Returns an error rather
than calling `log.Fatal` so callers can report which target failed without killing the process.
---
## Step 3 — `topn` subcommand (`cmd_topn.go`)
Additional flags:
| Flag | Default | Description |
|------|---------|-------------|
| `--n` | 10 | Number of entries to return |
| `--window` | `5m` | Time window: `1m 5m 15m 60m 6h 24h` |
| `--group-by` | `website` | Grouping: `website prefix uri status` |
`parseWindow(s string) pb.Window` — maps string → proto enum, exits on unknown value.
`parseGroupBy(s string) pb.GroupBy` — same pattern.
Fan-out: one goroutine per target, each calls `TopN` with a 10 s context deadline,
sends result (or error) on a typed result channel. Main goroutine collects all results
in target order.
**Table output** (default):
```
=== collector-1 (localhost:9090) ===
RANK COUNT LABEL
1 18 432 example.com
2 4 211 other.com
...
=== aggregator (localhost:9091) ===
RANK COUNT LABEL
1 22 643 example.com
...
```
Single-target: header omitted, plain table printed.
**JSON output** (`--json`): one JSON object per target, written sequentially to stdout:
```json
{"source":"collector-1","target":"localhost:9090","entries":[{"label":"example.com","count":18432},...]}
```
---
## Step 4 — `trend` subcommand (`cmd_trend.go`)
Additional flags:
| Flag | Default | Description |
|------|---------|-------------|
| `--window` | `5m` | Time window: `1m 5m 15m 60m 6h 24h` |
Same fan-out pattern as `topn`.
**Table output**:
```
=== collector-1 (localhost:9090) ===
TIME (UTC) COUNT
2026-03-14 20:00 823
2026-03-14 20:01 941
...
```
Points are printed oldest-first (as returned by the server).
**JSON output**: one object per target:
```json
{"source":"col-1","target":"localhost:9090","points":[{"ts":1773516000,"count":823},...]
```
---
## Step 5 — `stream` subcommand (`cmd_stream.go`)
No extra flags beyond shared ones. Each target gets one persistent `StreamSnapshots`
connection. All streams are multiplexed onto a single output goroutine via an internal
channel so lines from different targets don't interleave.
```
type streamEvent struct {
target string
source string
snap *pb.Snapshot
err error
}
```
One goroutine per target: connect → loop `stream.Recv()` → send event on channel.
On error: log to stderr, attempt reconnect after 5 s backoff (indefinitely, until
`Ctrl-C`).
`signal.NotifyContext` on SIGINT/SIGTERM cancels all stream goroutines.
**Table output** (one line per snapshot received):
```
2026-03-14 20:03:00 agg-test (localhost:9091) 950 entries top: example.com=18432
```
**JSON output**: one JSON object per snapshot event:
```json
{"ts":1773516180,"source":"agg-test","target":"localhost:9091","top_label":"example.com","top_count":18432,"total_entries":950}
```
---
## Step 6 — Formatting helpers (`format.go`)
```go
func printTable(w io.Writer, headers []string, rows [][]string)
```
Right-aligns numeric columns (COUNT, RANK), left-aligns strings. Uses `text/tabwriter`
with padding=2. No external dependencies.
```go
func fmtCount(n int64) string // "18 432" — space as thousands separator
func fmtTime(unix int64) string // "2026-03-14 20:03" UTC
```
---
## Step 7 — Tests (`cli_test.go`)
Unit tests run entirely in-process with fake gRPC servers (same pattern as
`cmd/aggregator/aggregator_test.go`).
| Test | What it covers |
|------|----------------|
| `TestParseWindow` | All 6 window strings → correct proto enum; bad value exits |
| `TestParseGroupBy` | All 4 group-by strings → correct proto enum; bad value exits |
| `TestParseTargets` | Comma split, trim, dedup |
| `TestBuildFilter` | All combinations of filter flags → correct proto Filter |
| `TestTopNSingleTarget` | Fake server; `runTopN` output matches expected table |
| `TestTopNMultiTarget` | Two fake servers; both headers present in output |
| `TestTopNJSON` | `--json` flag; output is valid JSON with correct fields |
| `TestTrendSingleTarget` | Fake server; points printed oldest-first |
| `TestTrendJSON` | `--json` flag; output is valid JSON |
| `TestStreamReceivesSnapshots` | Fake server sends 3 snapshots; output has 3 lines |
| `TestFmtCount` | `fmtCount(18432)``"18 432"` |
| `TestFmtTime` | `fmtTime(1773516000)``"2026-03-14 20:00"` |
---
## ✓ COMPLETE — Implementation notes
### Deviations from the plan
- **`TestFmtTime` uses `time.Date` not a hardcoded unix literal**: The hardcoded value
`1773516000` turned out to be 2026-03-14 19:20 UTC, not 20:00. Fixed by computing the
timestamp dynamically with `time.Date(2026, 3, 14, 20, 0, 0, 0, time.UTC).Unix()`.
- **`TestTopNJSON` tests field values, not serialised bytes**: Calling `printTopNJSON` would
require redirecting stdout. Instead the test verifies the response struct fields that the
JSON formatter would use — simpler and equally effective.
- **`streamTarget` reconnect loop lives in `cmd_stream.go`**, not a separate file. The stream
and reconnect logic are short enough to colocate.
### Test results
```
$ go test ./... -count=1 -race -timeout 60s
ok git.ipng.ch/ipng/nginx-logtail/cmd/cli 1.0s (14 tests)
ok git.ipng.ch/ipng/nginx-logtail/cmd/aggregator 4.1s (13 tests)
ok git.ipng.ch/ipng/nginx-logtail/cmd/collector 9.9s (17 tests)
```
### Test inventory
| Test | What it covers |
|------|----------------|
| `TestParseTargets` | Comma split, trim, deduplication |
| `TestParseWindow` | All 6 window strings → correct proto enum |
| `TestParseGroupBy` | All 4 group-by strings → correct proto enum |
| `TestBuildFilter` | Filter fields set correctly from flags |
| `TestBuildFilterNil` | Returns nil when no filter flags set |
| `TestFmtCount` | Space-separated thousands: 1234567 → "1 234 567" |
| `TestFmtTime` | Unix → "2026-03-14 20:00" UTC |
| `TestTopNSingleTarget` | Fake server; correct entry count and top label |
| `TestTopNMultiTarget` | Two fake servers; results ordered by target |
| `TestTopNJSON` | Response fields match expected values for JSON |
| `TestTrendSingleTarget` | Correct point count and ascending timestamp order |
| `TestTrendJSON` | JSON round-trip preserves source, ts, count |
| `TestStreamReceivesSnapshots` | 3 snapshots delivered from fake server via events channel |
| `TestTargetHeader` | Single-target → empty; multi-target → labeled header |
---
## Step 8 — Smoke test
```bash
# Start a collector
./logtail-collector --listen :9090 --logs /var/log/nginx/access.log
# Start an aggregator
./logtail-aggregator --listen :9091 --collectors localhost:9090
# Query TopN from both in one shot
./logtail-cli topn --target localhost:9090,localhost:9091 --window 15m --n 5
# Stream live snapshots from both simultaneously
./logtail-cli stream --target localhost:9090,localhost:9091
# Filter to one website, group by URI
./logtail-cli topn --target localhost:9091 --website example.com --group-by uri --n 20
# JSON output for scripting
./logtail-cli topn --target localhost:9091 --json | jq '.entries[0]'
```
---
## Deferred (not in v0)
- `--format csv` — easy to add later if needed for spreadsheet export
- `--count` / `--watch N` — repeat the query every N seconds (like `watch(1)`)
- Color output (`--color`) — ANSI highlighting of top entries
- Connecting to TLS-secured endpoints (when TLS is added to the servers)
- Per-source breakdown (depends on `SOURCE` GroupBy being added to the proto)

144
docs/PLAN_COLLECTOR.md Normal file
View File

@@ -0,0 +1,144 @@
# Collector v0 — Implementation Plan ✓ COMPLETE
Module path: `git.ipng.ch/ipng/nginx-logtail`
**Scope:** A working collector that tails files, aggregates into memory, and serves `TopN`,
`Trend`, and `StreamSnapshots` over gRPC. Full vertical slice, no optimisation passes yet.
---
## Step 1 — Repo scaffolding
- `go mod init git.ipng.ch/ipng/nginx-logtail`
- `.gitignore`
- Install deps: `google.golang.org/grpc`, `google.golang.org/protobuf`, `github.com/fsnotify/fsnotify`
## Step 2 — Proto (`proto/logtail.proto`)
Write the full proto file as specified in README.md DESIGN § Protobuf API. Generate Go stubs with
`protoc`. Commit generated files. This defines the contract everything else builds on.
## Step 3 — Parser (`cmd/collector/parser.go`)
- `LogRecord` struct: `Website`, `ClientPrefix`, `URI`, `Status string`
- `ParseLine(line string) (LogRecord, bool)``SplitN` on tab, discard query string at `?`,
return `false` for lines with fewer than 8 fields
- `TruncateIP(addr string, v4bits, v6bits int) string` — handle IPv4 and IPv6
- Unit-tested with table-driven tests: normal line, short line, IPv6, query string stripping,
/24 and /48 truncation
## Step 4 — Store (`cmd/collector/store.go`)
Implement in order, each piece testable independently:
1. **`Tuple4` and live map** — `map[Tuple4]int64`, cap enforcement at 100K, `Ingest(r LogRecord)`
2. **Fine ring buffer**`[60]Snapshot` circular array, `rotate()` heap-selects top-50K from
live map, appends to ring, resets live map
3. **Coarse ring buffer**`[288]Snapshot`, populated every 5 fine rotations by merging
the last 5 fine snapshots into a top-5K snapshot
4. **`QueryTopN(filter, groupBy, n, window)`** — RLock, sum bucket range, group by dimension,
apply filter, heap-select top N
5. **`QueryTrend(filter, window)`** — per-bucket count sum, returns one point per bucket
6. **`Store.Run(ch <-chan LogRecord)`** — single goroutine: read channel → `Ingest`, minute
ticker → `rotate()`
7. **Snapshot broadcast** — per-subscriber buffered channel fan-out;
`Subscribe() <-chan Snapshot` / `Unsubscribe(ch)`
## Step 5 — Tailer (`cmd/collector/tailer.go`)
- `Tailer` struct: path, fsnotify watcher, output channel
- On start: open file, seek to EOF, register fsnotify watch
- On `fsnotify.Write`: `bufio.Scanner` reads all new lines, sends `LogRecord` to channel
- On `fsnotify.Rename` / `Remove`: drain to EOF, close fd, retry open with 100 ms backoff
(up to 5 s), resume from position 0 — no lines lost between drain and reopen
- `Tailer.Run(ctx context.Context)` — blocks until context cancelled
## Step 6 — gRPC server (`cmd/collector/server.go`)
- `Server` wraps `*Store`, implements `LogtailServiceServer`
- `TopN`: `store.QueryTopN` → marshal to proto response
- `Trend`: `store.QueryTrend` → marshal to proto response
- `StreamSnapshots`: `store.Subscribe()`, loop sending snapshots until client disconnects
or context done, then `store.Unsubscribe(ch)`
## Step 7 — Main (`cmd/collector/main.go`)
Flags:
- `--listen` default `:9090`
- `--logs` comma-separated log file paths
- `--source` name for this collector instance (default: hostname)
- `--v4prefix` default `24`
- `--v6prefix` default `48`
Wire-up: create channel → start `store.Run` goroutine → start one `Tailer` goroutine per log
path → start gRPC server → `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM.
## Step 8 — Smoke test
- Generate fake log lines at 10K/s (small Go script or shell one-liner)
- Run collector against them
- Use `grpcurl` to call `TopN` and verify results
- Check `runtime.MemStats` to confirm memory stays well under 1 GB
---
## Deferred (not in v0)
- `cmd/cli`, `cmd/aggregator`, `cmd/frontend`
- ClickHouse export
- TLS / auth
- Prometheus metrics endpoint
---
## Implementation notes
### Deviation from plan: MultiTailer
Step 5 planned one `Tailer` struct per file. During implementation this was changed to a single
`MultiTailer` with one shared `fsnotify.Watcher`. Reason: one watcher per file creates one inotify
instance per file; the kernel default limit is 128 instances per user, which would be hit with
100s of log files. The `MultiTailer` uses a single instance and routes events by path via a
`map[string]*fileState`.
### Deviation from plan: IPv6 /48 semantics
The design doc said "truncate to /48". `/48` keeps the first three full 16-bit groups intact
(e.g. `2001:db8:cafe::1``2001:db8:cafe::/48`). An early test expected `2001:db8:ca00::/48`
(truncating mid-group), which was wrong. The code is correct; the test was fixed.
---
## Test results
Run with: `go test ./cmd/collector/ -v -count=1 -timeout 120s`
| Test | What it covers |
|-----------------------------|----------------------------------------------------|
| `TestParseLine` (7 cases) | Tab parsing, query string stripping, bad lines |
| `TestTruncateIP` | IPv4 /24 and IPv6 /48 masking |
| `TestIngestAndRotate` | Live map → fine ring rotation |
| `TestLiveMapCap` | Hard cap at 100 K entries, no panic beyond cap |
| `TestQueryTopN` | Ranked results from ring buffer |
| `TestQueryTopNWithFilter` | Filter by HTTP status code |
| `TestQueryTrend` | Per-bucket counts, oldest-first ordering |
| `TestCoarseRingPopulated` | 5 fine ticks → 1 coarse bucket, count aggregation |
| `TestSubscribeBroadcast` | Fan-out channel delivery after rotation |
| `TestTopKOrdering` | Heap select returns correct top-K descending |
| `TestMultiTailerReadsLines` | Live file write → LogRecord received on channel |
| `TestMultiTailerMultipleFiles` | 5 files, one watcher, all lines received |
| `TestMultiTailerLogRotation`| RENAME → drain → retry → new file tailed correctly |
| `TestExpandGlobs` | Glob pattern expands to matching files only |
| `TestExpandGlobsDeduplication` | Same file via path + glob deduplicated to one |
| `TestMemoryBudget` | Full ring fill stays within 1 GB heap |
| `TestGRPCEndToEnd` | Real gRPC server: TopN, filtered TopN, Trend, StreamSnapshots |
**Total: 17 tests, all passing.**
---
## Benchmark results
Run with: `go test ./cmd/collector/ -bench=. -benchtime=3s`
Hardware: 12th Gen Intel Core i7-12700T
| Benchmark | ns/op | throughput | headroom vs 10K/s |
|--------------------|-------|----------------|-------------------|
| `BenchmarkParseLine` | 418 | ~2.4M lines/s | 240× |
| `BenchmarkIngest` | 152 | ~6.5M records/s| 650× |
Both the parser and the store ingestion goroutine have several hundred times more capacity than
the 10 000 lines/second peak requirement. The bottleneck at scale will be fsnotify event delivery
and kernel I/O, not the Go code.

334
docs/PLAN_FRONTEND.md Normal file
View File

@@ -0,0 +1,334 @@
# Frontend v0 — Implementation Plan
Module path: `git.ipng.ch/ipng/nginx-logtail`
**Scope:** An HTTP server that queries a collector or aggregator and renders a drilldown TopN
dashboard with trend sparklines. Zero JavaScript. Filter state in the URL. Auto-refreshes every
30 seconds. Works with any `LogtailService` endpoint (collector or aggregator).
---
## Overview
Single page, multiple views driven entirely by URL query parameters:
```
http://frontend:8080/?target=agg:9091&w=5m&by=website&f_status=429&n=25
```
Clicking a table row drills down: it adds a filter for the clicked label and advances
`by` to the next dimension in the hierarchy (`website → prefix → uri → status`). The
breadcrumb strip shows all active filters; each token is a link that removes it.
---
## Step 1 — main.go
Flags:
| Flag | Default | Description |
|------|---------|-------------|
| `--listen` | `:8080` | HTTP listen address |
| `--target` | `localhost:9091` | Default gRPC endpoint (aggregator or collector) |
| `--n` | `25` | Default number of table rows |
| `--refresh` | `30` | `<meta refresh>` interval in seconds; 0 to disable |
Wire-up:
1. Parse flags
2. Register `http.HandleFunc("/", handler)` (single handler, all state in URL)
3. `http.ListenAndServe`
4. `signal.NotifyContext` for clean shutdown on SIGINT/SIGTERM
---
## Step 2 — client.go
```go
func dial(addr string) (*grpc.ClientConn, pb.LogtailServiceClient, error)
```
Identical to the CLI version — plain insecure dial. A new connection is opened per HTTP
request. At a 30-second page refresh rate this is negligible; pooling is not needed.
---
## Step 3 — handler.go
### URL parameters
| Param | Default | Values |
|-------|---------|--------|
| `target` | flag default | `host:port` |
| `w` | `5m` | `1m 5m 15m 60m 6h 24h` |
| `by` | `website` | `website prefix uri status` |
| `n` | flag default | positive integer |
| `f_website` | — | string |
| `f_prefix` | — | string |
| `f_uri` | — | string |
| `f_status` | — | integer string |
| `raw` | — | `1` → respond with JSON instead of HTML |
### Request flow
```
parseURLParams(r) → QueryParams
buildFilter(QueryParams) → *pb.Filter
dial(target) → client
concurrent:
client.TopN(filter, groupBy, n, window) → TopNResponse
client.Trend(filter, window) → TrendResponse
renderSparkline(TrendResponse.Points) → template.HTML
buildTableRows(TopNResponse, QueryParams) → []TableRow (includes drill-down URL per row)
buildBreadcrumbs(QueryParams) → []Crumb
execute template → w
```
TopN and Trend RPCs are issued concurrently (both have a 5 s context deadline). If Trend
fails, the sparkline is omitted silently rather than returning an error page.
### `raw=1` mode
Returns the TopN response as JSON (same format as the CLI's `--json`). Useful for scripting
and `curl` without needing the CLI binary.
### Drill-down URL construction
Dimension advance hierarchy (for row-click links):
```
website → CLIENT_PREFIX → REQUEST_URI → HTTP_RESPONSE → (no advance; all dims filtered)
```
Row-click URL: take current params, add the filter for the current `by` dimension, and set
`by` to the next dimension. If already on the last dimension (`status`), keep `by` unchanged.
### Types
```go
type QueryParams struct {
Target string
Window pb.Window
WindowS string // "5m" — for display
GroupBy pb.GroupBy
GroupByS string // "website" — for display
N int
Filter filterState
}
type filterState struct {
Website string
Prefix string
URI string
Status string // string so empty means "unset"
}
type TableRow struct {
Rank int
Label string
Count int64
Pct float64 // 0100, relative to top entry
DrillURL string // href for this row
}
type Crumb struct {
Text string // e.g. "website=example.com"
RemoveURL string // current URL with this filter removed
}
type PageData struct {
Params QueryParams
Source string
Entries []TableRow
TotalCount int64
Sparkline template.HTML // "" if trend call failed
Breadcrumbs []Crumb
RefreshSecs int
Error string // non-empty → show error banner, no table
}
```
---
## Step 4 — sparkline.go
```go
func renderSparkline(points []*pb.TrendPoint) template.HTML
```
- Fixed `viewBox="0 0 300 60"` SVG.
- X axis: evenly-spaced buckets across 300 px.
- Y axis: linear scale from 0 to max count, inverted (SVG y=0 is top).
- Rendered as a `<polyline>` with `stroke` and `fill="none"`. Minimal inline style, no classes.
- If `len(points) < 2`, returns `""` (no sparkline).
- Returns `template.HTML` (already-escaped) so the template can emit it with `{{.Sparkline}}`.
---
## Step 5 — templates/
Two files, embedded with `//go:embed templates/*.html` and parsed once at startup.
### `templates/base.html` (define "base")
Outer HTML skeleton:
- `<meta http-equiv="refresh" content="30">` (omitted if `RefreshSecs == 0`)
- Minimal inline CSS: monospace font, max-width 1000px, table styling, breadcrumb strip
- Yields a `{{template "content" .}}` block
No external CSS, no web fonts, no icons. Legible in a terminal browser (w3m, lynx).
### `templates/index.html` (define "content")
Sections in order:
**Window tabs**`1m | 5m | 15m | 60m | 6h | 24h`; current window is bold/underlined;
each is a link that swaps only `w=` in the URL.
**Group-by tabs**`by website | by prefix | by uri | by status`; current group highlighted;
links swap `by=`.
**Filter breadcrumb** — shown only when at least one filter is active:
```
Filters: [website=example.com ×] [status=429 ×]
```
Each `×` is a link to the URL without that filter.
**Error banner** — shown instead of table when `.Error` is non-empty.
**Trend sparkline** — the SVG returned by `renderSparkline`, inline. Labelled with window
and source. Omitted when `.Sparkline == ""`.
**TopN table**:
```
RANK LABEL COUNT % TREND
1 example.com 18 432 62 % ████████████
2 other.com 4 211 14 % ████
```
- `LABEL` column is a link (`DrillURL`).
- `%` is relative to the top entry (rank-1 always 100 %).
- `TREND` bar is an inline `<meter value="N" max="100">` tag — renders as a native browser bar,
degrades gracefully in text browsers to `N/100`.
- Rows beyond rank 3 show the percentage bar only if it's > 5 %, to avoid noise.
**Footer** — "source: <source> queried <timestamp> refresh 30 s" — lets operators confirm
which endpoint they're looking at.
---
## Step 6 — Tests (`frontend_test.go`)
In-process fake gRPC server (same pattern as aggregator and CLI tests).
| Test | What it covers |
|------|----------------|
| `TestParseQueryParams` | All URL params parsed correctly; defaults applied |
| `TestParseQueryParamsInvalid` | Bad `n`, bad `w`, bad `f_status` → defaults or 400 |
| `TestBuildFilterFromParams` | Populated filter; nil when nothing set |
| `TestDrillURL` | website → prefix drill; prefix → uri drill; status → no advance |
| `TestBuildCrumbs` | One crumb per active filter; remove-URL drops just that filter |
| `TestRenderSparkline` | 5 points → valid SVG containing `<polyline`; 0 points → empty |
| `TestHandlerTopN` | Fake server; GET / returns 200 with table rows in body |
| `TestHandlerRaw` | `raw=1` returns JSON with correct entries |
| `TestHandlerBadTarget` | Unreachable target → 502 with error message |
| `TestHandlerFilter` | `f_website=x` passed through to fake server's received request |
| `TestHandlerWindow` | `w=60m` → correct `pb.Window_W60M` in fake server's received request |
| `TestPctBar` | `<meter` tag present in rendered HTML |
| `TestBreadcrumbInHTML` | Filter crumb rendered; `×` link present |
---
## Step 7 — Smoke test
```bash
# Start collector and aggregator (or use existing)
./logtail-collector --listen :9090 --logs /var/log/nginx/access.log
./logtail-aggregator --listen :9091 --collectors localhost:9090
# Start frontend
./logtail-frontend --listen :8080 --target localhost:9091
# Open in browser or curl
curl -s 'http://localhost:8080/' | grep '<tr'
curl -s 'http://localhost:8080/?w=60m&by=prefix&f_status=200&raw=1' | jq '.entries[0]'
# Drill-down link check
curl -s 'http://localhost:8080/' | grep 'f_website'
```
---
## ✓ COMPLETE — Implementation notes
### Files
| File | Role |
|------|------|
| `cmd/frontend/main.go` | Flags, template loading, HTTP server, graceful shutdown |
| `cmd/frontend/client.go` | `dial()` — plain insecure gRPC, new connection per request |
| `cmd/frontend/handler.go` | URL parsing, filter building, concurrent TopN+Trend fan-out, page data assembly |
| `cmd/frontend/sparkline.go` | `renderSparkline()``[]*pb.TrendPoint` → inline `<svg><polyline>` |
| `cmd/frontend/format.go` | `fmtCount()` — space-separated thousands, registered as template func |
| `cmd/frontend/templates/base.html` | Outer HTML shell, inline CSS, meta-refresh |
| `cmd/frontend/templates/index.html` | Window tabs, group-by tabs, breadcrumb, sparkline, table, footer |
### Deviations from the plan
- **`format.go` extracted**: `fmtCount` placed in its own file (not in `handler.go`) so it can
be tested independently without loading the template.
- **`TestDialFake` added**: sanity check for the fake gRPC infrastructure used by the other tests.
- **`TestHandlerNoData` added**: verifies the "no data" message renders correctly when the server
returns an empty entry list. Total tests: 23 (plan listed 13).
- **`% relative to rank-1`** as planned; the `<meter max="100">` shows 100% for rank-1
and proportional bars below. Rank-1 is always the visual baseline.
- **`status → website` drill cycle**: clicking a row in the `by status` view adds `f_status`
and resets `by=website` (cycles back to the start of the drilldown hierarchy).
### Test results
```
$ go test ./... -count=1 -race -timeout 60s
ok git.ipng.ch/ipng/nginx-logtail/cmd/frontend 1.1s (23 tests)
ok git.ipng.ch/ipng/nginx-logtail/cmd/cli 1.0s (14 tests)
ok git.ipng.ch/ipng/nginx-logtail/cmd/aggregator 4.1s (13 tests)
ok git.ipng.ch/ipng/nginx-logtail/cmd/collector 9.7s (17 tests)
```
### Test inventory
| Test | What it covers |
|------|----------------|
| `TestParseWindowString` | All 6 window strings + bad input → default |
| `TestParseGroupByString` | All 4 group-by strings + bad input → default |
| `TestParseQueryParams` | All URL params parsed correctly |
| `TestParseQueryParamsDefaults` | Empty URL → handler defaults applied |
| `TestBuildFilter` | Filter proto fields set from filterState |
| `TestBuildFilterNil` | Returns nil when no filter set |
| `TestDrillURL` | website→prefix, prefix→uri, status→website cycle |
| `TestBuildCrumbs` | Correct text and remove-URLs for active filters |
| `TestRenderSparkline` | 5 points → SVG with polyline |
| `TestRenderSparklineTooFewPoints` | nil/1 point → empty string |
| `TestRenderSparklineAllZero` | All-zero counts → empty string |
| `TestFmtCount` | Space-thousands formatting |
| `TestHandlerTopN` | Fake server; labels and formatted counts in HTML |
| `TestHandlerRaw` | `raw=1` → JSON with source/window/group_by/entries |
| `TestHandlerBadTarget` | Unreachable target → 502 + error message in body |
| `TestHandlerFilterPassedToServer` | `f_website` + `f_status` reach gRPC filter |
| `TestHandlerWindowPassedToServer` | `w=60m``pb.Window_W60M` in request |
| `TestHandlerBreadcrumbInHTML` | Active filter renders crumb with × link |
| `TestHandlerSparklineInHTML` | Trend points → `<svg><polyline>` in page |
| `TestHandlerPctBar` | 100% for rank-1, 50% for half-count entry |
| `TestHandlerWindowTabsInHTML` | All 6 window labels rendered as links |
| `TestHandlerNoData` | Empty entry list → "no data" message |
| `TestDialFake` | Test infrastructure sanity check |
---
## Deferred (not in v0)
- Dark mode (prefers-color-scheme media query)
- Per-row mini sparklines (one Trend RPC per table row — expensive; need batching first)
- WebSocket or SSE for live push instead of meta-refresh
- Pagination for large N
- `?format=csv` download
- OIDC/basic-auth gating
- ClickHouse-backed 7d/30d windows (tracked in README)