SPECIFICATION

This project contains four programs:

  1. A collector that tails any number of nginx log files and maintains an in-memory structure of {website, client_prefix, http_request_uri, http_response} counts across all files. It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via server-streaming. Runs on each nginx machine in the cluster. No UI — gRPC interface only.

  2. An aggregator that subscribes to the snapshot stream from all collectors, merges their data into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows trending information useful for DDoS detection. Runs on a central machine.

  3. An HTTP frontend companion to the aggregator that renders a drilldown dashboard. Operators can restrict by http_response=429, then by website=www.example.com, and so on. Works with either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline SVG sparklines and meta-refresh.

  4. A CLI for shell-based debugging. Sends topn, trend, and stream queries to any collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable tables or newline-delimited JSON.

Programs are written in Go. No CGO, no external runtime dependencies.


DESIGN

Directory Layout

nginx-logtail/
├── proto/
│   ├── logtail.proto                  # shared protobuf definitions
│   └── logtailpb/
│       ├── logtail.pb.go              # generated: messages, enums
│       └── logtail_grpc.pb.go         # generated: service stubs
├── internal/
│   └── store/
│       └── store.go                   # shared types: Tuple4, Entry, Snapshot, ring helpers
└── cmd/
    ├── collector/
    │   ├── main.go
    │   ├── tailer.go                  # MultiTailer: tail N files via one shared fsnotify watcher
    │   ├── parser.go                  # tab-separated logtail log_format parser (~50 ns/line)
    │   ├── store.go                   # bounded top-K in-memory store + tiered ring buffers
    │   └── server.go                  # gRPC server: TopN, Trend, StreamSnapshots
    ├── aggregator/
    │   ├── main.go
    │   ├── subscriber.go              # one goroutine per collector; StreamSnapshots with backoff
    │   ├── merger.go                  # delta-merge: O(snapshot_size) per update
    │   ├── cache.go                   # tick-based ring buffer cache served to clients
    │   ├── registry.go                # TargetRegistry: addr→name map updated from snapshot sources
    │   └── server.go                  # gRPC server (same surface as collector)
    ├── frontend/
    │   ├── main.go
    │   ├── handler.go                 # URL param parsing, concurrent TopN+Trend, template exec
    │   ├── filter.go                  # ParseFilterExpr / FilterExprString mini filter language
    │   ├── client.go                  # gRPC dial helper
    │   ├── sparkline.go               # TrendPoints → inline SVG polyline
    │   ├── format.go                  # fmtCount (space thousands separator)
    │   └── templates/
    │       ├── base.html              # outer HTML shell, inline CSS, meta-refresh
    │       └── index.html             # window tabs, group-by tabs, breadcrumb, table, footer
    └── cli/
        ├── main.go                    # subcommand dispatch and usage
        ├── flags.go                   # shared flags, parseTargets, buildFilter, parseWindow
        ├── client.go                  # gRPC dial helper
        ├── format.go                  # printTable, fmtCount, fmtTime, targetHeader
        ├── cmd_topn.go                # topn: concurrent fan-out, table + JSON output
        ├── cmd_trend.go               # trend: concurrent fan-out, table + JSON output
        ├── cmd_stream.go              # stream: multiplexed streams, auto-reconnect
        └── cmd_targets.go             # targets: list collectors known to the endpoint

Data Model

The core unit is a count keyed by four dimensions:

Field             Description                                        Example
website           nginx $host                                        www.example.com
client_prefix     client IP truncated to /24 (IPv4) or /48 (IPv6)    1.2.3.0/24
http_request_uri  $request_uri path only (query string stripped)     /api/v1/search
http_response     HTTP status code                                   429

Time Windows & Tiered Ring Buffers

Two ring buffers at different resolutions cover all query windows up to 24 hours:

Tier    Bucket size  Buckets  Top-K/bucket  Covers  Roll-up trigger
Fine    1 min        60       50 000        1 h     every minute
Coarse  5 min        288      5 000         24 h    every 5 fine ticks

Supported query windows and which tier they read from:

Window  Tier    Buckets summed
1 min   fine    last 1
5 min   fine    last 5
15 min  fine    last 15
60 min  fine    all 60
6 h     coarse  last 72
24 h    coarse  all 288

Every minute: snapshot live map → top-50K → append to fine ring, reset live map. Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.

Memory Budget (Collector, target ≤ 1 GB)

Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 8 B count + ~80 B Go map overhead ≈ ~186 bytes per entry.

Structure                  Entries       Size
Live map (capped)          100 000       ~19 MB
Fine ring (60 × 1-min)     60 × 50 000   ~558 MB
Coarse ring (288 × 5-min)  288 × 5 000   ~268 MB
Total                                    ~845 MB

The live map is hard-capped at 100 K entries. Once full, only updates to existing keys are accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded regardless of attack cardinality.

Future Work — ClickHouse Export (post-MVP)

Do not implement until the end-to-end MVP is running.

The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d historical views. Schema sketch:

CREATE TABLE logtail (
  ts            DateTime,
  website       LowCardinality(String),
  client_prefix String,
  request_uri   LowCardinality(String),
  status        UInt16,
  count         UInt64
) ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, website, status, client_prefix, request_uri);

The frontend routes window=7d|30d queries to ClickHouse; all shorter windows continue to use the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive and does not change any existing interface.

Protobuf API (proto/logtail.proto)

enum StatusOp { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; }

message Filter {
  optional string website          = 1;
  optional string client_prefix    = 2;
  optional string http_request_uri = 3;
  optional int32  http_response    = 4;
  StatusOp        status_op        = 5;  // comparison operator for http_response
  optional string website_regex    = 6;  // RE2 regex against website
  optional string uri_regex        = 7;  // RE2 regex against http_request_uri
}

enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; }
enum Window  { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }

message TopNRequest   { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
message TopNEntry     { string label = 1; int64 count = 2; }
message TopNResponse  { repeated TopNEntry entries = 1; string source = 2; }

// Trend: one total count per minute (or 5-min) bucket, for sparklines
message TrendRequest  { Filter filter = 1; Window window = 4; }
message TrendPoint    { int64 timestamp_unix = 1; int64 count = 2; }
message TrendResponse { repeated TrendPoint points = 1; string source = 2; }

// Streaming: collector pushes a fine snapshot after every minute rotation
message SnapshotRequest {}
message Snapshot {
  string             source    = 1;
  int64              timestamp = 2;
  repeated TopNEntry entries   = 3;  // full top-50K for this bucket
}

// Target discovery: list the collectors behind the queried endpoint
message ListTargetsRequest {}
message TargetInfo {
  string name = 1;  // display name (--source value from the collector)
  string addr = 2;  // gRPC address; empty string means "this endpoint itself"
}
message ListTargetsResponse { repeated TargetInfo targets = 1; }

service LogtailService {
  rpc TopN(TopNRequest)                returns (TopNResponse);
  rpc Trend(TrendRequest)              returns (TrendResponse);
  rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
  rpc ListTargets(ListTargetsRequest)  returns (ListTargetsResponse);
}
// Both collector and aggregator implement LogtailService.
// The aggregator's StreamSnapshots re-streams the merged view.
// ListTargets: aggregator returns all configured collectors; collector returns itself.

Program 1 — Collector

tailer.go

  • MultiTailer: one shared fsnotify.Watcher for all files regardless of count — avoids the inotify instance limit when tailing hundreds of files.
  • On WRITE event: read all new lines from that file's bufio.Reader.
  • On RENAME/REMOVE (logrotate): drain old fd to EOF, close, start retry-open goroutine with exponential backoff. Sends the new *os.File back via a channel to keep map access single-threaded.
  • Emits LogRecord structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak).
  • Accepts paths via --logs (comma-separated or glob) and --logs-file (one path/glob per line).

parser.go

  • Parses the fixed logtail nginx log format — tab-separated, fixed field order, no quoting:

    log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time';
    
    #  Field             Used for
    0  $host             website
    1  $remote_addr      client_prefix
    2  $msec             (discarded)
    3  $request_method   (discarded)
    4  $request_uri      http_request_uri
    5  $status           http_response
    6  $body_bytes_sent  (discarded)
    7  $request_time     (discarded)
  • strings.SplitN(line, "\t", 8) — ~50 ns/line. No regex.

  • $request_uri: query string discarded at first ?.

  • $remote_addr: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags.

  • Lines with fewer than 8 fields are silently skipped.

store.go

  • Single aggregator goroutine reads from the channel and updates the live map — no locking on the hot path. At 10 K lines/s the goroutine uses <1% CPU.
  • Live map: map[Tuple4]int64, hard-capped at 100 K entries (new keys dropped when full).
  • Minute ticker: heap-selects top-50K entries, writes snapshot to fine ring, resets live map.
  • Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring.
  • TopN query: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N.
  • Trend query: per-bucket filtered sum, returns one TrendPoint per bucket.
  • Subscriber fan-out: per-subscriber buffered channel; Subscribe/Unsubscribe for streaming.

server.go

  • gRPC server on configurable port (default :9090).
  • TopN and Trend: unary, answered from the ring buffer under RLock.
  • StreamSnapshots: registers a subscriber channel; loops Recv on it; 30 s keepalive ticker.

Program 2 — Aggregator

subscriber.go

  • One goroutine per collector. Dials, calls StreamSnapshots, forwards each Snapshot to the merger.
  • Reconnects with exponential backoff (100 ms → doubles → cap 30 s).
  • After 3 consecutive failures: calls merger.Zero(addr) to remove that collector's contribution from the merged view (prevents stale counts accumulating during outages).
  • Resets failure count on first successful Recv; logs recovery.

merger.go

  • Delta strategy: on each new snapshot from collector X, subtract X's previous entries from merged, add the new entries, store new map. O(snapshot_size) per update — not O(N_collectors × snapshot_size).
  • Zero(addr): subtracts the collector's last-known contribution and deletes its entry — called when a collector is marked degraded.

cache.go

  • Tick-based rotation (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned to the same 1-minute cadence as collectors regardless of how many collectors are connected.
  • Same tiered ring structure as the collector store; populated from merger.TopK() each tick.
  • QueryTopN, QueryTrend, Subscribe/Unsubscribe — identical interface to collector store.

registry.go

  • TargetRegistry: sync.RWMutex-protected map[addr → name]. Initialised with the configured collector addresses; display names are updated from the source field of the first snapshot received from each collector.
  • Targets() returns a stable sorted slice of {name, addr} pairs for ListTargets responses.

server.go

  • Implements LogtailService backed by the cache (not live fan-out).
  • StreamSnapshots re-streams merged fine snapshots; usable by a second-tier aggregator or monitoring system.
  • ListTargets returns the current TargetRegistry contents — all configured collectors with their display names and gRPC addresses.

Program 3 — Frontend

handler.go

  • All filter state in the URL query string: w (window), by (group_by), f_website, f_prefix, f_uri, f_status, f_website_re, f_uri_re, n, target. No server-side session — URLs are shareable and bookmarkable; multiple operators see independent views.
  • Filter expression box: a q= parameter carries a mini filter language (status>=400 AND website~=gouda.* AND uri~=^/api/). On submission the handler parses it via ParseFilterExpr and redirects to the canonical URL with individual f_* params; q= never appears in the final URL. Parse errors re-render the current page with an inline message.
  • Status expressions: f_status accepts 200, !=200, >=400, <500, etc. — parsed by store.ParseStatusExpr into (value, StatusOp) for the filter protobuf.
  • Regex filters: f_website_re and f_uri_re hold RE2 patterns; compiled once per request into store.CompiledFilter before the query-loop iteration. Invalid regexes match nothing.
  • TopN, Trend, and ListTargets RPCs issued concurrently (all with a 5 s deadline); page renders with whatever completes. Trend failure suppresses the sparkline; ListTargets failure hides the source picker — both are non-fatal.
  • Source picker: ListTargets result drives a source: tab row. Clicking a collector tab sets target= to that collector's address, querying it directly. The "all" tab resets to the default aggregator. Picker is hidden when ListTargets reports no remote collectors (direct collector mode, where the endpoint lists only itself).
  • Drilldown: clicking a table row adds the current dimension's filter and advances by through website → prefix → uri → status → website (cycles).
  • raw=1: returns the TopN result as JSON — same URL, no CLI needed for scripting.
  • target= override: per-request gRPC endpoint override for comparing sources.
  • Error pages render at HTTP 502 with the window/group-by tabs still functional.

sparkline.go

  • renderSparkline([]*pb.TrendPoint) template.HTML — fixed viewBox="0 0 300 60" SVG, Y-scaled to max count, rendered as <polyline>. Returns "" for fewer than 2 points or all-zero data.

templates/

  • base.html: outer shell, inline CSS (~40 lines), conditional <meta http-equiv="refresh">.
  • index.html: window tabs, group-by tabs, filter breadcrumb with × remove links, sparkline, TopN table with <meter> bars (% relative to rank-1), footer with source and refresh info.
  • No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx.

Program 4 — CLI

Subcommands

logtail-cli topn    [flags]   ranked label → count table (exits after one response)
logtail-cli trend   [flags]   per-bucket time series (exits after one response)
logtail-cli stream  [flags]   live snapshot feed (runs until Ctrl-C, auto-reconnects)
logtail-cli targets [flags]   list targets known to the queried endpoint

Flags

Shared (all subcommands):

Flag          Default          Description
--target      localhost:9090   Comma-separated host:port list; fan-out to all
--json        false            Emit newline-delimited JSON instead of a table
--website                      Filter: website
--prefix                       Filter: client prefix
--uri                          Filter: request URI
--status                       Filter: HTTP status expression (200, !=200, >=400, <500, …)
--website-re                   Filter: RE2 regex against website
--uri-re                       Filter: RE2 regex against request URI

topn only: --n 10, --window 5m, --group-by website

trend only: --window 5m

Multi-target fan-out

--target accepts a comma-separated list. All targets are queried concurrently; results are printed in order with a per-target header. Single-target output omits the header for clean pipe-to-jq use.

Output

Default: human-readable table with a space as the thousands separator (18 432). --json: one JSON object per target (NDJSON for stream).

stream reconnects automatically on error (5 s backoff). All other subcommands exit immediately with a non-zero code on gRPC error.

Key Design Decisions

Decision Rationale
Single aggregator goroutine in collector Eliminates all map lock contention on the 10 K/s hot path
Hard cap live map at 100 K entries Bounds memory regardless of DDoS cardinality explosion
Ring buffer of sorted snapshots (not raw maps) TopN queries avoid re-sorting; merge is a single heap pass
Push-based streaming (collector → aggregator) Aggregator cache always fresh; query latency is cache-read only
Delta merge in aggregator O(snapshot_size) per update, not O(N_collectors × size)
Tick-based cache rotation in aggregator Ring stays on the same 1-min cadence regardless of collector count
Degraded collector zeroing Stale counts from failed collectors don't accumulate in the merged view
Same LogtailService for collector and aggregator CLI and frontend work with either; no special-casing
internal/store shared package ~200 lines of ring-buffer logic shared between collector and aggregator
Filter state in URL, not session cookie Multiple concurrent operators; shareable/bookmarkable URLs
Query strings stripped at ingest Major cardinality reduction; prevents URI explosion under attack
No persistent storage Simplicity; acceptable for ops dashboards (restart = lose history)
Trusted internal network, no TLS Reduces operational complexity; add a TLS proxy if needed later
Server-side SVG sparklines, meta-refresh Zero JS dependencies; works in terminal browsers and curl
CLI default: human-readable table Operator-friendly by default; --json opt-in for scripting
CLI multi-target fan-out Compare a collector vs. aggregator, or two collectors, in one command
CLI uses stdlib flag, no framework Four subcommands don't justify a dependency
Status filter as expression string (!=200, >=400) Operator-friendly; parsed once at query boundary, encoded as (int32, StatusOp) in proto
Regex filters compiled once per query (CompiledFilter) Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency
Filter expression box (q=) redirects to canonical URL Filter state stays in individual f_* params; URLs remain shareable and bookmarkable
ListTargets + frontend source picker (no Tuple5) "Which nginx is busiest?" answered by switching target= to a collector; no data model changes, no extra memory