From 91eb56a64c3a299ac878d427d801c8da7a1973e8 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Tue, 24 Mar 2026 03:49:22 +0100 Subject: [PATCH] Add prometheus exporter on :9100 --- README.md | 8 +- cmd/collector/main.go | 14 +++ cmd/collector/parser.go | 40 +++++-- cmd/collector/parser_test.go | 31 +++++- cmd/collector/prom.go | 209 +++++++++++++++++++++++++++++++++++ cmd/collector/prom_test.go | 130 ++++++++++++++++++++++ cmd/collector/store.go | 6 +- docs/USERGUIDE.md | 68 ++++++++++++ 8 files changed, 486 insertions(+), 20 deletions(-) create mode 100644 cmd/collector/prom.go create mode 100644 cmd/collector/prom_test.go diff --git a/README.md b/README.md index a77826b..7d695ac 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,11 @@ SPECIFICATION This project contains four programs: 1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of -`{website, client_prefix, http_request_uri, http_response}` counts across all files. It answers -TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via server-streaming. -Runs on each nginx machine in the cluster. No UI — gRPC interface only. +`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files. +It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via +server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host +request counters and response-body/request-time histograms. +Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only. 2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data into a unified in-memory cache, and exposes the same gRPC interface. 
Answers questions like "what diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 530ff19..59d967f 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -6,6 +6,7 @@ import ( "flag" "log" "net" + "net/http" "os" "os/signal" "path/filepath" @@ -19,6 +20,7 @@ import ( func main() { listen := flag.String("listen", ":9090", "gRPC listen address") + promListen := flag.String("prom-listen", ":9100", "Prometheus metrics listen address (empty to disable)") logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail") logsFile := flag.String("logs-file", "", "file containing one log path/glob per line") source := flag.String("source", hostname(), "name for this collector (default: hostname)") @@ -40,6 +42,18 @@ func main() { ch := make(chan LogRecord, 200_000) store := NewStore(*source) + if *promListen != "" { + ps := NewPromStore() + store.prom = ps + mux := http.NewServeMux() + mux.Handle("/metrics", ps) + go func() { + log.Printf("collector: Prometheus metrics on %s/metrics", *promListen) + if err := http.ListenAndServe(*promListen, mux); err != nil { + log.Fatalf("collector: Prometheus server: %v", err) + } + }() + } go store.Run(ch) tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch) diff --git a/cmd/collector/parser.go b/cmd/collector/parser.go index ad0ea90..8a48d18 100644 --- a/cmd/collector/parser.go +++ b/cmd/collector/parser.go @@ -9,12 +9,15 @@ import ( // LogRecord holds the dimensions extracted from a single nginx log line. 
type LogRecord struct { - Website string - ClientPrefix string - URI string - Status string - IsTor bool - ASN int32 + Website string + ClientPrefix string + URI string + Status string + IsTor bool + ASN int32 + Method string + BodyBytesSent int64 + RequestTime float64 } // ParseLine parses a tab-separated logtail log line: @@ -51,13 +54,26 @@ func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) { } } + var bodyBytes int64 + if n, err := strconv.ParseInt(fields[6], 10, 64); err == nil { + bodyBytes = n + } + + var reqTime float64 + if f, err := strconv.ParseFloat(fields[7], 64); err == nil { + reqTime = f + } + return LogRecord{ - Website: fields[0], - ClientPrefix: prefix, - URI: uri, - Status: fields[5], - IsTor: isTor, - ASN: asn, + Website: fields[0], + ClientPrefix: prefix, + URI: uri, + Status: fields[5], + IsTor: isTor, + ASN: asn, + Method: fields[3], + BodyBytesSent: bodyBytes, + RequestTime: reqTime, }, true } diff --git a/cmd/collector/parser_test.go b/cmd/collector/parser_test.go index 7d84946..eb7c29b 100644 --- a/cmd/collector/parser_test.go +++ b/cmd/collector/parser_test.go @@ -18,10 +18,13 @@ func TestParseLine(t *testing.T) { line: good, wantOK: true, want: LogRecord{ - Website: "www.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/api/v1/search", - Status: "200", + Website: "www.example.com", + ClientPrefix: "1.2.3.0/24", + URI: "/api/v1/search", + Status: "200", + Method: "GET", + BodyBytesSent: 1452, + RequestTime: 0.043, }, }, { @@ -33,6 +36,8 @@ func TestParseLine(t *testing.T) { ClientPrefix: "10.0.0.0/24", URI: "/submit", Status: "201", + Method: "POST", + RequestTime: 0.001, }, }, { @@ -44,6 +49,8 @@ func TestParseLine(t *testing.T) { ClientPrefix: "2001:db8:cafe::/48", // /48 = 3 full 16-bit groups intact URI: "/", Status: "200", + Method: "GET", + RequestTime: 0.001, }, }, { @@ -70,6 +77,8 @@ func TestParseLine(t *testing.T) { ClientPrefix: "5.6.7.0/24", URI: "/rate-limited", Status: "429", + Method: "GET", + 
RequestTime: 0.001, }, }, { @@ -82,6 +91,8 @@ func TestParseLine(t *testing.T) { URI: "/", Status: "200", IsTor: true, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -94,6 +105,8 @@ func TestParseLine(t *testing.T) { URI: "/", Status: "200", IsTor: false, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -106,6 +119,8 @@ func TestParseLine(t *testing.T) { URI: "/", Status: "200", IsTor: false, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -119,6 +134,8 @@ func TestParseLine(t *testing.T) { Status: "200", IsTor: false, ASN: 12345, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -132,6 +149,8 @@ func TestParseLine(t *testing.T) { Status: "200", IsTor: true, ASN: 65535, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -145,6 +164,8 @@ func TestParseLine(t *testing.T) { Status: "200", IsTor: true, ASN: 0, + Method: "GET", + RequestTime: 0.001, }, }, { @@ -158,6 +179,8 @@ func TestParseLine(t *testing.T) { Status: "200", IsTor: false, ASN: 0, + Method: "GET", + RequestTime: 0.001, }, }, } diff --git a/cmd/collector/prom.go b/cmd/collector/prom.go new file mode 100644 index 0000000..b487740 --- /dev/null +++ b/cmd/collector/prom.go @@ -0,0 +1,209 @@ +package main + +import ( + "bufio" + "fmt" + "net/http" + "sort" + "strings" + "sync" +) + +// Body-size histogram bucket upper bounds in bytes. +const promNumBodyBounds = 7 + +var promBodyBounds = [promNumBodyBounds]int64{256, 1024, 4096, 16384, 65536, 262144, 1048576} + +// Request-time histogram bucket upper bounds in seconds (standard Prometheus defaults). +const promNumTimeBounds = 11 + +var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} + +const promCounterCap = 100_000 // safety cap on {host,method,status} counter entries + +// promCounterKey is the label set for per-request counters. +type promCounterKey struct { + Host string + Method string + Status string +} + +// promBodyEntry holds the body_bytes_sent histogram for one host. 
+type promBodyEntry struct { + buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=bound[i]; index N: le=+Inf + sum int64 +} + +// promTimeEntry holds the request_time histogram for one host. +type promTimeEntry struct { + buckets [promNumTimeBounds + 1]int64 + sum float64 +} + +// PromStore accumulates Prometheus metrics ingested from log records. +// +// Ingest must be called from exactly one goroutine (the store's Run goroutine). +// ServeHTTP may be called from any number of goroutines concurrently. +type PromStore struct { + mu sync.Mutex + counters map[promCounterKey]int64 + body map[string]*promBodyEntry // keyed by host + reqTime map[string]*promTimeEntry // keyed by host +} + +// NewPromStore returns an empty PromStore ready for use. +func NewPromStore() *PromStore { + return &PromStore{ + counters: make(map[promCounterKey]int64, 1024), + body: make(map[string]*promBodyEntry, 64), + reqTime: make(map[string]*promTimeEntry, 64), + } +} + +// Ingest records one log record into the Prometheus metrics. +// Must be called from a single goroutine. 
+func (p *PromStore) Ingest(r LogRecord) { + p.mu.Lock() + + // --- per-{host,method,status} request counter --- + ck := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status} + if _, ok := p.counters[ck]; ok { + p.counters[ck]++ + } else if len(p.counters) < promCounterCap { + p.counters[ck] = 1 + } + + // --- body_bytes_sent histogram (keyed by host only) --- + be, ok := p.body[r.Website] + if !ok { + be = &promBodyEntry{} + p.body[r.Website] = be + } + for i, bound := range promBodyBounds { + if r.BodyBytesSent <= bound { + be.buckets[i]++ + } + } + be.buckets[promNumBodyBounds]++ // +Inf + be.sum += r.BodyBytesSent + + // --- request_time histogram (keyed by host only) --- + te, ok := p.reqTime[r.Website] + if !ok { + te = &promTimeEntry{} + p.reqTime[r.Website] = te + } + for i, bound := range promTimeBounds { + if r.RequestTime <= bound { + te.buckets[i]++ + } + } + te.buckets[promNumTimeBounds]++ // +Inf + te.sum += r.RequestTime + + p.mu.Unlock() +} + +// ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4). +func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + // Snapshot everything under the lock, then render without holding it. + p.mu.Lock() + + type counterSnap struct { + k promCounterKey + v int64 + } + counters := make([]counterSnap, 0, len(p.counters)) + for k, v := range p.counters { + counters = append(counters, counterSnap{k, v}) + } + + type bodySnap struct { + host string + e promBodyEntry + } + bodySnaps := make([]bodySnap, 0, len(p.body)) + for h, e := range p.body { + bodySnaps = append(bodySnaps, bodySnap{h, *e}) + } + + type timeSnap struct { + host string + e promTimeEntry + } + timeSnaps := make([]timeSnap, 0, len(p.reqTime)) + for h, e := range p.reqTime { + timeSnaps = append(timeSnaps, timeSnap{h, *e}) + } + + p.mu.Unlock() + + // Sort for stable, human-readable output. 
+ sort.Slice(counters, func(i, j int) bool { + a, b := counters[i].k, counters[j].k + if a.Host != b.Host { + return a.Host < b.Host + } + if a.Method != b.Method { + return a.Method < b.Method + } + return a.Status < b.Status + }) + sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].host < bodySnaps[j].host }) + sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host }) + + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + bw := bufio.NewWriterSize(w, 256*1024) + + // nginx_http_requests_total + fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.") + fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter") + for _, c := range counters { + fmt.Fprintf(bw, "nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n", + c.k.Host, c.k.Method, c.k.Status, c.v) + } + + // nginx_http_response_body_bytes (histogram, labeled by host) + fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.") + fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram") + for _, s := range bodySnaps { + for i, bound := range promBodyBounds { + fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=%q} %d\n", + s.host, fmt.Sprintf("%d", bound), s.e.buckets[i]) + } + fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=\"+Inf\"} %d\n", + s.host, s.e.buckets[promNumBodyBounds]) + fmt.Fprintf(bw, "nginx_http_response_body_bytes_count{host=%q} %d\n", + s.host, s.e.buckets[promNumBodyBounds]) + fmt.Fprintf(bw, "nginx_http_response_body_bytes_sum{host=%q} %d\n", + s.host, s.e.sum) + } + + // nginx_http_request_duration_seconds (histogram, labeled by host) + fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.") + fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram") + for _, s := range timeSnaps { + for i, bound := range promTimeBounds 
{ + fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=%q} %d\n", + s.host, formatFloat(bound), s.e.buckets[i]) + } + fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=\"+Inf\"} %d\n", + s.host, s.e.buckets[promNumTimeBounds]) + fmt.Fprintf(bw, "nginx_http_request_duration_seconds_count{host=%q} %d\n", + s.host, s.e.buckets[promNumTimeBounds]) + fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%q} %g\n", + s.host, s.e.sum) + } + + bw.Flush() +} + +// formatFloat renders a float64 bucket bound without trailing zeros but always +// with at least one decimal place, matching Prometheus convention (e.g. "0.5", "10"). +func formatFloat(f float64) string { + s := fmt.Sprintf("%g", f) + if !strings.Contains(s, ".") && !strings.Contains(s, "e") { + s += ".0" // ensure it looks like a float, not an integer + } + return s +} diff --git a/cmd/collector/prom_test.go b/cmd/collector/prom_test.go new file mode 100644 index 0000000..39f9b26 --- /dev/null +++ b/cmd/collector/prom_test.go @@ -0,0 +1,130 @@ +package main + +import ( + "net/http/httptest" + "strings" + "testing" +) + +func TestPromStoreIngestBodyBuckets(t *testing.T) { + ps := NewPromStore() + // 512 bytes: > 256, ≤ 1024 → bucket[0] stays 0, buckets[1..N] get 1 + ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", BodyBytesSent: 512}) + + ps.mu.Lock() + be := ps.body["example.com"] + ps.mu.Unlock() + + if be == nil { + t.Fatal("expected body entry, got nil") + } + if be.buckets[0] != 0 { // le=256: 512 > 256 + t.Errorf("le=256 bucket = %d, want 0", be.buckets[0]) + } + if be.buckets[1] != 1 { // le=1024: 512 ≤ 1024 + t.Errorf("le=1024 bucket = %d, want 1", be.buckets[1]) + } + for i := 2; i <= promNumBodyBounds; i++ { + if be.buckets[i] != 1 { + t.Errorf("bucket[%d] = %d, want 1", i, be.buckets[i]) + } + } + if be.sum != 512 { + t.Errorf("sum = %d, want 512", be.sum) + } +} + +func TestPromStoreIngestTimeBuckets(t *testing.T) { + ps := 
NewPromStore() + // 0.075s: > 0.05, ≤ 0.1 + ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", RequestTime: 0.075}) + + ps.mu.Lock() + te := ps.reqTime["example.com"] + ps.mu.Unlock() + + if te == nil { + t.Fatal("expected time entry, got nil") + } + // le=0.05 (index 3): 0.075 > 0.05 → 0 + if te.buckets[3] != 0 { + t.Errorf("le=0.05 bucket = %d, want 0", te.buckets[3]) + } + // le=0.1 (index 4): 0.075 ≤ 0.1 → 1 + if te.buckets[4] != 1 { + t.Errorf("le=0.1 bucket = %d, want 1", te.buckets[4]) + } + // +Inf (last): always 1 + if te.buckets[promNumTimeBounds] != 1 { + t.Errorf("+Inf bucket = %d, want 1", te.buckets[promNumTimeBounds]) + } +} + +func TestPromStoreCounter(t *testing.T) { + ps := NewPromStore() + ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"}) + ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"}) + ps.Ingest(LogRecord{Website: "a.com", Method: "POST", Status: "201"}) + + ps.mu.Lock() + c1 := ps.counters[promCounterKey{"a.com", "GET", "200"}] + c2 := ps.counters[promCounterKey{"a.com", "POST", "201"}] + ps.mu.Unlock() + + if c1 != 2 { + t.Errorf("GET/200 count = %d, want 2", c1) + } + if c2 != 1 { + t.Errorf("POST/201 count = %d, want 1", c2) + } +} + +func TestPromStoreServeHTTP(t *testing.T) { + ps := NewPromStore() + ps.Ingest(LogRecord{ + Website: "example.com", Method: "GET", Status: "200", + BodyBytesSent: 100, RequestTime: 0.042, + }) + + req := httptest.NewRequest("GET", "/metrics", nil) + rec := httptest.NewRecorder() + ps.ServeHTTP(rec, req) + + body := rec.Body.String() + + checks := []string{ + "# TYPE nginx_http_requests_total counter", + `nginx_http_requests_total{host="example.com",method="GET",status="200"} 1`, + "# TYPE nginx_http_response_body_bytes histogram", + `nginx_http_response_body_bytes_bucket{host="example.com",le="256"} 1`, // 100 ≤ 256 + `nginx_http_response_body_bytes_count{host="example.com"} 1`, + `nginx_http_response_body_bytes_sum{host="example.com"} 100`, + "# 
TYPE nginx_http_request_duration_seconds histogram", + `nginx_http_request_duration_seconds_bucket{host="example.com",le="0.05"} 1`, // 0.042 ≤ 0.05 + `nginx_http_request_duration_seconds_count{host="example.com"} 1`, + } + for _, want := range checks { + if !strings.Contains(body, want) { + t.Errorf("missing %q in output:\n%s", want, body) + } + } +} + +func TestPromStoreCounterCap(t *testing.T) { + ps := NewPromStore() + // Fill to cap with distinct {host,method,status} combos + for i := 0; i < promCounterCap+10; i++ { + host := strings.Repeat("x", i%10+1) + ".com" + status := "200" + if i%3 == 0 { + status = "404" + } + ps.Ingest(LogRecord{Website: host, Method: "GET", Status: status}) + } + ps.mu.Lock() + n := len(ps.counters) + ps.mu.Unlock() + if n > promCounterCap { + t.Errorf("counter map size %d exceeds cap %d", n, promCounterCap) + } +} diff --git a/cmd/collector/store.go b/cmd/collector/store.go index 81b72ab..28abbcb 100644 --- a/cmd/collector/store.go +++ b/cmd/collector/store.go @@ -13,6 +13,7 @@ const liveMapCap = 100_000 // hard cap on live map entries // Store holds the live map and both ring buffers. type Store struct { source string + prom *PromStore // optional; if non-nil, receives every ingested record // live map — written only by the Run goroutine; no locking needed on writes live map[st.Tuple6]int64 @@ -41,9 +42,12 @@ func NewStore(source string) *Store { } } -// ingest records one log record into the live map. +// ingest records one log record into the live map and the Prometheus store (if set). // Must only be called from the Run goroutine. 
func (s *Store) ingest(r LogRecord) { + if s.prom != nil { + s.prom.Ingest(r) + } key := st.Tuple6{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor, ASN: r.ASN} if _, exists := s.live[key]; !exists { if s.liveLen >= liveMapCap { diff --git a/docs/USERGUIDE.md b/docs/USERGUIDE.md index 4b32042..83b8d57 100644 --- a/docs/USERGUIDE.md +++ b/docs/USERGUIDE.md @@ -76,6 +76,7 @@ windows, and exposes a gRPC interface for the aggregator (and directly for the C | Flag | Default | Description | |-------------------|--------------|-----------------------------------------------------------| | `--listen` | `:9090` | gRPC listen address | +| `--prom-listen` | `:9100` | Prometheus metrics address; empty string to disable | | `--logs` | — | Comma-separated log file paths or glob patterns | | `--logs-file` | — | File containing one log path/glob per line | | `--source` | hostname | Name for this collector in query responses | @@ -123,6 +124,73 @@ The collector handles logrotate automatically. On `RENAME`/`REMOVE` events it dr descriptor to EOF (so no lines are lost), then retries opening the original path with backoff until the new file appears. No restart or SIGHUP required. +### Prometheus metrics + +The collector exposes a Prometheus-compatible `/metrics` endpoint on `--prom-listen` (default +`:9100`). Set `--prom-listen ""` to disable it entirely. + +Three metrics are exported: + +**`nginx_http_requests_total`** — counter, labeled `{host, method, status}`: +``` +nginx_http_requests_total{host="example.com",method="GET",status="200"} 18432 +nginx_http_requests_total{host="example.com",method="POST",status="201"} 304 +nginx_http_requests_total{host="api.example.com",method="GET",status="429"} 57 +``` + +**`nginx_http_response_body_bytes`** — histogram, labeled `{host}`. Observes the +`$body_bytes_sent` value for every request. Bucket upper bounds (bytes): +`256, 1024, 4096, 16384, 65536, 262144, 1048576, +Inf`. 
+ +**`nginx_http_request_duration_seconds`** — histogram, labeled `{host}`. Observes the +`$request_time` value for every request. Bucket upper bounds (seconds): +`0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, +Inf`. + +Body and request-time histograms use only the `host` label (not method/status) to keep +cardinality bounded — the label sets stay proportional to the number of virtual hosts, not +the number of unique method × status combinations. + +The counter map is capped at 100 000 distinct `{host, method, status}` tuples. Once the cap +is reached, new tuples are silently and permanently dropped (the map is never reset), so +memory is bounded regardless of traffic patterns. + +**Prometheus scrape config:** + +```yaml +scrape_configs: + - job_name: nginx_logtail + static_configs: + - targets: + - nginx1:9100 + - nginx2:9100 + - nginx3:9100 +``` + +Or with service discovery — the collector has no special requirements beyond a reachable +TCP port. + +**Example queries:** + +```promql +# Request rate per host over last 5 minutes +rate(nginx_http_requests_total[5m]) + +# 5xx error rate fraction per host +sum by (host) (rate(nginx_http_requests_total{status=~"5.."}[5m])) + / +sum by (host) (rate(nginx_http_requests_total[5m])) + +# 95th percentile response time per host +histogram_quantile(0.95, + sum by (host, le) (rate(nginx_http_request_duration_seconds_bucket[5m])) +) + +# Median response body size per host +histogram_quantile(0.50, + sum by (host, le) (rate(nginx_http_response_body_bytes_bucket[5m])) +) +``` + ### Memory usage The collector is designed to stay well under 1 GB: