Add prometheus exporter on :9100
This commit is contained in:
@@ -13,9 +13,11 @@ SPECIFICATION
|
||||
This project contains four programs:
|
||||
|
||||
1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of
|
||||
`{website, client_prefix, http_request_uri, http_response}` counts across all files. It answers
|
||||
TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via server-streaming.
|
||||
Runs on each nginx machine in the cluster. No UI — gRPC interface only.
|
||||
`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files.
|
||||
It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via
|
||||
server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host
|
||||
request counters and response-body/request-time histograms.
|
||||
Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only.
|
||||
|
||||
2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data
|
||||
into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"flag"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@@ -19,6 +20,7 @@ import (
|
||||
|
||||
func main() {
|
||||
listen := flag.String("listen", ":9090", "gRPC listen address")
|
||||
promListen := flag.String("prom-listen", ":9100", "Prometheus metrics listen address (empty to disable)")
|
||||
logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
|
||||
logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
|
||||
source := flag.String("source", hostname(), "name for this collector (default: hostname)")
|
||||
@@ -40,6 +42,18 @@ func main() {
|
||||
ch := make(chan LogRecord, 200_000)
|
||||
|
||||
store := NewStore(*source)
|
||||
if *promListen != "" {
|
||||
ps := NewPromStore()
|
||||
store.prom = ps
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", ps)
|
||||
go func() {
|
||||
log.Printf("collector: Prometheus metrics on %s/metrics", *promListen)
|
||||
if err := http.ListenAndServe(*promListen, mux); err != nil {
|
||||
log.Fatalf("collector: Prometheus server: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
go store.Run(ch)
|
||||
|
||||
tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch)
|
||||
|
||||
@@ -9,12 +9,15 @@ import (
|
||||
|
||||
// LogRecord holds the dimensions extracted from a single nginx log line.
|
||||
type LogRecord struct {
|
||||
Website string
|
||||
ClientPrefix string
|
||||
URI string
|
||||
Status string
|
||||
IsTor bool
|
||||
ASN int32
|
||||
Website string
|
||||
ClientPrefix string
|
||||
URI string
|
||||
Status string
|
||||
IsTor bool
|
||||
ASN int32
|
||||
Method string
|
||||
BodyBytesSent int64
|
||||
RequestTime float64
|
||||
}
|
||||
|
||||
// ParseLine parses a tab-separated logtail log line:
|
||||
@@ -51,13 +54,26 @@ func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
var bodyBytes int64
|
||||
if n, err := strconv.ParseInt(fields[6], 10, 64); err == nil {
|
||||
bodyBytes = n
|
||||
}
|
||||
|
||||
var reqTime float64
|
||||
if f, err := strconv.ParseFloat(fields[7], 64); err == nil {
|
||||
reqTime = f
|
||||
}
|
||||
|
||||
return LogRecord{
|
||||
Website: fields[0],
|
||||
ClientPrefix: prefix,
|
||||
URI: uri,
|
||||
Status: fields[5],
|
||||
IsTor: isTor,
|
||||
ASN: asn,
|
||||
Website: fields[0],
|
||||
ClientPrefix: prefix,
|
||||
URI: uri,
|
||||
Status: fields[5],
|
||||
IsTor: isTor,
|
||||
ASN: asn,
|
||||
Method: fields[3],
|
||||
BodyBytesSent: bodyBytes,
|
||||
RequestTime: reqTime,
|
||||
}, true
|
||||
}
|
||||
|
||||
|
||||
@@ -18,10 +18,13 @@ func TestParseLine(t *testing.T) {
|
||||
line: good,
|
||||
wantOK: true,
|
||||
want: LogRecord{
|
||||
Website: "www.example.com",
|
||||
ClientPrefix: "1.2.3.0/24",
|
||||
URI: "/api/v1/search",
|
||||
Status: "200",
|
||||
Website: "www.example.com",
|
||||
ClientPrefix: "1.2.3.0/24",
|
||||
URI: "/api/v1/search",
|
||||
Status: "200",
|
||||
Method: "GET",
|
||||
BodyBytesSent: 1452,
|
||||
RequestTime: 0.043,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -33,6 +36,8 @@ func TestParseLine(t *testing.T) {
|
||||
ClientPrefix: "10.0.0.0/24",
|
||||
URI: "/submit",
|
||||
Status: "201",
|
||||
Method: "POST",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -44,6 +49,8 @@ func TestParseLine(t *testing.T) {
|
||||
ClientPrefix: "2001:db8:cafe::/48", // /48 = 3 full 16-bit groups intact
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -70,6 +77,8 @@ func TestParseLine(t *testing.T) {
|
||||
ClientPrefix: "5.6.7.0/24",
|
||||
URI: "/rate-limited",
|
||||
Status: "429",
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -82,6 +91,8 @@ func TestParseLine(t *testing.T) {
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: true,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -94,6 +105,8 @@ func TestParseLine(t *testing.T) {
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -106,6 +119,8 @@ func TestParseLine(t *testing.T) {
|
||||
URI: "/",
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -119,6 +134,8 @@ func TestParseLine(t *testing.T) {
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
ASN: 12345,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -132,6 +149,8 @@ func TestParseLine(t *testing.T) {
|
||||
Status: "200",
|
||||
IsTor: true,
|
||||
ASN: 65535,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -145,6 +164,8 @@ func TestParseLine(t *testing.T) {
|
||||
Status: "200",
|
||||
IsTor: true,
|
||||
ASN: 0,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -158,6 +179,8 @@ func TestParseLine(t *testing.T) {
|
||||
Status: "200",
|
||||
IsTor: false,
|
||||
ASN: 0,
|
||||
Method: "GET",
|
||||
RequestTime: 0.001,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
209
cmd/collector/prom.go
Normal file
209
cmd/collector/prom.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Body-size histogram bucket upper bounds in bytes.
const promNumBodyBounds = 7

var promBodyBounds = [promNumBodyBounds]int64{256, 1024, 4096, 16384, 65536, 262144, 1048576}

// Request-time histogram bucket upper bounds in seconds (standard Prometheus defaults).
const promNumTimeBounds = 11

var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}

const promCounterCap = 100_000 // safety cap on {host,method,status} counter entries

// promCounterKey is the label set for per-request counters.
// All three fields participate in map-key equality, so each distinct
// combination gets its own counter entry.
type promCounterKey struct {
	Host   string
	Method string
	Status string
}

// promBodyEntry holds the body_bytes_sent histogram for one host.
// Buckets are cumulative (Prometheus-style): buckets[i] counts every
// observation <= promBodyBounds[i], and the final slot counts all
// observations (le="+Inf").
type promBodyEntry struct {
	buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=bound[i]; index N: le=+Inf
	sum     int64                        // total bytes observed (rendered as the histogram _sum)
}

// promTimeEntry holds the request_time histogram for one host.
// Same cumulative bucket layout as promBodyEntry, but float-valued.
type promTimeEntry struct {
	buckets [promNumTimeBounds + 1]int64 // cumulative; last slot is le=+Inf
	sum     float64                      // total seconds observed (rendered as the histogram _sum)
}

// PromStore accumulates Prometheus metrics ingested from log records.
//
// Ingest must be called from exactly one goroutine (the store's Run goroutine).
// ServeHTTP may be called from any number of goroutines concurrently.
//
// The maps only ever grow — no entry is ever deleted — so cardinality is
// bounded by promCounterCap for counters and by the number of distinct
// hosts for the two histogram maps.
type PromStore struct {
	mu       sync.Mutex
	counters map[promCounterKey]int64
	body     map[string]*promBodyEntry // keyed by host
	reqTime  map[string]*promTimeEntry // keyed by host
}
|
||||
|
||||
// NewPromStore returns an empty PromStore ready for use.
|
||||
func NewPromStore() *PromStore {
|
||||
return &PromStore{
|
||||
counters: make(map[promCounterKey]int64, 1024),
|
||||
body: make(map[string]*promBodyEntry, 64),
|
||||
reqTime: make(map[string]*promTimeEntry, 64),
|
||||
}
|
||||
}
|
||||
|
||||
// Ingest records one log record into the Prometheus metrics.
|
||||
// Must be called from a single goroutine.
|
||||
func (p *PromStore) Ingest(r LogRecord) {
|
||||
p.mu.Lock()
|
||||
|
||||
// --- per-{host,method,status} request counter ---
|
||||
ck := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status}
|
||||
if _, ok := p.counters[ck]; ok {
|
||||
p.counters[ck]++
|
||||
} else if len(p.counters) < promCounterCap {
|
||||
p.counters[ck] = 1
|
||||
}
|
||||
|
||||
// --- body_bytes_sent histogram (keyed by host only) ---
|
||||
be, ok := p.body[r.Website]
|
||||
if !ok {
|
||||
be = &promBodyEntry{}
|
||||
p.body[r.Website] = be
|
||||
}
|
||||
for i, bound := range promBodyBounds {
|
||||
if r.BodyBytesSent <= bound {
|
||||
be.buckets[i]++
|
||||
}
|
||||
}
|
||||
be.buckets[promNumBodyBounds]++ // +Inf
|
||||
be.sum += r.BodyBytesSent
|
||||
|
||||
// --- request_time histogram (keyed by host only) ---
|
||||
te, ok := p.reqTime[r.Website]
|
||||
if !ok {
|
||||
te = &promTimeEntry{}
|
||||
p.reqTime[r.Website] = te
|
||||
}
|
||||
for i, bound := range promTimeBounds {
|
||||
if r.RequestTime <= bound {
|
||||
te.buckets[i]++
|
||||
}
|
||||
}
|
||||
te.buckets[promNumTimeBounds]++ // +Inf
|
||||
te.sum += r.RequestTime
|
||||
|
||||
p.mu.Unlock()
|
||||
}
|
||||
|
||||
// ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4).
func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	// Snapshot everything under the lock, then render without holding it.
	// This keeps the critical section short so Ingest is never blocked on a
	// slow scraper connection.
	p.mu.Lock()

	type counterSnap struct {
		k promCounterKey
		v int64
	}
	counters := make([]counterSnap, 0, len(p.counters))
	for k, v := range p.counters {
		counters = append(counters, counterSnap{k, v})
	}

	type bodySnap struct {
		host string
		e    promBodyEntry
	}
	bodySnaps := make([]bodySnap, 0, len(p.body))
	for h, e := range p.body {
		// *e copies the entry by value, so rendering below reads a stable
		// snapshot even while Ingest keeps mutating the live entry.
		bodySnaps = append(bodySnaps, bodySnap{h, *e})
	}

	type timeSnap struct {
		host string
		e    promTimeEntry
	}
	timeSnaps := make([]timeSnap, 0, len(p.reqTime))
	for h, e := range p.reqTime {
		timeSnaps = append(timeSnaps, timeSnap{h, *e})
	}

	p.mu.Unlock()

	// Sort for stable, human-readable output.
	// (Go map iteration order is random; sorted output makes scrapes diffable.)
	sort.Slice(counters, func(i, j int) bool {
		a, b := counters[i].k, counters[j].k
		if a.Host != b.Host {
			return a.Host < b.Host
		}
		if a.Method != b.Method {
			return a.Method < b.Method
		}
		return a.Status < b.Status
	})
	sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].host < bodySnaps[j].host })
	sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host })

	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	// Buffer writes so each scrape issues a handful of large writes instead of
	// one small write per metric line.
	bw := bufio.NewWriterSize(w, 256*1024)

	// nginx_http_requests_total
	// %q quotes and escapes label values, which is what the exposition format
	// requires for embedded quotes/backslashes.
	fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.")
	fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter")
	for _, c := range counters {
		fmt.Fprintf(bw, "nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n",
			c.k.Host, c.k.Method, c.k.Status, c.v)
	}

	// nginx_http_response_body_bytes (histogram, labeled by host)
	fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.")
	fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram")
	for _, s := range bodySnaps {
		for i, bound := range promBodyBounds {
			fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=%q} %d\n",
				s.host, fmt.Sprintf("%d", bound), s.e.buckets[i])
		}
		fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=\"+Inf\"} %d\n",
			s.host, s.e.buckets[promNumBodyBounds])
		// _count equals the +Inf bucket by definition (it counts all observations).
		fmt.Fprintf(bw, "nginx_http_response_body_bytes_count{host=%q} %d\n",
			s.host, s.e.buckets[promNumBodyBounds])
		fmt.Fprintf(bw, "nginx_http_response_body_bytes_sum{host=%q} %d\n",
			s.host, s.e.sum)
	}

	// nginx_http_request_duration_seconds (histogram, labeled by host)
	fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.")
	fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram")
	for _, s := range timeSnaps {
		for i, bound := range promTimeBounds {
			fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=%q} %d\n",
				s.host, formatFloat(bound), s.e.buckets[i])
		}
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=\"+Inf\"} %d\n",
			s.host, s.e.buckets[promNumTimeBounds])
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_count{host=%q} %d\n",
			s.host, s.e.buckets[promNumTimeBounds])
		// %g keeps the float sum compact (no fixed decimal padding).
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%q} %g\n",
			s.host, s.e.sum)
	}

	bw.Flush()
}
|
||||
|
||||
// formatFloat renders a float64 histogram bucket bound for the `le` label.
// It uses %g formatting and, when the result contains neither a decimal
// point nor an exponent, appends ".0" so integral bounds still read as
// floats (e.g. 0.5 -> "0.5", 10 -> "10.0").
func formatFloat(f float64) string {
	s := fmt.Sprintf("%g", f)
	if strings.ContainsAny(s, ".e") {
		return s // already float-looking ("0.25") or exponent form ("1e+06")
	}
	return s + ".0"
}
|
||||
130
cmd/collector/prom_test.go
Normal file
130
cmd/collector/prom_test.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package main
|
||||
|
||||
import (
	"net/http/httptest"
	"strconv"
	"strings"
	"testing"
)
|
||||
|
||||
// TestPromStoreIngestBodyBuckets checks that a single body-size observation
// lands in the correct cumulative buckets: every bound below the value stays
// zero, every bound at or above it (including +Inf) counts one.
func TestPromStoreIngestBodyBuckets(t *testing.T) {
	ps := NewPromStore()
	// 512 bytes: > 256, ≤ 1024 → bucket[0] stays 0, buckets[1..N] get 1
	ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", BodyBytesSent: 512})

	// Lock before reading internal state — ServeHTTP could run concurrently
	// in production and the test mirrors the store's locking discipline.
	ps.mu.Lock()
	be := ps.body["example.com"]
	ps.mu.Unlock()

	if be == nil {
		t.Fatal("expected body entry, got nil")
	}
	if be.buckets[0] != 0 { // le=256: 512 > 256
		t.Errorf("le=256 bucket = %d, want 0", be.buckets[0])
	}
	if be.buckets[1] != 1 { // le=1024: 512 ≤ 1024
		t.Errorf("le=1024 bucket = %d, want 1", be.buckets[1])
	}
	// Cumulative histogram: all wider bounds, plus +Inf, must also count 1.
	for i := 2; i <= promNumBodyBounds; i++ {
		if be.buckets[i] != 1 {
			t.Errorf("bucket[%d] = %d, want 1", i, be.buckets[i])
		}
	}
	if be.sum != 512 {
		t.Errorf("sum = %d, want 512", be.sum)
	}
}
|
||||
|
||||
// TestPromStoreIngestTimeBuckets checks cumulative bucketing of a single
// request-time observation against the default Prometheus time bounds.
func TestPromStoreIngestTimeBuckets(t *testing.T) {
	ps := NewPromStore()
	// 0.075s: > 0.05, ≤ 0.1
	ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", RequestTime: 0.075})

	ps.mu.Lock()
	te := ps.reqTime["example.com"]
	ps.mu.Unlock()

	if te == nil {
		t.Fatal("expected time entry, got nil")
	}
	// Bucket indices follow promTimeBounds: {.005, .01, .025, .05, .1, ...}.
	// le=0.05 (index 3): 0.075 > 0.05 → 0
	if te.buckets[3] != 0 {
		t.Errorf("le=0.05 bucket = %d, want 0", te.buckets[3])
	}
	// le=0.1 (index 4): 0.075 ≤ 0.1 → 1
	if te.buckets[4] != 1 {
		t.Errorf("le=0.1 bucket = %d, want 1", te.buckets[4])
	}
	// +Inf (last): always 1
	if te.buckets[promNumTimeBounds] != 1 {
		t.Errorf("+Inf bucket = %d, want 1", te.buckets[promNumTimeBounds])
	}
}
|
||||
|
||||
// TestPromStoreCounter checks that requests with identical {host,method,status}
// labels share one counter while a different label set gets its own.
func TestPromStoreCounter(t *testing.T) {
	ps := NewPromStore()
	ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"})
	ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"})
	ps.Ingest(LogRecord{Website: "a.com", Method: "POST", Status: "201"})

	ps.mu.Lock()
	c1 := ps.counters[promCounterKey{"a.com", "GET", "200"}]
	c2 := ps.counters[promCounterKey{"a.com", "POST", "201"}]
	ps.mu.Unlock()

	if c1 != 2 {
		t.Errorf("GET/200 count = %d, want 2", c1)
	}
	if c2 != 1 {
		t.Errorf("POST/201 count = %d, want 1", c2)
	}
}
|
||||
|
||||
// TestPromStoreServeHTTP ingests a single record and checks that all three
// metric families appear in the rendered text exposition output with the
// expected label values.
func TestPromStoreServeHTTP(t *testing.T) {
	ps := NewPromStore()
	ps.Ingest(LogRecord{
		Website: "example.com", Method: "GET", Status: "200",
		BodyBytesSent: 100, RequestTime: 0.042,
	})

	req := httptest.NewRequest("GET", "/metrics", nil)
	rec := httptest.NewRecorder()
	ps.ServeHTTP(rec, req)

	body := rec.Body.String()

	// Substring checks rather than a full golden file: we pin one line per
	// metric family instead of the entire (large) bucket listing.
	checks := []string{
		"# TYPE nginx_http_requests_total counter",
		`nginx_http_requests_total{host="example.com",method="GET",status="200"} 1`,
		"# TYPE nginx_http_response_body_bytes histogram",
		`nginx_http_response_body_bytes_bucket{host="example.com",le="256"} 1`, // 100 ≤ 256
		`nginx_http_response_body_bytes_count{host="example.com"} 1`,
		`nginx_http_response_body_bytes_sum{host="example.com"} 100`,
		"# TYPE nginx_http_request_duration_seconds histogram",
		`nginx_http_request_duration_seconds_bucket{host="example.com",le="0.05"} 1`, // 0.042 ≤ 0.05
		`nginx_http_request_duration_seconds_count{host="example.com"} 1`,
	}
	for _, want := range checks {
		if !strings.Contains(body, want) {
			t.Errorf("missing %q in output:\n%s", want, body)
		}
	}
}
|
||||
|
||||
func TestPromStoreCounterCap(t *testing.T) {
|
||||
ps := NewPromStore()
|
||||
// Fill to cap with distinct {host,method,status} combos
|
||||
for i := 0; i < promCounterCap+10; i++ {
|
||||
host := strings.Repeat("x", i%10+1) + ".com"
|
||||
status := "200"
|
||||
if i%3 == 0 {
|
||||
status = "404"
|
||||
}
|
||||
ps.Ingest(LogRecord{Website: host, Method: "GET", Status: status})
|
||||
}
|
||||
ps.mu.Lock()
|
||||
n := len(ps.counters)
|
||||
ps.mu.Unlock()
|
||||
if n > promCounterCap {
|
||||
t.Errorf("counter map size %d exceeds cap %d", n, promCounterCap)
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ const liveMapCap = 100_000 // hard cap on live map entries
|
||||
// Store holds the live map and both ring buffers.
|
||||
type Store struct {
|
||||
source string
|
||||
prom *PromStore // optional; if non-nil, receives every ingested record
|
||||
|
||||
// live map — written only by the Run goroutine; no locking needed on writes
|
||||
live map[st.Tuple6]int64
|
||||
@@ -41,9 +42,12 @@ func NewStore(source string) *Store {
|
||||
}
|
||||
}
|
||||
|
||||
// ingest records one log record into the live map.
|
||||
// ingest records one log record into the live map and the Prometheus store (if set).
|
||||
// Must only be called from the Run goroutine.
|
||||
func (s *Store) ingest(r LogRecord) {
|
||||
if s.prom != nil {
|
||||
s.prom.Ingest(r)
|
||||
}
|
||||
key := st.Tuple6{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor, ASN: r.ASN}
|
||||
if _, exists := s.live[key]; !exists {
|
||||
if s.liveLen >= liveMapCap {
|
||||
|
||||
@@ -76,6 +76,7 @@ windows, and exposes a gRPC interface for the aggregator (and directly for the C
|
||||
| Flag | Default | Description |
|
||||
|-------------------|--------------|-----------------------------------------------------------|
|
||||
| `--listen` | `:9090` | gRPC listen address |
|
||||
| `--prom-listen` | `:9100` | Prometheus metrics address; empty string to disable |
|
||||
| `--logs` | — | Comma-separated log file paths or glob patterns |
|
||||
| `--logs-file` | — | File containing one log path/glob per line |
|
||||
| `--source` | hostname | Name for this collector in query responses |
|
||||
@@ -123,6 +124,73 @@ The collector handles logrotate automatically. On `RENAME`/`REMOVE` events it dr
|
||||
descriptor to EOF (so no lines are lost), then retries opening the original path with backoff until
|
||||
the new file appears. No restart or SIGHUP required.
|
||||
|
||||
### Prometheus metrics
|
||||
|
||||
The collector exposes a Prometheus-compatible `/metrics` endpoint on `--prom-listen` (default
|
||||
`:9100`). Set `--prom-listen ""` to disable it entirely.
|
||||
|
||||
Three metrics are exported:
|
||||
|
||||
**`nginx_http_requests_total`** — counter, labeled `{host, method, status}`:
|
||||
```
|
||||
nginx_http_requests_total{host="example.com",method="GET",status="200"} 18432
|
||||
nginx_http_requests_total{host="example.com",method="POST",status="201"} 304
|
||||
nginx_http_requests_total{host="api.example.com",method="GET",status="429"} 57
|
||||
```
|
||||
|
||||
**`nginx_http_response_body_bytes`** — histogram, labeled `{host}`. Observes the
|
||||
`$body_bytes_sent` value for every request. Bucket upper bounds (bytes):
|
||||
`256, 1024, 4096, 16384, 65536, 262144, 1048576, +Inf`.
|
||||
|
||||
**`nginx_http_request_duration_seconds`** — histogram, labeled `{host}`. Observes the
|
||||
`$request_time` value for every request. Bucket upper bounds (seconds):
|
||||
`0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, +Inf`.
|
||||
|
||||
Body and request-time histograms use only the `host` label (not method/status) to keep
|
||||
cardinality bounded — the label sets stay proportional to the number of virtual hosts, not
|
||||
the number of unique method × status combinations.
|
||||
|
||||
The counter map is capped at 100 000 distinct `{host, method, status}` tuples. Once the cap is
reached, requests with new label combinations are silently dropped (existing counters keep
incrementing) for the lifetime of the process, so memory is bounded regardless of traffic patterns.
|
||||
|
||||
**Prometheus scrape config:**
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- job_name: nginx_logtail
|
||||
static_configs:
|
||||
- targets:
|
||||
- nginx1:9100
|
||||
- nginx2:9100
|
||||
- nginx3:9100
|
||||
```
|
||||
|
||||
Or with service discovery — the collector has no special requirements beyond a reachable
|
||||
TCP port.
|
||||
|
||||
**Example queries:**
|
||||
|
||||
```promql
|
||||
# Request rate per host over last 5 minutes
|
||||
rate(nginx_http_requests_total[5m])
|
||||
|
||||
# 5xx error rate fraction per host
|
||||
sum by (host) (rate(nginx_http_requests_total{status=~"5.."}[5m]))
|
||||
/
|
||||
sum by (host) (rate(nginx_http_requests_total[5m]))
|
||||
|
||||
# 95th percentile response time per host
|
||||
histogram_quantile(0.95,
|
||||
sum by (host, le) (rate(nginx_http_request_duration_seconds_bucket[5m]))
|
||||
)
|
||||
|
||||
# Median response body size per host
|
||||
histogram_quantile(0.50,
|
||||
sum by (host, le) (rate(nginx_http_response_body_bytes_bucket[5m]))
|
||||
)
|
||||
```
|
||||
|
||||
### Memory usage
|
||||
|
||||
The collector is designed to stay well under 1 GB:
|
||||
|
||||
Reference in New Issue
Block a user