From 6647f95be460b4c8db8bae072e56edafa59bae95 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Fri, 1 May 2026 15:40:53 +0200 Subject: [PATCH] RELEASE 1.0.1: v2 log format, source_tag-labeled metrics, lint cleanup Wire-format and metric overhaul. Both file and UDP ingest now share one versioned ParseLine that dispatches on the v\t prefix; v1 stays unchanged, v2 adds $bytes_sent (replacing $body_bytes_sent), $request_length, $upstream_response_time, and $upstream_status. File ingest gains the same versioning, and the legacy positional file format is removed (no live deployments). Prometheus exposition is rewritten: - nginx_http_bytes_sent and nginx_http_request_duration_seconds gain a source_tag label. - nginx_http_requests_by_source_total gains status_class. - New v2-only metrics: nginx_http_request_bytes, nginx_http_upstream_duration_seconds, nginx_http_upstream_requests_total{status_class}. - Dropped nginx_http_response_body_bytes_by_source (subsumed by the dual-labeled bytes_sent metric). Adds 'make fixstyle' (gofmt -w) and clears all golangci-lint findings across the repo (errcheck, S1001, ST1005, unused). Docs in design.md FR-2/FR-8 and user-guide.md are rewritten to present v2 as the recommended log format. --- Makefile | 10 +- cmd/aggregator/aggregator_test.go | 6 +- cmd/aggregator/backfill.go | 2 +- cmd/aggregator/cache.go | 8 +- cmd/aggregator/subscriber.go | 2 +- cmd/cli/cli_test.go | 30 +-- cmd/cli/cmd_stream.go | 4 +- cmd/cli/cmd_targets.go | 6 +- cmd/cli/cmd_topn.go | 4 +- cmd/cli/cmd_trend.go | 4 +- cmd/cli/flags.go | 6 - cmd/cli/format.go | 6 +- cmd/collector/main.go | 2 +- cmd/collector/parser.go | 161 +++++++----- cmd/collector/parser_test.go | 411 ++++++++++++------------------ cmd/collector/prom.go | 387 ++++++++++++++++++---------- cmd/collector/prom_test.go | 154 ++++++++--- cmd/collector/smoke_test.go | 17 +- cmd/collector/tailer.go | 12 +- cmd/collector/tailer_test.go | 15 +- cmd/collector/udp.go | 8 +- cmd/collector/udp_test.go | 16 +- cmd/frontend/filter.go | 2 +- cmd/frontend/frontend_test.go | 4 +- cmd/frontend/handler.go | 6 +- cmd/frontend/main.go | 2 +- docs/design.md | 164 ++++++++---- docs/user-guide.md | 206 ++++++++------- 28 files changed, 931 insertions(+), 724 deletions(-) diff --git a/Makefile b/Makefile index 1215727..d3c5531 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ PROTO_FILE := $(PROTO_DIR)/logtail.proto GEN_FILES := proto/logtailpb/logtail.pb.go proto/logtailpb/logtail_grpc.pb.go NATIVE_ARCH := $(shell go env GOARCH) -VERSION := 0.9.2 +VERSION := 1.0.1 COMMIT_HASH := $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE := $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -s -w \ @@ -28,7 +28,7 @@ GO_VERSION ?= 1.24.6 # accepts. Older releases can't parse recent Go syntax. GOLANGCI_LINT_VERSION ?= 1.64.0 -.PHONY: help all build build-amd64 build-arm64 test lint proto pkg-deb docker docker-push clean \ +.PHONY: help all build build-amd64 build-arm64 test lint fixstyle proto pkg-deb docker docker-push clean \ install-deps install-deps-apt install-deps-go install-deps-go-tools # help is the default target. Keep the list aligned with the .PHONY block above. @@ -40,6 +40,7 @@ help: @echo " build-arm64 build all four binaries for linux/arm64 into build/arm64/" @echo " test run 'go test ./...'" @echo " lint run 'golangci-lint run ./...'" + @echo " fixstyle rewrite all .go files in place with 'gofmt -w'" @echo " proto regenerate proto/logtailpb/*.pb.go from proto/logtail.proto" @echo " pkg-deb build amd64 + arm64 .deb packages (requires dpkg-deb)" @echo " docker buildx --load two tags ($(IMAGE):v$(VERSION), $(IMAGE):latest) for the native arch" @@ -74,6 +75,11 @@ test: $(GEN_FILES) lint: golangci-lint run ./... +# fixstyle rewrites all tracked .go files with gofmt. Generated proto files are +# excluded — they are regenerated from .proto via `make proto`. +fixstyle: + gofmt -w $(shell find . -name '*.go' -not -path './proto/logtailpb/*') + proto: $(GEN_FILES) # protoc's go_package option places output at the go_package path (not source-relative). diff --git a/cmd/aggregator/aggregator_test.go b/cmd/aggregator/aggregator_test.go index 29c84fb..24f4416 100644 --- a/cmd/aggregator/aggregator_test.go +++ b/cmd/aggregator/aggregator_test.go @@ -262,7 +262,7 @@ func startFakeCollector(t *testing.T, snaps []*pb.Snapshot) string { } srv := grpc.NewServer() pb.RegisterLogtailServiceServer(srv, &fakeCollector{snaps: snaps}) - go srv.Serve(lis) + go func() { _ = srv.Serve(lis) }() t.Cleanup(srv.Stop) return lis.Addr().String() } @@ -310,7 +310,7 @@ func TestGRPCEndToEnd(t *testing.T) { } grpcSrv := grpc.NewServer() pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test", NewTargetRegistry(nil))) - go grpcSrv.Serve(lis) + go func() { _ = grpcSrv.Serve(lis) }() defer grpcSrv.Stop() conn, err := grpc.NewClient(lis.Addr().String(), @@ -318,7 +318,7 @@ func TestGRPCEndToEnd(t *testing.T) { if err != nil { t.Fatal(err) } - defer conn.Close() + defer func() { _ = conn.Close() }() client := pb.NewLogtailServiceClient(conn) qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second) defer qcancel() diff --git a/cmd/aggregator/backfill.go b/cmd/aggregator/backfill.go index e3c2fe0..62bf495 100644 --- a/cmd/aggregator/backfill.go +++ b/cmd/aggregator/backfill.go @@ -84,7 +84,7 @@ func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot if err != nil { return nil, nil, err } - defer conn.Close() + defer func() { _ = conn.Close() }() client := pb.NewLogtailServiceClient(conn) stream, err := client.DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{}) diff --git a/cmd/aggregator/cache.go b/cmd/aggregator/cache.go index 00be9e7..45227c6 100644 --- a/cmd/aggregator/cache.go +++ b/cmd/aggregator/cache.go @@ -97,15 +97,11 @@ func (c *Cache) LoadHistorical(fine, coarse []st.Snapshot) { c.mu.Lock() defer c.mu.Unlock() - for i, snap := range fine { - c.fineRing[i] = snap - } + copy(c.fineRing[:], fine) c.fineFilled = len(fine) c.fineHead = len(fine) % st.FineRingSize - for i, snap := range coarse { - c.coarseRing[i] = snap - } + copy(c.coarseRing[:], coarse) c.coarseFilled = len(coarse) c.coarseHead = len(coarse) % st.CoarseRingSize } diff --git a/cmd/aggregator/subscriber.go b/cmd/aggregator/subscriber.go index 7e0c497..70606ae 100644 --- a/cmd/aggregator/subscriber.go +++ b/cmd/aggregator/subscriber.go @@ -77,7 +77,7 @@ func (cs *CollectorSub) stream(ctx context.Context) (bool, error) { if err != nil { return false, err } - defer conn.Close() + defer func() { _ = conn.Close() }() client := pb.NewLogtailServiceClient(conn) stream, err := client.StreamSnapshots(ctx, &pb.SnapshotRequest{}) diff --git a/cmd/cli/cli_test.go b/cmd/cli/cli_test.go index 93dc72e..788d024 100644 --- a/cmd/cli/cli_test.go +++ b/cmd/cli/cli_test.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "encoding/json" "net" @@ -11,7 +10,6 @@ import ( pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) // --- Unit tests --- @@ -149,21 +147,11 @@ func startFake(t *testing.T, fs *fakeServer) string { } srv := grpc.NewServer() pb.RegisterLogtailServiceServer(srv, fs) - go srv.Serve(lis) + go func() { _ = srv.Serve(lis) }() t.Cleanup(srv.GracefulStop) return lis.Addr().String() } -func dialTest(t *testing.T, addr string) pb.LogtailServiceClient { - t.Helper() - conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { conn.Close() }) - return pb.NewLogtailServiceClient(conn) -} - // --- TopN tests --- func TestTopNSingleTarget(t *testing.T) { @@ -225,23 +213,7 @@ func TestTopNJSON(t *testing.T) { }) results := fanOutTopN([]string{addr}, nil, pb.GroupBy_WEBSITE, 10, pb.Window_W5M) - var buf bytes.Buffer - // Redirect stdout not needed; call JSON formatter directly. r := results[0] - // Build expected JSON by calling printTopNJSON with a captured stdout. - // We test indirectly: marshal manually and compare fields. - type entry struct { - Label string `json:"label"` - Count int64 `json:"count"` - } - type out struct { - Source string `json:"source"` - Target string `json:"target"` - Entries []entry `json:"entries"` - } - _ = buf - _ = r - // Verify the response fields are correct for JSON serialization. if r.resp.Source != "agg" { t.Errorf("source = %q", r.resp.Source) } diff --git a/cmd/cli/cmd_stream.go b/cmd/cli/cmd_stream.go index 7065214..7262417 100644 --- a/cmd/cli/cmd_stream.go +++ b/cmd/cli/cmd_stream.go @@ -24,7 +24,7 @@ type streamEvent struct { func runStream(args []string) { fs := flag.NewFlagSet("stream", flag.ExitOnError) sf, targetFlag := bindShared(fs) - fs.Parse(args) + _ = fs.Parse(args) // ExitOnError: only returns nil here sf.resolve(*targetFlag) filter := buildFilter(sf) @@ -85,7 +85,7 @@ func streamOnce(ctx context.Context, addr string, filter *pb.Filter, events chan if err != nil { return err } - defer conn.Close() + defer func() { _ = conn.Close() }() stream, err := client.StreamSnapshots(ctx, &pb.SnapshotRequest{}) if err != nil { diff --git a/cmd/cli/cmd_targets.go b/cmd/cli/cmd_targets.go index a1eab4e..ef557c1 100644 --- a/cmd/cli/cmd_targets.go +++ b/cmd/cli/cmd_targets.go @@ -18,7 +18,7 @@ func runTargets(args []string) { fmt.Fprintln(os.Stderr, "usage: logtail-cli targets [--target host:port] [--json]") fs.PrintDefaults() } - fs.Parse(args) + _ = fs.Parse(args) // ExitOnError: only returns nil here sf.resolve(*target) for _, addr := range sf.targets { @@ -30,7 +30,7 @@ func runTargets(args []string) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) resp, err := client.ListTargets(ctx, &pb.ListTargetsRequest{}) cancel() - conn.Close() + _ = conn.Close() if err != nil { fmt.Fprintf(os.Stderr, "targets: %s: %v\n", addr, err) continue @@ -43,7 +43,7 @@ func runTargets(args []string) { Addr string `json:"addr"` } for _, t := range resp.Targets { - json.NewEncoder(os.Stdout).Encode(row{QueryTarget: addr, Name: t.Name, Addr: t.Addr}) + _ = json.NewEncoder(os.Stdout).Encode(row{QueryTarget: addr, Name: t.Name, Addr: t.Addr}) } } else { if len(sf.targets) > 1 { diff --git a/cmd/cli/cmd_topn.go b/cmd/cli/cmd_topn.go index 1a5f57c..34d5d04 100644 --- a/cmd/cli/cmd_topn.go +++ b/cmd/cli/cmd_topn.go @@ -24,7 +24,7 @@ func runTopN(args []string) { n := fs.Int("n", 10, "number of entries") window := fs.String("window", "5m", "time window: 1m 5m 15m 60m 6h 24h") groupBy := fs.String("group-by", "website", "group by: website prefix uri status") - fs.Parse(args) + _ = fs.Parse(args) // ExitOnError: only returns nil here sf.resolve(*targetFlag) win := parseWindow(*window) @@ -66,7 +66,7 @@ func fanOutTopN(targets []string, filter *pb.Filter, groupBy pb.GroupBy, n int, results[i].resp = &pb.TopNResponse{} return } - defer conn.Close() + defer func() { _ = conn.Close() }() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := client.TopN(ctx, &pb.TopNRequest{ diff --git a/cmd/cli/cmd_trend.go b/cmd/cli/cmd_trend.go index cd0e70f..e1af39a 100644 --- a/cmd/cli/cmd_trend.go +++ b/cmd/cli/cmd_trend.go @@ -22,7 +22,7 @@ func runTrend(args []string) { fs := flag.NewFlagSet("trend", flag.ExitOnError) sf, targetFlag := bindShared(fs) window := fs.String("window", "5m", "time window: 1m 5m 15m 60m 6h 24h") - fs.Parse(args) + _ = fs.Parse(args) // ExitOnError: only returns nil here sf.resolve(*targetFlag) win := parseWindow(*window) @@ -63,7 +63,7 @@ func fanOutTrend(targets []string, filter *pb.Filter, window pb.Window) []trendR results[i].resp = &pb.TrendResponse{} return } - defer conn.Close() + defer func() { _ = conn.Close() }() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := client.Trend(ctx, &pb.TrendRequest{ diff --git a/cmd/cli/flags.go b/cmd/cli/flags.go index 5facc48..89d1aac 100644 --- a/cmd/cli/flags.go +++ b/cmd/cli/flags.go @@ -167,9 +167,3 @@ func parseGroupBy(s string) pb.GroupBy { panic("unreachable") } } - -func dieUsage(fs *flag.FlagSet, msg string) { - fmt.Fprintln(os.Stderr, msg) - fs.PrintDefaults() - os.Exit(1) -} diff --git a/cmd/cli/format.go b/cmd/cli/format.go index cbb6261..41170ee 100644 --- a/cmd/cli/format.go +++ b/cmd/cli/format.go @@ -16,17 +16,17 @@ func printTable(w io.Writer, rows [][]string) { } tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0) for i, row := range rows { - fmt.Fprintln(tw, strings.Join(row, "\t")) + _, _ = fmt.Fprintln(tw, strings.Join(row, "\t")) if i == 0 { // Print a divider matching the header width. dashes := make([]string, len(row)) for j, h := range row { dashes[j] = strings.Repeat("-", len(h)) } - fmt.Fprintln(tw, strings.Join(dashes, "\t")) + _, _ = fmt.Fprintln(tw, strings.Join(dashes, "\t")) } } - tw.Flush() + _ = tw.Flush() } // fmtCount formats a count with a space as the thousands separator. diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 9388fb0..61cd134 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -128,7 +128,7 @@ func collectPatterns(logPaths, logsFile string) []string { if err != nil { log.Fatalf("collector: cannot open --logs-file %s: %v", logsFile, err) } - defer f.Close() + defer func() { _ = f.Close() }() sc := bufio.NewScanner(f) for sc.Scan() { if p := strings.TrimSpace(sc.Text()); p != "" && !strings.HasPrefix(p, "#") { diff --git a/cmd/collector/parser.go b/cmd/collector/parser.go index 59c7dd4..6a16e81 100644 --- a/cmd/collector/parser.go +++ b/cmd/collector/parser.go @@ -8,87 +8,57 @@ import ( ) // LogRecord holds the dimensions extracted from a single nginx log line. +// +// BytesSent carries $body_bytes_sent for v1 records and $bytes_sent for v2 +// records — operators see a small step up when emitters move to v2 because v2 +// includes header overhead. +// +// RequestLength, UpstreamResponseTime, UpstreamStatus, HasUpstream are v2-only. +// In v1 records HasUpstream is always false and the related fields are zero. type LogRecord struct { - Website string - ClientPrefix string - URI string - Status string - IsTor bool - ASN int32 - Method string - BodyBytesSent int64 - RequestTime float64 - SourceTag string + Website string + ClientPrefix string + URI string + Status string + IsTor bool + ASN int32 + Method string + BytesSent int64 + RequestLength int64 + RequestTime float64 + UpstreamResponseTime float64 + UpstreamStatus string + HasUpstream bool + SourceTag string } -// fileSourceTag is the SourceTag assigned to records read from on-disk log -// files, which pre-date the tag concept. Mirrors nginx's fallback label. -const fileSourceTag = "direct" - -// ParseLine parses a tab-separated logtail log line from a file: -// -// $host \t $remote_addr \t $msec \t $request_method \t $request_uri \t $status \t $body_bytes_sent \t $request_time \t $is_tor \t $asn -// -// The is_tor (field 9) and asn (field 10) fields are optional for backward -// compatibility with older log files that omit them; they default to false/0 -// when absent. SourceTag is always set to "direct" (file origin has no tag). -// Returns false for lines with fewer than 8 fields. +// ParseLine parses a versioned nginx-logtail line. Both file ingest and UDP +// ingest funnel through here. Every line MUST start with "v\t"; unknown or +// missing versions return false so operators can ship a parser update before +// the emitter switches. func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) { - fields := strings.SplitN(line, "\t", 10) - if len(fields) < 8 { - return LogRecord{}, false - } - prefix, ok := truncateIP(fields[1], v4bits, v6bits) - if !ok { - return LogRecord{}, false - } - isTor := len(fields) >= 9 && fields[8] == "1" - var asn int32 - if len(fields) == 10 { - if n, err := strconv.ParseInt(fields[9], 10, 32); err == nil { - asn = int32(n) - } - } - return LogRecord{ - Website: fields[0], - ClientPrefix: prefix, - URI: stripQuery(fields[4]), - Status: fields[5], - IsTor: isTor, - ASN: asn, - Method: fields[3], - BodyBytesSent: parseInt(fields[6]), - RequestTime: parseFloat(fields[7]), - SourceTag: fileSourceTag, - }, true -} - -// ParseUDPLine dispatches on the version prefix emitted by -// nginx-ipng-stats-plugin's ipng_stats_logtail directive. The wire format is -// "v\t", where is version-specific. Unknown or missing -// versions return false so operators can roll out a v2 parser before -// upgrading emitters. -func ParseUDPLine(line string, v4bits, v6bits int) (LogRecord, bool) { i := strings.IndexByte(line, '\t') if i < 0 { return LogRecord{}, false } switch line[:i] { case "v1": - return parseUDPLineV1(line[i+1:], v4bits, v6bits) + return parseV1(line[i+1:], v4bits, v6bits) + case "v2": + return parseV2(line[i+1:], v4bits, v6bits) default: return LogRecord{}, false } } -// parseUDPLineV1 parses the v1 payload (12 tab-separated fields): +// parseV1 parses the v1 payload (12 tab-separated fields): // // $host \t $remote_addr \t $request_method \t $request_uri \t $status \t // $body_bytes_sent \t $request_time \t $is_tor \t $asn \t // $ipng_source_tag \t $server_addr \t $scheme // -// server_addr and scheme are parsed but discarded. -func parseUDPLineV1(payload string, v4bits, v6bits int) (LogRecord, bool) { +// $server_addr and $scheme are parsed but discarded. +func parseV1(payload string, v4bits, v6bits int) (LogRecord, bool) { fields := strings.Split(payload, "\t") if len(fields) != 12 { return LogRecord{}, false @@ -102,17 +72,76 @@ func parseUDPLineV1(payload string, v4bits, v6bits int) (LogRecord, bool) { asn = int32(n) } return LogRecord{ + Website: fields[0], + ClientPrefix: prefix, + URI: stripQuery(fields[3]), + Status: fields[4], + IsTor: fields[7] == "1", + ASN: asn, + Method: fields[2], + BytesSent: parseInt(fields[5]), + RequestTime: parseFloat(fields[6]), + SourceTag: fields[9], + }, true +} + +// parseV2 parses the v2 payload (15 tab-separated fields): +// +// $host \t $remote_addr \t $request_method \t $request_uri \t $status \t +// $bytes_sent \t $request_length \t $request_time \t +// $upstream_response_time \t $upstream_status \t +// $is_tor \t $asn \t $ipng_source_tag \t $server_addr \t $scheme +// +// $upstream_response_time and $upstream_status are "-" (or empty) when nginx +// served the response directly — HasUpstream is left false in that case. +// When nginx retried across multiple upstreams the fields are comma-separated; +// the parser keeps the last entry, since that's the upstream that actually +// served the response. $server_addr and $scheme are parsed but discarded. +func parseV2(payload string, v4bits, v6bits int) (LogRecord, bool) { + fields := strings.Split(payload, "\t") + if len(fields) != 15 { + return LogRecord{}, false + } + prefix, ok := truncateIP(fields[1], v4bits, v6bits) + if !ok { + return LogRecord{}, false + } + var asn int32 + if n, err := strconv.ParseInt(fields[11], 10, 32); err == nil { + asn = int32(n) + } + r := LogRecord{ Website: fields[0], ClientPrefix: prefix, URI: stripQuery(fields[3]), Status: fields[4], - IsTor: fields[7] == "1", + IsTor: fields[10] == "1", ASN: asn, Method: fields[2], - BodyBytesSent: parseInt(fields[5]), - RequestTime: parseFloat(fields[6]), - SourceTag: fields[9], - }, true + BytesSent: parseInt(fields[5]), + RequestLength: parseInt(fields[6]), + RequestTime: parseFloat(fields[7]), + SourceTag: fields[12], + } + if fields[8] != "-" && fields[8] != "" { + timeStr := lastCommaPart(fields[8]) + statusStr := lastCommaPart(fields[9]) + if t, err := strconv.ParseFloat(timeStr, 64); err == nil { + r.UpstreamResponseTime = t + r.UpstreamStatus = statusStr + r.HasUpstream = true + } + } + return r, true +} + +// lastCommaPart returns the substring after the last ", " (nginx's separator +// for retried upstreams). Plain values pass through unchanged. +func lastCommaPart(s string) string { + if i := strings.LastIndex(s, ", "); i >= 0 { + return s[i+2:] + } + return s } func stripQuery(uri string) string { diff --git a/cmd/collector/parser_test.go b/cmd/collector/parser_test.go index 5020cc5..6dee8f4 100644 --- a/cmd/collector/parser_test.go +++ b/cmd/collector/parser_test.go @@ -4,215 +4,7 @@ import ( "testing" ) -func TestParseLine(t *testing.T) { - good := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo&x=1\t200\t1452\t0.043" - - tests := []struct { - name string - line string - wantOK bool - want LogRecord - }{ - { - name: "normal IPv4 line strips query string", - line: good, - wantOK: true, - want: LogRecord{ - Website: "www.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/api/v1/search", - Status: "200", - Method: "GET", - BodyBytesSent: 1452, - RequestTime: 0.043, - SourceTag: "direct", - }, - }, - { - name: "URI with no query string", - line: "host\t10.0.0.1\t0\tPOST\t/submit\t201\t0\t0.001", - wantOK: true, - want: LogRecord{ - Website: "host", - ClientPrefix: "10.0.0.0/24", - URI: "/submit", - Status: "201", - Method: "POST", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "IPv6 address truncated to /48", - line: "host\t2001:db8:cafe::1\t0\tGET\t/\t200\t0\t0.001", - wantOK: true, - want: LogRecord{ - Website: "host", - ClientPrefix: "2001:db8:cafe::/48", - URI: "/", - Status: "200", - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "too few fields returns false", - line: "host\t1.2.3.4\t0\tGET\t/", - wantOK: false, - }, - { - name: "empty line returns false", - line: "", - wantOK: false, - }, - { - name: "invalid IP returns false", - line: "host\tnot-an-ip\t0\tGET\t/\t200\t0\t0.001", - wantOK: false, - }, - { - name: "status 429", - line: "api.example.com\t5.6.7.8\t0\tGET\t/rate-limited\t429\t0\t0.001", - wantOK: true, - want: LogRecord{ - Website: "api.example.com", - ClientPrefix: "5.6.7.0/24", - URI: "/rate-limited", - Status: "429", - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "is_tor=1 sets IsTor true", - line: "tor.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t1", - wantOK: true, - want: LogRecord{ - Website: "tor.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: true, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "is_tor=0 sets IsTor false", - line: "normal.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t0", - wantOK: true, - want: LogRecord{ - Website: "normal.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: false, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "missing is_tor field defaults to false (backward compat)", - line: "old.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001", - wantOK: true, - want: LogRecord{ - Website: "old.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: false, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "asn field parsed", - line: "asn.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t0\t12345", - wantOK: true, - want: LogRecord{ - Website: "asn.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: false, - ASN: 12345, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "asn field with is_tor=1", - line: "both.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t1\t65535", - wantOK: true, - want: LogRecord{ - Website: "both.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: true, - ASN: 65535, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "missing asn field defaults to 0 (backward compat)", - line: "noasn.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t1", - wantOK: true, - want: LogRecord{ - Website: "noasn.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: true, - ASN: 0, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - { - name: "invalid asn field defaults to 0", - line: "badann.example.com\t1.2.3.4\t0\tGET\t/\t200\t0\t0.001\t0\tnot-a-number", - wantOK: true, - want: LogRecord{ - Website: "badann.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/", - Status: "200", - IsTor: false, - ASN: 0, - Method: "GET", - RequestTime: 0.001, - SourceTag: "direct", - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - got, ok := ParseLine(tc.line, 24, 48) - if ok != tc.wantOK { - t.Fatalf("ParseLine ok=%v, want %v", ok, tc.wantOK) - } - if !tc.wantOK { - return - } - if got != tc.want { - t.Errorf("got %+v, want %+v", got, tc.want) - } - }) - } -} - -func TestParseUDPLine(t *testing.T) { +func TestParseLineV1(t *testing.T) { // v1 \t host \t remote_addr \t method \t uri \t status \t body_bytes \t req_time \t // is_tor \t asn \t source_tag \t server_addr \t scheme good := "v1\twww.example.com\t1.2.3.4\tGET\t/api/v1/search?q=foo\t200\t1452\t0.043\t0\t12345\tcdn\t10.0.0.1\thttps" @@ -228,16 +20,16 @@ func TestParseUDPLine(t *testing.T) { line: good, wantOK: true, want: LogRecord{ - Website: "www.example.com", - ClientPrefix: "1.2.3.0/24", - URI: "/api/v1/search", - Status: "200", - IsTor: false, - ASN: 12345, - Method: "GET", - BodyBytesSent: 1452, - RequestTime: 0.043, - SourceTag: "cdn", + Website: "www.example.com", + ClientPrefix: "1.2.3.0/24", + URI: "/api/v1/search", + Status: "200", + IsTor: false, + ASN: 12345, + Method: "GET", + BytesSent: 1452, + RequestTime: 0.043, + SourceTag: "cdn", }, }, { @@ -245,16 +37,16 @@ func TestParseUDPLine(t *testing.T) { line: "v1\th\t2001:db8::1\tGET\t/\t200\t0\t0\t1\t65535\tdirect\t::1\thttp", wantOK: true, want: LogRecord{ - Website: "h", - ClientPrefix: "2001:db8::/48", - URI: "/", - Status: "200", - IsTor: true, - ASN: 65535, - Method: "GET", - BodyBytesSent: 0, - RequestTime: 0, - SourceTag: "direct", + Website: "h", + ClientPrefix: "2001:db8::/48", + URI: "/", + Status: "200", + IsTor: true, + ASN: 65535, + Method: "GET", + BytesSent: 0, + RequestTime: 0, + SourceTag: "direct", }, }, { @@ -272,28 +64,13 @@ func TestParseUDPLine(t *testing.T) { line: "v1\th\tnope\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1\thttp", wantOK: false, }, - { - name: "unknown version rejected (future v2)", - line: "v2\twww.example.com\t1.2.3.4\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1\thttp", - wantOK: false, - }, - { - name: "missing version prefix rejected (legacy 12-field line)", - line: "www.example.com\t1.2.3.4\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1\thttp", - wantOK: false, - }, - { - name: "no tab at all rejected", - line: "v1", - wantOK: false, - }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - got, ok := ParseUDPLine(tc.line, 24, 48) + got, ok := ParseLine(tc.line, 24, 48) if ok != tc.wantOK { - t.Fatalf("ParseUDPLine ok=%v, want %v; got=%+v", ok, tc.wantOK, got) + t.Fatalf("ParseLine ok=%v, want %v; got=%+v", ok, tc.wantOK, got) } if !tc.wantOK { return @@ -305,6 +82,148 @@ func TestParseUDPLine(t *testing.T) { } } +func TestParseLineV2(t *testing.T) { + // v2 \t host \t remote_addr \t method \t uri \t status \t bytes_sent \t request_length \t + // request_time \t upstream_response_time \t upstream_status \t is_tor \t asn \t + // source_tag \t server_addr \t scheme + full := "v2\twww.example.com\t1.2.3.4\tGET\t/api/v1/search?q=foo\t200\t1500\t421\t0.043\t0.012\t200\t0\t12345\tcdn\t10.0.0.1\thttps" + + tests := []struct { + name string + line string + wantOK bool + want LogRecord + }{ + { + name: "v2 with upstream", + line: full, + wantOK: true, + want: LogRecord{ + Website: "www.example.com", + ClientPrefix: "1.2.3.0/24", + URI: "/api/v1/search", + Status: "200", + ASN: 12345, + Method: "GET", + BytesSent: 1500, + RequestLength: 421, + RequestTime: 0.043, + UpstreamResponseTime: 0.012, + UpstreamStatus: "200", + HasUpstream: true, + SourceTag: "cdn", + }, + }, + { + name: "v2 no upstream (dash sentinels)", + line: "v2\twww.example.com\t1.2.3.4\tGET\t/static.html\t200\t900\t300\t0.001\t-\t-\t0\t0\tdirect\t10.0.0.1\thttps", + wantOK: true, + want: LogRecord{ + Website: "www.example.com", + ClientPrefix: "1.2.3.0/24", + URI: "/static.html", + Status: "200", + Method: "GET", + BytesSent: 900, + RequestLength: 300, + RequestTime: 0.001, + SourceTag: "direct", + }, + }, + { + name: "v2 no upstream (empty fields)", + line: "v2\thh\t1.2.3.4\tGET\t/\t301\t200\t100\t0\t\t\t0\t0\tdirect\t10.0.0.1\thttps", + wantOK: true, + want: LogRecord{ + Website: "hh", + ClientPrefix: "1.2.3.0/24", + URI: "/", + Status: "301", + Method: "GET", + BytesSent: 200, + RequestLength: 100, + SourceTag: "direct", + }, + }, + { + name: "v2 retried upstreams (comma-separated, last wins)", + line: "v2\twww.example.com\t1.2.3.4\tGET\t/api\t502\t900\t300\t1.500\t0.500, 1.000\t504, 502\t0\t0\tcdn\t10.0.0.1\thttps", + wantOK: true, + want: LogRecord{ + Website: "www.example.com", + ClientPrefix: "1.2.3.0/24", + URI: "/api", + Status: "502", + Method: "GET", + BytesSent: 900, + RequestLength: 300, + RequestTime: 1.500, + UpstreamResponseTime: 1.000, + UpstreamStatus: "502", + HasUpstream: true, + SourceTag: "cdn", + }, + }, + { + name: "v2 wrong field count (14) rejected", + line: "v2\twww.example.com\t1.2.3.4\tGET\t/\t200\t0\t0\t0\t-\t-\t0\t0\tcdn\t10.0.0.1", + wantOK: false, + }, + { + name: "v2 bad IP rejected", + line: "v2\thh\tnope\tGET\t/\t200\t0\t0\t0\t-\t-\t0\t0\tcdn\t10.0.0.1\thttps", + wantOK: false, + }, + { + name: "v2 bad upstream time leaves HasUpstream=false", + line: "v2\thh\t1.2.3.4\tGET\t/\t200\t0\t0\t0\tnotanumber\t200\t0\t0\tcdn\t10.0.0.1\thttps", + wantOK: true, + want: LogRecord{ + Website: "hh", + ClientPrefix: "1.2.3.0/24", + URI: "/", + Status: "200", + Method: "GET", + SourceTag: "cdn", + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got, ok := ParseLine(tc.line, 24, 48) + if ok != tc.wantOK { + t.Fatalf("ParseLine ok=%v, want %v; got=%+v", ok, tc.wantOK, got) + } + if !tc.wantOK { + return + } + if got != tc.want { + t.Errorf("got %+v, want %+v", got, tc.want) + } + }) + } +} + +func TestParseLineRejections(t *testing.T) { + tests := []struct { + name string + line string + }{ + {"empty line", ""}, + {"no tab at all", "v1"}, + {"unknown version v3", "v3\twww.example.com\t1.2.3.4\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1\thttp"}, + {"missing version prefix (legacy file format)", "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api\t200\t1452\t0.043"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if _, ok := ParseLine(tc.line, 24, 48); ok { + t.Errorf("expected rejection for %q", tc.line) + } + }) + } +} + func TestTruncateIP(t *testing.T) { tests := []struct { addr string diff --git a/cmd/collector/prom.go b/cmd/collector/prom.go index 916100c..69e99b8 100644 --- a/cmd/collector/prom.go +++ b/cmd/collector/prom.go @@ -14,27 +14,46 @@ const promNumBodyBounds = 7 var promBodyBounds = [promNumBodyBounds]int64{256, 1024, 4096, 16384, 65536, 262144, 1048576} -// Request-time histogram bucket upper bounds in seconds (standard Prometheus defaults). +// Duration histogram bucket upper bounds in seconds (Prometheus defaults). const promNumTimeBounds = 11 var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} const promCounterCap = 250_000 // safety cap on {host,method,status} counter entries -// promCounterKey is the label set for per-request counters. +// promCounterKey is the label set for the per-request counter. type promCounterKey struct { Host string Method string Status string } -// promBodyEntry holds the body_bytes_sent histogram for one host. +// hostSourceKey labels histograms by {host, source_tag}. +type hostSourceKey struct { + Host string + SourceTag string +} + +// sourceClassKey labels the source-tag rollup counter. +type sourceClassKey struct { + SourceTag string + StatusClass string +} + +// upstreamKey labels the upstream-only request counter. +type upstreamKey struct { + Host string + SourceTag string + StatusClass string // class of $upstream_status +} + +// promBodyEntry holds one body-size histogram (one label-set worth). type promBodyEntry struct { buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=bound[i]; index N: le=+Inf sum int64 } -// promTimeEntry holds the request_time histogram for one host. +// promTimeEntry holds one duration histogram (one label-set worth). type promTimeEntry struct { buckets [promNumTimeBounds + 1]int64 sum float64 @@ -45,30 +64,31 @@ type promTimeEntry struct { // Ingest must be called from exactly one goroutine (the store's Run goroutine). // ServeHTTP may be called from any number of goroutines concurrently. type PromStore struct { - mu sync.Mutex - counters map[promCounterKey]int64 - body map[string]*promBodyEntry // keyed by host - reqTime map[string]*promTimeEntry // keyed by host + mu sync.Mutex + counters map[promCounterKey]int64 + bytesSent map[hostSourceKey]*promBodyEntry + requestDuration map[hostSourceKey]*promTimeEntry + requestBytes map[hostSourceKey]*promBodyEntry // v2 only + upstreamDuration map[hostSourceKey]*promTimeEntry // v2 only + upstreamCounters map[upstreamKey]int64 // v2 only + sourceCounters map[sourceClassKey]int64 - // per-source_tag rollups (parallel series, not a cross-product with host) - sourceCounters map[string]int64 // keyed by source_tag - sourceBody map[string]*promBodyEntry // keyed by source_tag - - // UDP ingest counters — protected by their own atomic-friendly lock. - udpMu sync.Mutex - udpPacketsReceived int64 // datagrams read off the socket - udpLoglinesSuccess int64 // successfully parsed - udpLoglinesConsumed int64 // successfully forwarded to the store channel + udpMu sync.Mutex + udpPacketsReceived int64 + udpLoglinesSuccess int64 + udpLoglinesConsumed int64 } // NewPromStore returns an empty PromStore ready for use. func NewPromStore() *PromStore { return &PromStore{ - counters: make(map[promCounterKey]int64, 1024), - body: make(map[string]*promBodyEntry, 64), - reqTime: make(map[string]*promTimeEntry, 64), - sourceCounters: make(map[string]int64, 32), - sourceBody: make(map[string]*promBodyEntry, 32), + counters: make(map[promCounterKey]int64, 1024), + bytesSent: make(map[hostSourceKey]*promBodyEntry, 64), + requestDuration: make(map[hostSourceKey]*promTimeEntry, 64), + requestBytes: make(map[hostSourceKey]*promBodyEntry, 64), + upstreamDuration: make(map[hostSourceKey]*promTimeEntry, 64), + upstreamCounters: make(map[upstreamKey]int64, 64), + sourceCounters: make(map[sourceClassKey]int64, 32), } } @@ -76,8 +96,11 @@ func NewPromStore() *PromStore { // Must be called from a single goroutine. func (p *PromStore) Ingest(r LogRecord) { p.mu.Lock() + defer p.mu.Unlock() - // --- per-{host,method,status} request counter --- + hsk := hostSourceKey{Host: r.Website, SourceTag: r.SourceTag} + + // nginx_http_requests_total{host,method,status} — capped. ck := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status} if _, ok := p.counters[ck]; ok { p.counters[ck]++ @@ -85,37 +108,54 @@ func (p *PromStore) Ingest(r LogRecord) { p.counters[ck] = 1 } - // --- body_bytes_sent histogram (keyed by host only) --- - observeBody(p.body, r.Website, r.BodyBytesSent) - - // --- request_time histogram (keyed by host only) --- - te, ok := p.reqTime[r.Website] - if !ok { - te = &promTimeEntry{} - p.reqTime[r.Website] = te + observeBody(p.bytesSent, hsk, r.BytesSent) + observeTime(p.requestDuration, hsk, r.RequestTime) + if r.RequestLength > 0 { + observeBody(p.requestBytes, hsk, r.RequestLength) } - for i, bound := range promTimeBounds { - if r.RequestTime <= bound { - te.buckets[i]++ - } + + p.sourceCounters[sourceClassKey{ + SourceTag: r.SourceTag, + StatusClass: statusClass(r.Status), + }]++ + + if r.HasUpstream { + observeTime(p.upstreamDuration, hsk, r.UpstreamResponseTime) + p.upstreamCounters[upstreamKey{ + Host: r.Website, + SourceTag: r.SourceTag, + StatusClass: statusClass(r.UpstreamStatus), + }]++ } - te.buckets[promNumTimeBounds]++ // +Inf - te.sum += r.RequestTime - - // --- per-source_tag rollups --- - p.sourceCounters[r.SourceTag]++ - observeBody(p.sourceBody, r.SourceTag, r.BodyBytesSent) - - p.mu.Unlock() } -// IncUDPPacket, IncUDPSuccess, and IncUDPConsumed bump their respective -// UDP ingest counters. They are called from the UDP listener goroutine. +// IncUDPPacket, IncUDPSuccess, IncUDPConsumed bump UDP-ingest counters from +// the listener goroutine. func (p *PromStore) IncUDPPacket() { p.udpMu.Lock(); p.udpPacketsReceived++; p.udpMu.Unlock() } func (p *PromStore) IncUDPSuccess() { p.udpMu.Lock(); p.udpLoglinesSuccess++; p.udpMu.Unlock() } func (p *PromStore) IncUDPConsumed() { p.udpMu.Lock(); p.udpLoglinesConsumed++; p.udpMu.Unlock() } -func observeBody(m map[string]*promBodyEntry, key string, bytes int64) { +// statusClass folds an HTTP status code into 2xx/3xx/4xx/5xx, with anything +// else falling to "other" (including empty input). +func statusClass(status string) string { + if status == "" { + return "other" + } + switch status[0] { + case '2': + return "2xx" + case '3': + return "3xx" + case '4': + return "4xx" + case '5': + return "5xx" + default: + return "other" + } +} + +func observeBody(m map[hostSourceKey]*promBodyEntry, key hostSourceKey, bytes int64) { e, ok := m[key] if !ok { e = &promBodyEntry{} @@ -126,53 +166,77 @@ func observeBody(m map[string]*promBodyEntry, key string, bytes int64) { e.buckets[i]++ } } - e.buckets[promNumBodyBounds]++ // +Inf + e.buckets[promNumBodyBounds]++ e.sum += bytes } +func observeTime(m map[hostSourceKey]*promTimeEntry, key hostSourceKey, seconds float64) { + e, ok := m[key] + if !ok { + e = &promTimeEntry{} + m[key] = e + } + for i, bound := range promTimeBounds { + if seconds <= bound { + e.buckets[i]++ + } + } + e.buckets[promNumTimeBounds]++ + e.sum += seconds +} + // ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4). func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) { - // Snapshot everything under the lock, then render without holding it. - p.mu.Lock() - type counterSnap struct { k promCounterKey v int64 } + type bodySnap struct { + k hostSourceKey + e promBodyEntry + } + type timeSnap struct { + k hostSourceKey + e promTimeEntry + } + type upstreamCounterSnap struct { + k upstreamKey + v int64 + } + type sourceCounterSnap struct { + k sourceClassKey + v int64 + } + + p.mu.Lock() + counters := make([]counterSnap, 0, len(p.counters)) for k, v := range p.counters { counters = append(counters, counterSnap{k, v}) } - - type bodySnap struct { - label string - e promBodyEntry + bytesSnaps := make([]bodySnap, 0, len(p.bytesSent)) + for k, e := range p.bytesSent { + bytesSnaps = append(bytesSnaps, bodySnap{k, *e}) } - bodySnaps := make([]bodySnap, 0, len(p.body)) - for h, e := range p.body { - bodySnaps = append(bodySnaps, bodySnap{h, *e}) + requestBytesSnaps := make([]bodySnap, 0, len(p.requestBytes)) + for k, e := range p.requestBytes { + requestBytesSnaps = append(requestBytesSnaps, bodySnap{k, *e}) } - - type timeSnap struct { - host string - e promTimeEntry + requestDurationSnaps := make([]timeSnap, 0, len(p.requestDuration)) + for k, e := range p.requestDuration { + requestDurationSnaps = append(requestDurationSnaps, timeSnap{k, *e}) } - timeSnaps := make([]timeSnap, 0, len(p.reqTime)) - for h, e := range p.reqTime { - timeSnaps = append(timeSnaps, timeSnap{h, *e}) + upstreamDurationSnaps := make([]timeSnap, 0, len(p.upstreamDuration)) + for k, e := range p.upstreamDuration { + upstreamDurationSnaps = append(upstreamDurationSnaps, timeSnap{k, *e}) } - - type sourceCounterSnap struct { - tag string - v int64 + upstreamCounters := make([]upstreamCounterSnap, 0, len(p.upstreamCounters)) + for k, v := range p.upstreamCounters { + upstreamCounters = append(upstreamCounters, upstreamCounterSnap{k, v}) } sourceCounters := make([]sourceCounterSnap, 0, len(p.sourceCounters)) - for t, v := range p.sourceCounters { - sourceCounters = append(sourceCounters, sourceCounterSnap{t, v}) - } - sourceBodySnaps := make([]bodySnap, 0, len(p.sourceBody)) - for t, e := range p.sourceBody { - sourceBodySnaps = append(sourceBodySnaps, bodySnap{t, *e}) + for k, v := range p.sourceCounters { + sourceCounters = append(sourceCounters, sourceCounterSnap{k, v}) } p.mu.Unlock() @@ -183,7 +247,6 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) { udpConsumed := p.udpLoglinesConsumed p.udpMu.Unlock() - // Sort for stable, human-readable output. sort.Slice(counters, func(i, j int) bool { a, b := counters[i].k, counters[j].k if a.Host != b.Host { @@ -194,85 +257,139 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) { } return a.Status < b.Status }) - sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].label < bodySnaps[j].label }) - sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host }) - sort.Slice(sourceCounters, func(i, j int) bool { return sourceCounters[i].tag < sourceCounters[j].tag }) - sort.Slice(sourceBodySnaps, func(i, j int) bool { return sourceBodySnaps[i].label < sourceBodySnaps[j].label }) + sortBody := func(s []bodySnap) { + sort.Slice(s, func(i, j int) bool { + a, b := s[i].k, s[j].k + if a.Host != b.Host { + return a.Host < b.Host + } + return a.SourceTag < b.SourceTag + }) + } + sortTime := func(s []timeSnap) { + sort.Slice(s, func(i, j int) bool { + a, b := s[i].k, s[j].k + if a.Host != b.Host { + return a.Host < b.Host + } + return a.SourceTag < b.SourceTag + }) + } + sortBody(bytesSnaps) + sortBody(requestBytesSnaps) + sortTime(requestDurationSnaps) + sortTime(upstreamDurationSnaps) + sort.Slice(upstreamCounters, func(i, j int) bool { + a, b := upstreamCounters[i].k, upstreamCounters[j].k + if a.Host != b.Host { + return a.Host < b.Host + } + if a.SourceTag != b.SourceTag { + return a.SourceTag < b.SourceTag + } + return a.StatusClass < b.StatusClass + }) + sort.Slice(sourceCounters, func(i, j int) bool { + a, b := sourceCounters[i].k, sourceCounters[j].k + if a.SourceTag != b.SourceTag { + return a.SourceTag < b.SourceTag + } + return a.StatusClass < b.StatusClass + }) w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") bw := bufio.NewWriterSize(w, 256*1024) - // nginx_http_requests_total - fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.") - fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter") + // pf, pln are short helpers so the metric block reads cleanly. Errors on a + // bufio writer wrapping http.ResponseWriter mean the client disconnected; + // there's nothing useful to do mid-write — the next call will simply no-op. + pf := func(format string, a ...any) { _, _ = fmt.Fprintf(bw, format, a...) } + pln := func(s string) { _, _ = fmt.Fprintln(bw, s) } + + pln("# HELP nginx_http_requests_total Total number of HTTP requests processed.") + pln("# TYPE nginx_http_requests_total counter") for _, c := range counters { - fmt.Fprintf(bw, "nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n", + pf("nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n", c.k.Host, c.k.Method, c.k.Status, c.v) } - // nginx_http_response_body_bytes (histogram, labeled by host) - fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.") - fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram") - for _, s := range bodySnaps { - writeBodyHistogram(bw, "nginx_http_response_body_bytes", "host", s.label, s.e) + pln("# HELP nginx_http_bytes_sent HTTP response size distribution in bytes (body for v1 records, full wire bytes for v2).") + pln("# TYPE nginx_http_bytes_sent histogram") + for _, s := range bytesSnaps { + writeBodyHistogramHS(bw, "nginx_http_bytes_sent", s.k, s.e) } - // nginx_http_request_duration_seconds (histogram, labeled by host) - fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.") - fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram") - for _, s := range timeSnaps { - for i, bound := range promTimeBounds { - fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=%q} %d\n", - s.host, formatFloat(bound), s.e.buckets[i]) - } - fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=\"+Inf\"} %d\n", - s.host, s.e.buckets[promNumTimeBounds]) - fmt.Fprintf(bw, "nginx_http_request_duration_seconds_count{host=%q} %d\n", - s.host, s.e.buckets[promNumTimeBounds]) - fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%q} %g\n", - s.host, s.e.sum) + pln("# HELP nginx_http_request_bytes HTTP request size distribution in bytes (v2 emitters only).") + pln("# TYPE nginx_http_request_bytes histogram") + for _, s := range requestBytesSnaps { + writeBodyHistogramHS(bw, "nginx_http_request_bytes", s.k, s.e) } - // nginx_http_requests_by_source_total (counter, labeled by source_tag) - fmt.Fprintln(bw, "# HELP nginx_http_requests_by_source_total HTTP requests rolled up by nginx source tag.") - fmt.Fprintln(bw, "# TYPE nginx_http_requests_by_source_total counter") + pln("# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.") + pln("# TYPE nginx_http_request_duration_seconds histogram") + for _, s := range requestDurationSnaps { + writeTimeHistogramHS(bw, "nginx_http_request_duration_seconds", s.k, s.e) + } + + pln("# HELP nginx_http_upstream_duration_seconds Upstream response time in seconds (v2 emitters only).") + pln("# TYPE nginx_http_upstream_duration_seconds histogram") + for _, s := range upstreamDurationSnaps { + writeTimeHistogramHS(bw, "nginx_http_upstream_duration_seconds", s.k, s.e) + } + + pln("# HELP nginx_http_upstream_requests_total Requests served via an upstream, by upstream-status class (v2 emitters only).") + pln("# TYPE nginx_http_upstream_requests_total counter") + for _, c := range upstreamCounters { + pf("nginx_http_upstream_requests_total{host=%q,source_tag=%q,status_class=%q} %d\n", + c.k.Host, c.k.SourceTag, c.k.StatusClass, c.v) + } + + pln("# HELP nginx_http_requests_by_source_total HTTP requests rolled up by source_tag and status class.") + pln("# TYPE nginx_http_requests_by_source_total counter") for _, c := range sourceCounters { - fmt.Fprintf(bw, "nginx_http_requests_by_source_total{source_tag=%q} %d\n", c.tag, c.v) + pf("nginx_http_requests_by_source_total{source_tag=%q,status_class=%q} %d\n", + c.k.SourceTag, c.k.StatusClass, c.v) } - // nginx_http_response_body_bytes_by_source (histogram, labeled by source_tag) - fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes_by_source HTTP response body size distribution by nginx source tag.") - fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes_by_source histogram") - for _, s := range sourceBodySnaps { - writeBodyHistogram(bw, "nginx_http_response_body_bytes_by_source", "source_tag", s.label, s.e) - } + pln("# HELP logtail_udp_packets_received_total Datagrams read from the UDP socket.") + pln("# TYPE logtail_udp_packets_received_total counter") + pf("logtail_udp_packets_received_total %d\n", udpPackets) + pln("# HELP logtail_udp_loglines_success_total UDP loglines that parsed successfully.") + pln("# TYPE logtail_udp_loglines_success_total counter") + pf("logtail_udp_loglines_success_total %d\n", udpSuccess) + pln("# HELP logtail_udp_loglines_consumed_total UDP loglines forwarded to the store (not dropped).") + pln("# TYPE logtail_udp_loglines_consumed_total counter") + pf("logtail_udp_loglines_consumed_total %d\n", udpConsumed) - // UDP ingest counters — lets operators distinguish parse failures - // (received - success) from channel-full drops (success - consumed). - fmt.Fprintln(bw, "# HELP logtail_udp_packets_received_total Datagrams read from the UDP socket.") - fmt.Fprintln(bw, "# TYPE logtail_udp_packets_received_total counter") - fmt.Fprintf(bw, "logtail_udp_packets_received_total %d\n", udpPackets) - fmt.Fprintln(bw, "# HELP logtail_udp_loglines_success_total UDP loglines that parsed successfully.") - fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_success_total counter") - fmt.Fprintf(bw, "logtail_udp_loglines_success_total %d\n", udpSuccess) - fmt.Fprintln(bw, "# HELP logtail_udp_loglines_consumed_total UDP loglines forwarded to the store (not dropped).") - fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_consumed_total counter") - fmt.Fprintf(bw, "logtail_udp_loglines_consumed_total %d\n", udpConsumed) - - bw.Flush() + _ = bw.Flush() } -func writeBodyHistogram(bw *bufio.Writer, metric, labelName, labelValue string, e promBodyEntry) { +func writeBodyHistogramHS(bw *bufio.Writer, metric string, k hostSourceKey, e promBodyEntry) { + pf := func(format string, a ...any) { _, _ = fmt.Fprintf(bw, format, a...) } for i, bound := range promBodyBounds { - fmt.Fprintf(bw, "%s_bucket{%s=%q,le=%q} %d\n", - metric, labelName, labelValue, fmt.Sprintf("%d", bound), e.buckets[i]) + pf("%s_bucket{host=%q,source_tag=%q,le=\"%d\"} %d\n", + metric, k.Host, k.SourceTag, bound, e.buckets[i]) } - fmt.Fprintf(bw, "%s_bucket{%s=%q,le=\"+Inf\"} %d\n", - metric, labelName, labelValue, e.buckets[promNumBodyBounds]) - fmt.Fprintf(bw, "%s_count{%s=%q} %d\n", - metric, labelName, labelValue, e.buckets[promNumBodyBounds]) - fmt.Fprintf(bw, "%s_sum{%s=%q} %d\n", - metric, labelName, labelValue, e.sum) + pf("%s_bucket{host=%q,source_tag=%q,le=\"+Inf\"} %d\n", + metric, k.Host, k.SourceTag, e.buckets[promNumBodyBounds]) + pf("%s_count{host=%q,source_tag=%q} %d\n", + metric, k.Host, k.SourceTag, e.buckets[promNumBodyBounds]) + pf("%s_sum{host=%q,source_tag=%q} %d\n", + metric, k.Host, k.SourceTag, e.sum) +} + +func writeTimeHistogramHS(bw *bufio.Writer, metric string, k hostSourceKey, e promTimeEntry) { + pf := func(format string, a ...any) { _, _ = fmt.Fprintf(bw, format, a...) } + for i, bound := range promTimeBounds { + pf("%s_bucket{host=%q,source_tag=%q,le=%q} %d\n", + metric, k.Host, k.SourceTag, formatFloat(bound), e.buckets[i]) + } + pf("%s_bucket{host=%q,source_tag=%q,le=\"+Inf\"} %d\n", + metric, k.Host, k.SourceTag, e.buckets[promNumTimeBounds]) + pf("%s_count{host=%q,source_tag=%q} %d\n", + metric, k.Host, k.SourceTag, e.buckets[promNumTimeBounds]) + pf("%s_sum{host=%q,source_tag=%q} %g\n", + metric, k.Host, k.SourceTag, e.sum) } // formatFloat renders a float64 bucket bound without trailing zeros but always @@ -280,7 +397,7 @@ func writeBodyHistogram(bw *bufio.Writer, metric, labelName, labelValue string, func formatFloat(f float64) string { s := fmt.Sprintf("%g", f) if !strings.Contains(s, ".") && !strings.Contains(s, "e") { - s += ".0" // ensure it looks like a float, not an integer + s += ".0" } return s } diff --git a/cmd/collector/prom_test.go b/cmd/collector/prom_test.go index d005544..12690d1 100644 --- a/cmd/collector/prom_test.go +++ b/cmd/collector/prom_test.go @@ -6,22 +6,22 @@ import ( "testing" ) -func TestPromStoreIngestBodyBuckets(t *testing.T) { +func TestPromStoreIngestBytesBuckets(t *testing.T) { ps := NewPromStore() // 512 bytes: > 256, ≤ 1024 → bucket[0] stays 0, buckets[1..N] get 1 - ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", BodyBytesSent: 512}) + ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", BytesSent: 512, SourceTag: "direct"}) ps.mu.Lock() - be := ps.body["example.com"] + be := ps.bytesSent[hostSourceKey{"example.com", "direct"}] ps.mu.Unlock() if be == nil { - t.Fatal("expected body entry, got nil") + t.Fatal("expected bytes entry, got nil") } - if be.buckets[0] != 0 { // le=256: 512 > 256 + if be.buckets[0] != 0 { t.Errorf("le=256 bucket = %d, want 0", be.buckets[0]) } - if be.buckets[1] != 1 { // le=1024: 512 ≤ 1024 + if be.buckets[1] != 1 { t.Errorf("le=1024 bucket = %d, want 1", be.buckets[1]) } for i := 2; i <= promNumBodyBounds; i++ { @@ -37,24 +37,21 @@ func TestPromStoreIngestBodyBuckets(t *testing.T) { func TestPromStoreIngestTimeBuckets(t *testing.T) { ps := NewPromStore() // 0.075s: > 0.05, ≤ 0.1 - ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", RequestTime: 0.075}) + ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", RequestTime: 0.075, SourceTag: "direct"}) ps.mu.Lock() - te := ps.reqTime["example.com"] + te := ps.requestDuration[hostSourceKey{"example.com", "direct"}] ps.mu.Unlock() if te == nil { - t.Fatal("expected time entry, got nil") + t.Fatal("expected request_duration entry, got nil") } - // le=0.05 (index 3): 0.075 > 0.05 → 0 if te.buckets[3] != 0 { t.Errorf("le=0.05 bucket = %d, want 0", te.buckets[3]) } - // le=0.1 (index 4): 0.075 ≤ 0.1 → 1 if te.buckets[4] != 1 { t.Errorf("le=0.1 bucket = %d, want 1", te.buckets[4]) } - // +Inf (last): always 1 if te.buckets[promNumTimeBounds] != 1 { t.Errorf("+Inf bucket = %d, want 1", te.buckets[promNumTimeBounds]) } @@ -83,7 +80,7 @@ func TestPromStoreServeHTTP(t *testing.T) { ps := NewPromStore() ps.Ingest(LogRecord{ Website: "example.com", Method: "GET", Status: "200", - BodyBytesSent: 100, RequestTime: 0.042, + BytesSent: 100, RequestTime: 0.042, SourceTag: "direct", }) req := httptest.NewRequest("GET", "/metrics", nil) @@ -95,13 +92,15 @@ func TestPromStoreServeHTTP(t *testing.T) { checks := []string{ "# TYPE nginx_http_requests_total counter", `nginx_http_requests_total{host="example.com",method="GET",status="200"} 1`, - "# TYPE nginx_http_response_body_bytes histogram", - `nginx_http_response_body_bytes_bucket{host="example.com",le="256"} 1`, // 100 ≤ 256 - `nginx_http_response_body_bytes_count{host="example.com"} 1`, - `nginx_http_response_body_bytes_sum{host="example.com"} 100`, + "# TYPE nginx_http_bytes_sent histogram", + `nginx_http_bytes_sent_bucket{host="example.com",source_tag="direct",le="256"} 1`, + `nginx_http_bytes_sent_count{host="example.com",source_tag="direct"} 1`, + `nginx_http_bytes_sent_sum{host="example.com",source_tag="direct"} 100`, "# TYPE nginx_http_request_duration_seconds histogram", - `nginx_http_request_duration_seconds_bucket{host="example.com",le="0.05"} 1`, // 0.042 ≤ 0.05 - `nginx_http_request_duration_seconds_count{host="example.com"} 1`, + `nginx_http_request_duration_seconds_bucket{host="example.com",source_tag="direct",le="0.05"} 1`, + `nginx_http_request_duration_seconds_count{host="example.com",source_tag="direct"} 1`, + "# TYPE nginx_http_requests_by_source_total counter", + `nginx_http_requests_by_source_total{source_tag="direct",status_class="2xx"} 1`, } for _, want := range checks { if !strings.Contains(body, want) { @@ -110,12 +109,12 @@ func TestPromStoreServeHTTP(t *testing.T) { } } -func TestPromStoreSourceTagRollup(t *testing.T) { +func TestPromStoreSourceTagAndStatusClass(t *testing.T) { ps := NewPromStore() - // same host, two tags; each tag should appear with its own series. - ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 100, SourceTag: "direct"}) - ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 300, SourceTag: "cdn"}) - ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 100, SourceTag: "cdn"}) + ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BytesSent: 100, SourceTag: "direct"}) + ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BytesSent: 300, SourceTag: "cdn"}) + ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "404", BytesSent: 100, SourceTag: "cdn"}) + ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "500", BytesSent: 50, SourceTag: "cdn"}) req := httptest.NewRequest("GET", "/metrics", nil) rec := httptest.NewRecorder() @@ -124,13 +123,17 @@ func TestPromStoreSourceTagRollup(t *testing.T) { checks := []string{ "# TYPE nginx_http_requests_by_source_total counter", - `nginx_http_requests_by_source_total{source_tag="direct"} 1`, - `nginx_http_requests_by_source_total{source_tag="cdn"} 2`, - "# TYPE nginx_http_response_body_bytes_by_source histogram", - `nginx_http_response_body_bytes_by_source_sum{source_tag="direct"} 100`, - `nginx_http_response_body_bytes_by_source_sum{source_tag="cdn"} 400`, - // host-series totals are unchanged (one row, counting 3 requests). - `nginx_http_requests_total{host="h",method="GET",status="200"} 3`, + `nginx_http_requests_by_source_total{source_tag="direct",status_class="2xx"} 1`, + `nginx_http_requests_by_source_total{source_tag="cdn",status_class="2xx"} 1`, + `nginx_http_requests_by_source_total{source_tag="cdn",status_class="4xx"} 1`, + `nginx_http_requests_by_source_total{source_tag="cdn",status_class="5xx"} 1`, + // dual-labeled histogram subsumes the old by-source bytes metric + `nginx_http_bytes_sent_sum{host="h",source_tag="direct"} 100`, + `nginx_http_bytes_sent_sum{host="h",source_tag="cdn"} 450`, + // host counter unchanged + `nginx_http_requests_total{host="h",method="GET",status="200"} 2`, + `nginx_http_requests_total{host="h",method="GET",status="404"} 1`, + `nginx_http_requests_total{host="h",method="GET",status="500"} 1`, } for _, want := range checks { if !strings.Contains(body, want) { @@ -139,6 +142,79 @@ func TestPromStoreSourceTagRollup(t *testing.T) { } } +func TestPromStoreUpstreamMetrics(t *testing.T) { + ps := NewPromStore() + // One upstream-served 200, one upstream-served 502, one no-upstream 200. + ps.Ingest(LogRecord{ + Website: "h", Method: "GET", Status: "200", BytesSent: 100, SourceTag: "cdn", + RequestLength: 500, + HasUpstream: true, + UpstreamResponseTime: 0.020, + UpstreamStatus: "200", + }) + ps.Ingest(LogRecord{ + Website: "h", Method: "GET", Status: "502", BytesSent: 100, SourceTag: "cdn", + RequestLength: 500, + HasUpstream: true, + UpstreamResponseTime: 0.150, + UpstreamStatus: "502", + }) + ps.Ingest(LogRecord{ + Website: "h", Method: "GET", Status: "200", BytesSent: 100, SourceTag: "direct", + RequestLength: 200, + }) + + req := httptest.NewRequest("GET", "/metrics", nil) + rec := httptest.NewRecorder() + ps.ServeHTTP(rec, req) + body := rec.Body.String() + + checks := []string{ + // upstream counter: only the two upstream-served requests + `nginx_http_upstream_requests_total{host="h",source_tag="cdn",status_class="2xx"} 1`, + `nginx_http_upstream_requests_total{host="h",source_tag="cdn",status_class="5xx"} 1`, + // upstream duration histogram only for upstream-served requests + `nginx_http_upstream_duration_seconds_count{host="h",source_tag="cdn"} 2`, + // request_bytes only observed when RequestLength > 0 — all three here + `nginx_http_request_bytes_count{host="h",source_tag="cdn"} 2`, + `nginx_http_request_bytes_count{host="h",source_tag="direct"} 1`, + } + for _, want := range checks { + if !strings.Contains(body, want) { + t.Errorf("missing %q in output:\n%s", want, body) + } + } + + // no-upstream request must not bump upstream counters + if strings.Contains(body, `source_tag="direct",status_class=`) && + strings.Contains(body, "nginx_http_upstream_requests_total") { + // Better check: ensure no direct-tagged upstream entry + for _, bad := range []string{ + `nginx_http_upstream_requests_total{host="h",source_tag="direct"`, + `nginx_http_upstream_duration_seconds_bucket{host="h",source_tag="direct"`, + } { + if strings.Contains(body, bad) { + t.Errorf("unexpected %q in output:\n%s", bad, body) + } + } + } +} + +func TestPromStoreRequestBytesSkippedWhenZero(t *testing.T) { + ps := NewPromStore() + // v1 record — RequestLength=0, so request_bytes histogram should be empty. + ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BytesSent: 100, SourceTag: "cdn"}) + + req := httptest.NewRequest("GET", "/metrics", nil) + rec := httptest.NewRecorder() + ps.ServeHTTP(rec, req) + body := rec.Body.String() + + if strings.Contains(body, "nginx_http_request_bytes_bucket{") { + t.Errorf("expected no request_bytes series for v1 record:\n%s", body) + } +} + func TestPromStoreUDPCounters(t *testing.T) { ps := NewPromStore() ps.IncUDPPacket() @@ -167,7 +243,6 @@ func TestPromStoreUDPCounters(t *testing.T) { func TestPromStoreCounterCap(t *testing.T) { ps := NewPromStore() - // Fill to cap with distinct {host,method,status} combos for i := 0; i < promCounterCap+10; i++ { host := strings.Repeat("x", i%10+1) + ".com" status := "200" @@ -183,3 +258,18 @@ func TestPromStoreCounterCap(t *testing.T) { t.Errorf("counter map size %d exceeds cap %d", n, promCounterCap) } } + +func TestStatusClass(t *testing.T) { + cases := map[string]string{ + "200": "2xx", "201": "2xx", "299": "2xx", + "301": "3xx", + "404": "4xx", "418": "4xx", + "500": "5xx", "504": "5xx", + "100": "other", "": "other", "abc": "other", + } + for in, want := range cases { + if got := statusClass(in); got != want { + t.Errorf("statusClass(%q) = %q, want %q", in, got, want) + } + } +} diff --git a/cmd/collector/smoke_test.go b/cmd/collector/smoke_test.go index 51fe604..bdb153e 100644 --- a/cmd/collector/smoke_test.go +++ b/cmd/collector/smoke_test.go @@ -32,9 +32,18 @@ func BenchmarkIngest(b *testing.B) { } } -// BenchmarkParseLine measures parser throughput. +// BenchmarkParseLine measures parser throughput on a v1 line. func BenchmarkParseLine(b *testing.B) { - line := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo\t200\t1452\t0.043" + line := "v1\twww.example.com\t1.2.3.4\tGET\t/api/v1/search?q=foo\t200\t1452\t0.043\t0\t12345\tcdn\t10.0.0.1\thttps" + b.ResetTimer() + for i := 0; i < b.N; i++ { + ParseLine(line, 24, 48) + } +} + +// BenchmarkParseLineV2 measures parser throughput on a v2 line with upstream. +func BenchmarkParseLineV2(b *testing.B) { + line := "v2\twww.example.com\t1.2.3.4\tGET\t/api/v1/search?q=foo\t200\t1500\t421\t0.043\t0.012\t200\t0\t12345\tcdn\t10.0.0.1\thttps" b.ResetTimer() for i := 0; i < b.N; i++ { ParseLine(line, 24, 48) @@ -118,7 +127,7 @@ func TestGRPCEndToEnd(t *testing.T) { } grpcSrv := grpc.NewServer() pb.RegisterLogtailServiceServer(grpcSrv, NewServer(store, "e2e-test")) - go grpcSrv.Serve(lis) + go func() { _ = grpcSrv.Serve(lis) }() defer grpcSrv.Stop() // Dial it @@ -127,7 +136,7 @@ func TestGRPCEndToEnd(t *testing.T) { if err != nil { t.Fatal(err) } - defer conn.Close() + defer func() { _ = conn.Close() }() client := pb.NewLogtailServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/cmd/collector/tailer.go b/cmd/collector/tailer.go index d35d428..4cb7ed2 100644 --- a/cmd/collector/tailer.go +++ b/cmd/collector/tailer.go @@ -52,7 +52,7 @@ func (mt *MultiTailer) Run(ctx context.Context) { if err != nil { log.Fatalf("tailer: failed to create watcher: %v", err) } - defer watcher.Close() + defer func() { _ = watcher.Close() }() files := make(map[string]*fileState) retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine @@ -85,7 +85,7 @@ func (mt *MultiTailer) Run(ctx context.Context) { select { case <-ctx.Done(): for _, fs := range files { - fs.f.Close() + _ = fs.f.Close() } return @@ -117,7 +117,7 @@ func (mt *MultiTailer) Run(ctx context.Context) { if event.Has(fsnotify.Rename) || event.Has(fsnotify.Remove) { // Drain remaining bytes in the old fd before it disappears. mt.readLines(fs.reader) - fs.f.Close() + _ = fs.f.Close() delete(files, event.Name) _ = watcher.Remove(event.Name) startRetry(event.Name) @@ -167,7 +167,7 @@ func (mt *MultiTailer) rescan( for path, fs := range files { if _, matched := current[path]; !matched { mt.readLines(fs.reader) - fs.f.Close() + _ = fs.f.Close() _ = watcher.Remove(path) delete(files, path) log.Printf("tailer: retired %s (no longer matched by any pattern)", path) @@ -182,11 +182,11 @@ func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) return nil, err } if _, err := f.Seek(0, io.SeekEnd); err != nil { - f.Close() + _ = f.Close() return nil, err } if err := watcher.Add(path); err != nil { - f.Close() + _ = f.Close() return nil, err } return &fileState{f: f, reader: bufio.NewReader(f)}, nil diff --git a/cmd/collector/tailer_test.go b/cmd/collector/tailer_test.go index e5a617c..584133a 100644 --- a/cmd/collector/tailer_test.go +++ b/cmd/collector/tailer_test.go @@ -11,7 +11,8 @@ import ( func writeLine(t *testing.T, f *os.File, website string) { t.Helper() - _, err := fmt.Fprintf(f, "%s\t1.2.3.4\t0\tGET\t/path\t200\t0\t0.001\n", website) + // v1 layout: 12 payload fields after the v1 prefix. + _, err := fmt.Fprintf(f, "v1\t%s\t1.2.3.4\tGET\t/path\t200\t0\t0.001\t0\t0\tdirect\t10.0.0.1\thttps\n", website) if err != nil { t.Fatalf("writeLine: %v", err) } @@ -25,7 +26,7 @@ func TestMultiTailerReadsLines(t *testing.T) { if err != nil { t.Fatal(err) } - defer f.Close() + defer func() { _ = f.Close() }() ch := make(chan LogRecord, 100) mt := NewMultiTailer([]string{path}, time.Hour, 24, 48, ch) @@ -62,7 +63,7 @@ func TestMultiTailerMultipleFiles(t *testing.T) { if err != nil { t.Fatal(err) } - defer f.Close() + defer func() { _ = f.Close() }() files[i] = f } @@ -108,7 +109,7 @@ func TestMultiTailerLogRotation(t *testing.T) { // Simulate logrotate: rename the old file, create a new one rotated := filepath.Join(dir, "access.log.1") - f.Close() + _ = f.Close() if err := os.Rename(path, rotated); err != nil { t.Fatal(err) } @@ -121,7 +122,7 @@ func TestMultiTailerLogRotation(t *testing.T) { if err != nil { t.Fatal(err) } - defer newF.Close() + defer func() { _ = newF.Close() }() // Allow retry goroutine to pick it up time.Sleep(300 * time.Millisecond) @@ -137,7 +138,7 @@ func TestExpandGlobs(t *testing.T) { dir := t.TempDir() for _, name := range []string{"a.log", "b.log", "other.txt"} { f, _ := os.Create(filepath.Join(dir, name)) - f.Close() + _ = f.Close() } pattern := filepath.Join(dir, "*.log") @@ -151,7 +152,7 @@ func TestExpandGlobsDeduplication(t *testing.T) { dir := t.TempDir() p := filepath.Join(dir, "access.log") f, _ := os.Create(p) - f.Close() + _ = f.Close() // Same file listed twice via explicit path and glob paths := expandGlobs([]string{p, filepath.Join(dir, "*.log")}) diff --git a/cmd/collector/udp.go b/cmd/collector/udp.go index c6c88bc..95b823c 100644 --- a/cmd/collector/udp.go +++ b/cmd/collector/udp.go @@ -16,7 +16,7 @@ const udpReadBufBytes = 4 << 20 const udpPacketBuf = 64 << 10 // UDPListener receives nginx_ipng_stats_logtail datagrams on a local socket, -// parses each packet as one log line, and forwards LogRecords to ch. +// parses each line through the versioned ParseLine, and forwards LogRecords to ch. type UDPListener struct { addr string v4bits int @@ -50,7 +50,7 @@ func (u *UDPListener) Run(ctx context.Context) { if err != nil { log.Fatalf("udp: listen %s: %v", u.addr, err) } - defer conn.Close() + defer func() { _ = conn.Close() }() if err := conn.SetReadBuffer(udpReadBufBytes); err != nil { log.Printf("udp: SetReadBuffer(%d): %v", udpReadBufBytes, err) } @@ -58,7 +58,7 @@ func (u *UDPListener) Run(ctx context.Context) { go func() { <-ctx.Done() - conn.Close() + _ = conn.Close() }() buf := make([]byte, udpPacketBuf) @@ -84,7 +84,7 @@ func (u *UDPListener) Run(ctx context.Context) { if line == "" { continue } - rec, ok := ParseUDPLine(line, u.v4bits, u.v6bits) + rec, ok := ParseLine(line, u.v4bits, u.v6bits) if !ok { continue } diff --git a/cmd/collector/udp_test.go b/cmd/collector/udp_test.go index 51cc665..55451c4 100644 --- a/cmd/collector/udp_test.go +++ b/cmd/collector/udp_test.go @@ -17,7 +17,7 @@ func TestUDPListenerRoundTrip(t *testing.T) { t.Fatalf("listen probe: %v", err) } addr := pc.LocalAddr().String() - pc.Close() // release; listener will re-bind + _ = pc.Close() // release; listener will re-bind ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -31,15 +31,15 @@ func TestUDPListenerRoundTrip(t *testing.T) { if err != nil { t.Fatalf("dial: %v", err) } - defer conn.Close() + defer func() { _ = conn.Close() }() // The listener is started asynchronously; retry for up to 1s. good := "v1\twww.example.com\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps" bad := "not enough\tfields" deadline := time.Now().Add(time.Second) for time.Now().Before(deadline) { - conn.Write([]byte(good)) - conn.Write([]byte(bad)) + _, _ = conn.Write([]byte(good)) + _, _ = conn.Write([]byte(bad)) select { case rec := <-ch: if rec.Website != "www.example.com" || rec.SourceTag != "direct" { @@ -80,7 +80,7 @@ func TestUDPListenerBatchedDatagram(t *testing.T) { t.Fatalf("listen probe: %v", err) } addr := pc.LocalAddr().(*net.UDPAddr) - pc.Close() + _ = pc.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -100,7 +100,7 @@ func TestUDPListenerBatchedDatagram(t *testing.T) { if err != nil { t.Fatalf("src listen: %v", err) } - defer src.Close() + defer func() { _ = src.Close() }() // Drive the listener with retries until all three records land. got := make(map[string]bool) @@ -148,7 +148,7 @@ func TestUDPListenerMultipleSources(t *testing.T) { t.Fatalf("listen probe: %v", err) } addr := pc.LocalAddr().(*net.UDPAddr) - pc.Close() + _ = pc.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -166,7 +166,7 @@ func TestUDPListenerMultipleSources(t *testing.T) { if err != nil { t.Fatalf("%s listen: %v", tag, err) } - defer src.Close() + defer func() { _ = src.Close() }() deadline := time.Now().Add(time.Second) for time.Now().Before(deadline) { if _, err := src.WriteTo(good, addr); err != nil { diff --git a/cmd/frontend/filter.go b/cmd/frontend/filter.go index f148c8e..cc80211 100644 --- a/cmd/frontend/filter.go +++ b/cmd/frontend/filter.go @@ -51,7 +51,7 @@ func applyTerm(term string, fs *filterState) error { // Find the first operator character: ~, !, >, <, = opIdx := strings.IndexAny(term, "~!><=") if opIdx <= 0 { - return fmt.Errorf("invalid term %q: expected field=value, field>=value, field~=regex, etc.", term) + return fmt.Errorf("invalid term %q: expected field=value, field>=value, field~=regex, etc", term) } field := strings.ToLower(strings.TrimSpace(term[:opIdx])) diff --git a/cmd/frontend/frontend_test.go b/cmd/frontend/frontend_test.go index 784c3af..f9b7415 100644 --- a/cmd/frontend/frontend_test.go +++ b/cmd/frontend/frontend_test.go @@ -52,7 +52,7 @@ func startFake(t *testing.T, fs *fakeServer) string { } srv := grpc.NewServer() pb.RegisterLogtailServiceServer(srv, fs) - go srv.Serve(lis) + go func() { _ = srv.Serve(lis) }() t.Cleanup(srv.GracefulStop) return lis.Addr().String() } @@ -541,7 +541,7 @@ func TestDialFake(t *testing.T) { if err != nil { t.Fatal(err) } - defer conn.Close() + defer func() { _ = conn.Close() }() _ = client // If we got here without error, the fake server is reachable. diff --git a/cmd/frontend/handler.go b/cmd/frontend/handler.go index 2bcfeca..0b85d85 100644 --- a/cmd/frontend/handler.go +++ b/cmd/frontend/handler.go @@ -542,7 +542,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmt.Sprintf("cannot connect to %s: %v", params.Target, err))) return } - defer conn.Close() + defer func() { _ = conn.Close() }() ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() @@ -589,7 +589,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } resp, err := ltClient.ListTargets(ctx, &pb.ListTargetsRequest{}) if ltConn != nil { - ltConn.Close() + _ = ltConn.Close() } if err != nil { ltCh <- nil @@ -683,5 +683,5 @@ func writeRawJSON(w http.ResponseWriter, params QueryParams, resp *pb.TopNRespon o.Entries[i] = entry{Label: e.Label, Count: e.Count} } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(o) + _ = json.NewEncoder(w).Encode(o) } diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index aa0c37e..2844124 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -59,7 +59,7 @@ func main() { <-ctx.Done() log.Printf("frontend: shutting down") - srv.Shutdown(context.Background()) + _ = srv.Shutdown(context.Background()) } func envOr(key, def string) string { diff --git a/docs/design.md b/docs/design.md index 6851b76..df48c12 100644 --- a/docs/design.md +++ b/docs/design.md @@ -64,8 +64,9 @@ natively, so operators can run it either from on-disk log files, from the UDP fe ### Non-Goals -- The system does **not** parse arbitrary nginx `log_format` strings. Two fixed tab-separated formats are supported: a file format and - a UDP format (see FR-2). Operators who need general parsing should use Vector, Fluent Bit, or Promtail. +- The system does **not** parse arbitrary nginx `log_format` strings. A single versioned tab-separated format is + supported on both file and UDP ingest (see FR-2). Operators who need general parsing should use Vector, Fluent Bit, or + Promtail. - The system does **not** store raw log lines. Counts are aggregated at ingest; the original log lines are not kept in memory or on disk. The project does not replace an access log. - The system does **not** persist counters across restarts. Ring buffers are in-memory only. On aggregator restart, historical state @@ -98,50 +99,92 @@ Each requirement carries a unique identifier (`FR-X.Y` or `NFR-X.Y`) so that lat working set. - **FR-1.5** `http_response` MUST be the HTTP status code as recorded by nginx. - **FR-1.6** `is_tor` MUST be a boolean, populated by the operator in the log format (typically via a lookup against a TOR exit-node - list). For the file format, lines without this field default to `false` for backward compatibility. -- **FR-1.7** `asn` MUST be an int32 decimal value sourced from MaxMind GeoIP2 (or equivalent). For the file format, lines without - this field default to `0`. -- **FR-1.8** `ipng_source_tag` MUST be a short string identifying which attribution tag the request arrived under. For records from - on-disk log files, the collector MUST assign the tag `"direct"` (mirroring `nginx-ipng-stats-plugin`'s default-source convention). For - records from the UDP stream, the tag is taken from the log line as emitted by the plugin. + list). Operators without TOR data MUST emit literal `0`. +- **FR-1.7** `asn` MUST be an int32 decimal value sourced from MaxMind GeoIP2 (or equivalent). Operators without GeoIP data MUST + emit literal `0`. +- **FR-1.8** `ipng_source_tag` MUST be a short string identifying which attribution tag the request arrived under. The tag is + always taken verbatim from the log line; the collector does NOT synthesise a fallback. Operators not running + `nginx-ipng-stats-plugin` MUST emit a literal value (typically `"direct"`). **FR-2 Log formats** -- **FR-2.1 File format.** The collector MUST accept nginx access logs in the following tab-separated layout, with the last two fields - (`is_tor`, `asn`) optional for backward compatibility: +- **FR-2.1 Versioned dispatch.** Both the file tailer and the UDP listener MUST funnel every input line through a single + parser that switches on a leading `v\t` version tag. Lines without a recognised tag — including the legacy + positional file format — MUST be rejected and counted as parse failures. Two versions are defined: `v1` (FR-2.2) and + `v2` (FR-2.3). Both ingest paths accept both versions; downstream processing is identical regardless of which path the + line came in over. `$server_addr` and `$scheme` are parsed but discarded — they are reserved for future use. + +- **FR-2.2 v1 format.** The v1 payload MUST be exactly 12 tab-separated fields after the `v1` tag (13 fields total). ```nginx - log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn'; + log_format ipng_stats_logtail + 'v1\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t' + '$body_bytes_sent\t$request_time\t$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; ``` - | # | Field | Ingested into | - |---|-------------------|----------------------------| - | 0 | `$host` | `website` | - | 1 | `$remote_addr` | `client_prefix` (truncated)| - | 2 | `$msec` | (discarded) | - | 3 | `$request_method` | Prom `method` label | - | 4 | `$request_uri` | `http_request_uri` | - | 5 | `$status` | `http_response` | - | 6 | `$body_bytes_sent`| Prom body histogram | - | 7 | `$request_time` | Prom duration histogram | - | 8 | `$is_tor` | `is_tor` (optional) | - | 9 | `$asn` | `asn` (optional) | + | # | Field | Ingested into | + |---|-------------------|-------------------------------------| + | 0 | `v1` | version tag | + | 1 | `$host` | `website` | + | 2 | `$remote_addr` | `client_prefix` (truncated) | + | 3 | `$request_method` | Prom `method` label | + | 4 | `$request_uri` | `http_request_uri` (query stripped) | + | 5 | `$status` | `http_response` | + | 6 | `$body_bytes_sent`| Prom `nginx_http_bytes_sent` | + | 7 | `$request_time` | Prom `nginx_http_request_duration_seconds` | + | 8 | `$is_tor` | `is_tor` | + | 9 | `$asn` | `asn` | + | 10| `$ipng_source_tag`| `source_tag` | + | 11| `$server_addr` | *(parsed and discarded)* | + | 12| `$scheme` | *(parsed and discarded)* | -- **FR-2.2 UDP format.** The collector MUST accept datagrams in a versioned tab-separated layout, as emitted by - `nginx-ipng-stats-plugin`'s `ipng_stats_logtail` directive. Every datagram MUST begin with a literal version tag - (`v\t`) so the collector can route each packet to the appropriate parser. Only `v1` is defined in this revision; - unknown versions MUST be counted as parse failures and dropped. +- **FR-2.3 v2 format.** The v2 payload MUST be exactly 15 tab-separated fields after the `v2` tag (16 fields total). + v2 replaces `$body_bytes_sent` with `$bytes_sent` (full wire bytes including headers) and adds four operationally + important fields: `$request_length` (request size including headers), `$upstream_response_time`, `$upstream_status`, + and the existing v1 fields rearranged for clarity. ```nginx - log_format ipng_stats_logtail 'v1\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; + log_format ipng_stats_logtail + 'v2\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t' + '$bytes_sent\t$request_length\t$request_time\t$upstream_response_time\t$upstream_status\t' + '$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; ``` - The v1 payload MUST have exactly 12 tab-separated fields after the `v1` tag (13 fields total). `$server_addr` and - `$scheme` MUST be parsed but dropped; they are reserved for future use. Malformed datagrams (wrong version, wrong - field count, bad IP) MUST be counted (FR-8.5) and silently dropped. + | # | Field | Ingested into | + |---|---------------------------|----------------------------------------------| + | 0 | `v2` | version tag | + | 1 | `$host` | `website` | + | 2 | `$remote_addr` | `client_prefix` (truncated) | + | 3 | `$request_method` | Prom `method` label | + | 4 | `$request_uri` | `http_request_uri` (query stripped) | + | 5 | `$status` | `http_response` | + | 6 | `$bytes_sent` | Prom `nginx_http_bytes_sent` | + | 7 | `$request_length` | Prom `nginx_http_request_bytes` (v2-only) | + | 8 | `$request_time` | Prom `nginx_http_request_duration_seconds` | + | 9 | `$upstream_response_time` | Prom `nginx_http_upstream_duration_seconds` (v2-only) | + | 10| `$upstream_status` | Prom `nginx_http_upstream_requests_total` (v2-only) | + | 11| `$is_tor` | `is_tor` | + | 12| `$asn` | `asn` | + | 13| `$ipng_source_tag` | `source_tag` | + | 14| `$server_addr` | *(parsed and discarded)* | + | 15| `$scheme` | *(parsed and discarded)* | -- **FR-2.3** The file tailer MUST set `source_tag="direct"` on every record it parses. The UDP listener MUST propagate - `$ipng_source_tag` verbatim. This is the only difference in downstream processing between the two ingest paths. + When nginx serves the response without an upstream (static files, redirects, errors), nginx emits literal `-` for + `$upstream_response_time` and `$upstream_status`. The parser MUST treat that as "no upstream", skip the upstream + histograms, and not increment the upstream counter. When nginx retries across multiple upstreams, both fields are + comma-separated; the parser MUST keep the last entry, since that is the upstream that ultimately served the response. + +- **FR-2.4 Semantic shift on v2 rollout.** v1 fills `nginx_http_bytes_sent` from `$body_bytes_sent`; v2 fills it from + `$bytes_sent`. Operators MUST expect a small step up in the metric when emitters move from v1 to v2 (header overhead; + typically a few hundred bytes per response). + +- **FR-2.5 Malformed input.** Lines with an unknown version, the wrong field count for the claimed version, or an + unparsable IP MUST be silently dropped. UDP drops MUST be counted via FR-8.6; file-path drops are implicit (the tailer + falls behind the file). + +- **FR-2.6 Unknown `$is_tor` / `$asn`.** Operators without TOR or GeoIP data MUST emit literal `0` for both fields. A + literal `0` in `$is_tor` parses as `false`; a literal `0` in `$asn` parses as ASN `0`, filterable at query time with + `--asn '!=0'`. **FR-3 Ring buffers and time windows** @@ -242,15 +285,21 @@ Each requirement carries a unique identifier (`FR-X.Y` or `NFR-X.Y`) so that lat - **FR-8.2** The collector MUST expose a per-request counter `nginx_http_requests_total{host, method, status}` capped at `promCounterCap = 250 000` distinct label sets. When the cap is reached, further new label sets MUST be dropped (existing series keep incrementing) until the map is rolled over. -- **FR-8.3** The collector MUST expose per-host histograms - `nginx_http_response_body_bytes{host, le}` (body-size distribution) and - `nginx_http_request_duration_seconds{host, le}` (request-time distribution). The duration histogram MUST NOT be split by - `source_tag` — its bucket count would multiply without operational benefit. -- **FR-8.4** The collector MUST expose two parallel roll-ups labeled by `source_tag` only (not cross-producted with host): - `nginx_http_requests_by_source_total{source_tag}` and - `nginx_http_response_body_bytes_by_source{source_tag, le}`. These are separate metric names to avoid inconsistent label sets - under a single name. -- **FR-8.5** The collector MUST expose three counters that let operators distinguish UDP parse failures from back-pressure drops: +- **FR-8.3** The collector MUST expose two histograms keyed by `{host, source_tag}`: + `nginx_http_bytes_sent{host, source_tag, le}` (response wire-bytes distribution; FR-2.4) and + `nginx_http_request_duration_seconds{host, source_tag, le}` (end-to-end request time distribution). + Cardinality is bounded by `host × source_tag × bucket_count`, which is small enough that no explicit cap is required. +- **FR-8.4** The collector MUST expose three v2-only metrics that are populated only when v2 records arrive (and, for the + upstream metrics, only when nginx involved an upstream): + `nginx_http_request_bytes{host, source_tag, le}` from `$request_length`, + `nginx_http_upstream_duration_seconds{host, source_tag, le}` from `$upstream_response_time`, and + `nginx_http_upstream_requests_total{host, source_tag, status_class}` from `$upstream_status`. `status_class` is the + HTTP class of the upstream's status code, folded to `2xx`/`3xx`/`4xx`/`5xx`/`other`. +- **FR-8.5** The collector MUST expose a source-tag rollup counter + `nginx_http_requests_by_source_total{source_tag, status_class}`. `status_class` is the HTTP class of `$status`, folded + the same way as in FR-8.4. This rollup is intentionally not cross-producted with `host` — its purpose is fleet-wide + source-attribution health, not per-host detail. +- **FR-8.6** The collector MUST expose three counters that let operators distinguish UDP parse failures from back-pressure drops: `logtail_udp_packets_received_total` (datagrams off the socket, one increment per `recvfrom`), `logtail_udp_loglines_success_total` (log lines that parsed OK, incremented once per log line — a single batched datagram from the nginx plugin may contribute many), and @@ -279,13 +328,13 @@ Each requirement carries a unique identifier (`FR-X.Y` or `NFR-X.Y`) so that lat for a collector is therefore approximately 845 MB (live map ~19 MB + fine ring ~558 MB + coarse ring ~268 MB). - **NFR-2.3** The aggregator MUST apply the same tier caps as the collector. Its steady-state memory is roughly equivalent to one collector regardless of the number of collectors subscribed. -- **NFR-2.4** The Prometheus counter map (FR-8.2) MUST be capped at `promCounterCap = 250 000` entries. The per-host and per-source - histograms MUST NOT be capped explicitly — they grow only with the distinct host count, which is bounded by the operator's vhost - configuration. +- **NFR-2.4** The Prometheus counter map (FR-8.2) MUST be capped at `promCounterCap = 250 000` entries. The dual-labeled + `{host, source_tag}` histograms MUST NOT be capped explicitly — they grow only with the cross-product of distinct + hosts and distinct source tags, both bounded by the operator's nginx configuration. **NFR-3 Performance** -- **NFR-3.1** `ParseLine` and `ParseUDPLine` MUST use `strings.Split` / `strings.SplitN` (no regex), so that per-line cost stays +- **NFR-3.1** `ParseLine` MUST use `strings.Split` / `strings.IndexByte` (no regex), so that per-line cost stays around 50 ns on commodity hardware. - **NFR-3.2** `TopN` and `Trend` queries across the full 24-hour coarse ring MUST complete in well under 250 ms at the 50 000-entry fine cap, for fully-specified filters. @@ -417,8 +466,10 @@ connected. #### Key data types -- `LogRecord` — ten fields (website, client_prefix, URI, status, is_tor, asn, method, body_bytes_sent, request_time, source_tag). - Produced by `ParseLine` or `ParseUDPLine` and consumed by the store goroutine. +- `LogRecord` — fourteen fields (website, client_prefix, URI, status, is_tor, asn, method, bytes_sent, request_length, + request_time, upstream_response_time, upstream_status, has_upstream, source_tag). Produced by `ParseLine` (which + dispatches on the `v\t` prefix) and consumed by the store goroutine. v1 records leave the v2-only fields + (`request_length`, upstream_*) at zero / false. - `Tuple6` (historical name; carries seven fields now) — the aggregation key. NUL-separated when encoded as a map key for snapshots. The code name is intentionally stable so downstream tests and consumers are not churned. - `Snapshot` — `(timestamp, []Entry)` where `Entry = (label, count)` and `label` is an encoded `Tuple6`. @@ -559,9 +610,10 @@ transitions. No per-request logging. - **Frontend crash.** Stateless. Operator restarts. - **UDP datagram loss.** Any datagram dropped in-kernel (socket buffer full, network drop) does not register as a parse failure; it is simply invisible. Operators should size `SO_RCVBUF` appropriately; the collector already requests 4 MiB. -- **Malformed log lines.** File format: lines with <8 tab-separated fields are silently skipped; an invalid IP also drops the line. - UDP: packets without a recognised `v\t` prefix, or with the wrong field count for the claimed version, or with a bad IP, are - counted as received-but-not-success and dropped. +- **Malformed log lines.** Both ingest paths use the versioned `v\t` parser (FR-2). Lines without a recognised version + prefix, with the wrong field count for the claimed version, or with a bad IP are silently dropped. UDP drops are + visible as `packets_received_total - loglines_success_total`; file-path drops are implicit (the tailer simply moves + past them). - **Clock skew between collectors.** Trend sparklines derived from merged data assume collectors are roughly NTP-synced. Per-bucket alignment is to the local minute / 5-minute boundary of each collector. - **gRPC traffic over untrusted links.** The system does not ship TLS; operators should front the gRPC ports with a TLS-terminating @@ -588,11 +640,11 @@ transitions. No per-request logging. - **pull-based collector polling (aggregator polls collectors every second).** Rejected in favor of push. Polling multiplies query latency and makes the aggregator's cache stale by the poll interval. Push-stream with delta merge keeps the cache within seconds of real time. -- **One metric name for both per-host and per-source_tag roll-ups.** Rejected for Prometheus hygiene. Mixing different label sets - under one metric name breaks aggregation rules; separate metric names (`_by_source`) are clearer and easier to query. -- **Cross-product of `host × source_tag` for every counter and histogram.** Rejected. With ~20 tags and ~50 hosts the cardinality - explodes quickly on the duration histogram without operational benefit. The duration histogram stays per-host; requests and body - size get a parallel `_by_source` rollup. +- **Separate `_by_source` metric names with a single label.** The original v0.2 layout exposed `_by_source` siblings to + avoid mixing label sets under one metric name. Superseded by the v0.3 layout: histograms now carry both `host` and + `source_tag` directly, and the source-tag rollup counter gains a `status_class` label. Cardinality stays bounded + (~7 hosts × ~6 tags × 11 buckets ≈ 460 series per histogram), and Grafana queries become simpler (`sum by(source_tag)` + rather than picking a different metric name). - **Writing every `snapshot` to disk for restart recovery.** Rejected in favor of `DumpSnapshots` RPC backfill. Disk-backed persistence would multiply operational surface (rotation, fsck, permissions) for a feature that needs to survive only an aggregator restart. diff --git a/docs/user-guide.md b/docs/user-guide.md index 693dfbb..f8b03f4 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -131,101 +131,98 @@ or for temporary overrides, without editing the unit. The file is **not a dpkg conffile**: postinst writes it only when absent, so operator edits survive upgrades, and `dpkg --purge` removes it. -### nginx — file-based ingest +### nginx — log format -Add the `logtail` format and attach it to whichever `server` blocks you want tracked: +Both ingest paths (file and UDP) use the same versioned tab-separated format. Every line MUST +begin with a literal `v1\t` or `v2\t` prefix; lines without a recognised prefix are dropped. +Two versions are defined; you can mix them across a fleet during a rollout (the collector +parses both). -```nginx -http { - log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn'; +#### v2 (recommended) - server { - access_log /var/log/nginx/access.log logtail; - # or per-vhost: - access_log /var/log/nginx/www.example.com.access.log logtail; - } -} -``` - -Tab-separated, fixed field order, ten fields. The precise layout: - -| # | Field | Ingested into | -|---|-------------------|--------------------------| -| 0 | `$host` | `website` | -| 1 | `$remote_addr` | `client_prefix` (truncated) | -| 2 | `$msec` | *(discarded)* | -| 3 | `$request_method` | Prom `method` label | -| 4 | `$request_uri` | `http_request_uri` (query stripped) | -| 5 | `$status` | `http_response` | -| 6 | `$body_bytes_sent`| Prom body histogram | -| 7 | `$request_time` | Prom duration histogram | -| 8 | `$is_tor` | `is_tor` (optional) | -| 9 | `$asn` | `asn` (optional) | - -`$is_tor` is `1` if the client IP is a TOR exit node and `0` otherwise (typically populated -via a Lua script or `$geoip2_data_*`). `$asn` is the client AS number as a decimal integer -(e.g. MaxMind GeoIP2's `$geoip2_data_autonomous_system_number`). - -**If either is unknown, emit `0`.** A literal `0` in `$is_tor` parses as `false`; a literal -`0` in `$asn` parses as ASN `0`, which you can exclude at query time with `--asn '!=0'` / the -`asn!=0` filter expression. Operators who don't have TOR or GeoIP data can simply emit `0` for -both columns and everything works. - -Both fields are also **positionally optional** for backward compatibility — older 8-field -lines are accepted and default to `false` / `0`. Records from the file tailer are always -tagged `source_tag="direct"`. - -Then point the collector at the log files via `COLLECTOR_LOGS` — comma-separated paths or -glob patterns. Make sure the files are group-readable by `www-data` (the collector's primary -group in the systemd unit). - -### nginx — UDP ingest (`nginx-ipng-stats-plugin`) - -If the nginx host runs [`nginx-ipng-stats-plugin`](https://git.ipng.ch/ipng/nginx-ipng-stats-plugin), -the plugin's `ipng_stats_logtail` directive emits one UDP datagram per request directly to -the collector, no log file involved. The wire format is **versioned** — every datagram starts -with a literal `v1\t` prefix so the collector can ship new parser versions (v2, v3, …) before -emitters are upgraded and route each packet accordingly. +v2 carries five operationally important fields v1 lacks: `$bytes_sent` (full wire bytes, +replaces `$body_bytes_sent`), `$request_length` (request size including headers), +`$upstream_response_time`, and `$upstream_status`. Together they let dashboards split +end-to-end latency into upstream vs. nginx overhead, attribute errors to the upstream vs. the +edge, and report ingress bandwidth. ```nginx http { log_format ipng_stats_logtail - 'v1\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; + 'v2\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t' + '$bytes_sent\t$request_length\t$request_time\t$upstream_response_time\t$upstream_status\t' + '$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; + # File ingest: + server { + access_log /var/log/nginx/access.log ipng_stats_logtail; + } + # UDP ingest (nginx-ipng-stats-plugin): ipng_stats_logtail ipng_stats_logtail udp://127.0.0.1:9514 buffer=64k flush=1s; } ``` -Precise v1 layout — 13 tab-separated fields total (version prefix + 12 payload fields): +| # | Field | Ingested into | +|---|---------------------------|---------------------------------------------------| +| 0 | `v2` | version tag | +| 1 | `$host` | `website` | +| 2 | `$remote_addr` | `client_prefix` (truncated) | +| 3 | `$request_method` | Prom `method` label | +| 4 | `$request_uri` | `http_request_uri` (query stripped) | +| 5 | `$status` | `http_response` | +| 6 | `$bytes_sent` | Prom `nginx_http_bytes_sent` | +| 7 | `$request_length` | Prom `nginx_http_request_bytes` | +| 8 | `$request_time` | Prom `nginx_http_request_duration_seconds` | +| 9 | `$upstream_response_time` | Prom `nginx_http_upstream_duration_seconds` | +| 10| `$upstream_status` | Prom `nginx_http_upstream_requests_total` | +| 11| `$is_tor` | `is_tor` | +| 12| `$asn` | `asn` | +| 13| `$ipng_source_tag` | `source_tag` | +| 14| `$server_addr` | *(parsed and discarded)* | +| 15| `$scheme` | *(parsed and discarded)* | -| # | Field | Ingested into | -|---|-------------------|------------------------------| -| 0 | `v1` | version tag | -| 1 | `$host` | `website` | -| 2 | `$remote_addr` | `client_prefix` (truncated) | -| 3 | `$request_method` | Prom `method` label | -| 4 | `$request_uri` | `http_request_uri` (query stripped) | -| 5 | `$status` | `http_response` | -| 6 | `$body_bytes_sent`| Prom body histogram | -| 7 | `$request_time` | Prom duration histogram | -| 8 | `$is_tor` | `is_tor` | -| 9 | `$asn` | `asn` | -| 10| `$ipng_source_tag`| `source_tag` | -| 11| `$server_addr` | *(parsed and discarded)* | -| 12| `$scheme` | *(parsed and discarded)* | +For requests served without an upstream (static files, redirects, errors), nginx emits +literal `-` for `$upstream_response_time` and `$upstream_status`; the parser treats those as +"no upstream" and skips the upstream metrics rather than counting them as zeros. When nginx +retries across multiple upstreams, both fields are comma-separated and the parser keeps the +last value (the upstream that ultimately served the response). -Compared to the file format: the version tag is added, `$msec` is dropped, and three fields -are appended — `$ipng_source_tag` (propagated into the data model), `$server_addr` and -`$scheme` (reserved for future use). +#### v1 (legacy) -**Unknown `$is_tor` / `$asn`: emit `0`.** Same convention as the file format — operators -without TOR or GeoIP data can emit `0` for both columns and everything works. A literal `0` -in `$is_tor` is `false`; a literal `0` in `$asn` is ASN `0`, filterable at query time. +v1 is preserved unchanged so existing emitters can be upgraded after the collector. Layout: -All 13 fields are required for v1 — malformed packets (wrong version, wrong field count, bad -IP) are silently dropped and counted via `logtail_udp_packets_received_total` minus -`logtail_udp_loglines_success_total`. Both paths (file + UDP) can feed the same collector -simultaneously; they converge on the same aggregation pipeline. +```nginx +log_format ipng_stats_logtail + 'v1\t$host\t$remote_addr\t$request_method\t$request_uri\t$status\t' + '$body_bytes_sent\t$request_time\t$is_tor\t$asn\t$ipng_source_tag\t$server_addr\t$scheme'; +``` + +12 tab-separated payload fields after the `v1` prefix. v1 fills `nginx_http_bytes_sent` from +`$body_bytes_sent`; v2 fills it from `$bytes_sent`. Operators will see a small step up in +that metric (header overhead, typically a few hundred bytes per response) when emitters move +to v2. + +#### Required values + +`$is_tor` is `1` if the client IP is a TOR exit node and `0` otherwise (typically populated +via a Lua script or `$geoip2_data_*`). `$asn` is the client AS number as a decimal integer +(e.g. MaxMind GeoIP2's `$geoip2_data_autonomous_system_number`). Operators without TOR or +GeoIP data MUST emit literal `0` for both — a literal `0` in `$is_tor` parses as `false`; a +literal `0` in `$asn` is ASN `0`, filterable at query time with `--asn '!=0'`. + +`$ipng_source_tag` is provided by [`nginx-ipng-stats-plugin`](https://git.ipng.ch/ipng/nginx-ipng-stats-plugin). +Operators not running the plugin SHOULD declare a constant via `set $ipng_source_tag direct;` +in their `server` block — there is no synthesised fallback in the collector. + +#### Pointing the collector at logs + +For file ingest, set `COLLECTOR_LOGS` to comma-separated paths or glob patterns. Make sure +the files are group-readable by `www-data` (the collector's primary group in the systemd +unit). For UDP ingest, the plugin's `ipng_stats_logtail udp://127.0.0.1:9514` line above is +sufficient. Both paths can feed the same collector simultaneously and converge on the same +aggregation pipeline. Malformed lines (wrong version, wrong field count, bad IP) are silently +dropped; for UDP they show up as `logtail_udp_packets_received_total` minus +`logtail_udp_loglines_success_total`. --- @@ -303,21 +300,33 @@ the new file appears. No restart or SIGHUP required. The collector exposes a Prometheus-compatible `/metrics` endpoint on `--prom-listen` (default `:9100`). Set `--prom-listen ""` to disable it entirely. -**Per-host series:** +**Per-{host,source_tag} series** (both v1 and v2): - `nginx_http_requests_total{host, method, status}` — counter. Map capped at 250 000 distinct label sets; new entries beyond the cap are dropped until the map is rolled over. -- `nginx_http_response_body_bytes_{bucket,count,sum}{host, le}` — histogram of - `$body_bytes_sent`. Buckets (bytes): `256, 1024, 4096, 16384, 65536, 262144, 1048576, +Inf`. -- `nginx_http_request_duration_seconds_{bucket,count,sum}{host, le}` — histogram of - `$request_time`. Buckets (seconds): `0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, - 10, +Inf`. Not split by `source_tag` (duration histogram stays per-host to avoid cardinality - blow-up). +- `nginx_http_bytes_sent_{bucket,count,sum}{host, source_tag, le}` — histogram of response + size. v1 fills from `$body_bytes_sent`; v2 fills from `$bytes_sent`. Buckets (bytes): + `256, 1024, 4096, 16384, 65536, 262144, 1048576, +Inf`. +- `nginx_http_request_duration_seconds_{bucket,count,sum}{host, source_tag, le}` — histogram + of `$request_time`. Buckets (seconds): `0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, + 5, 10, +Inf`. -**Per-`source_tag` roll-ups** (parallel series, not a cross-product with `host`): +**v2-only series** (populated only when v2 emitters are running, and the upstream histograms +only when nginx involved an upstream): -- `nginx_http_requests_by_source_total{source_tag}` — counter. -- `nginx_http_response_body_bytes_by_source_{bucket,count,sum}{source_tag, le}` — histogram. +- `nginx_http_request_bytes_{bucket,count,sum}{host, source_tag, le}` — histogram of + `$request_length` (ingress, headers + body). Same byte buckets as `bytes_sent`. +- `nginx_http_upstream_duration_seconds_{bucket,count,sum}{host, source_tag, le}` — + histogram of `$upstream_response_time`. Same time buckets as `request_duration`. Lets you + split end-to-end latency into upstream vs. nginx overhead. +- `nginx_http_upstream_requests_total{host, source_tag, status_class}` — counter incremented + once per upstream-served request, classed by `$upstream_status` (`2xx`/`3xx`/`4xx`/`5xx`/`other`). + Lets you spot upstream errors masked at the edge (e.g. nginx 502 because origin 504). + +**Source-tag rollup** (fleet-wide attribution health, intentionally not crossed with host): + +- `nginx_http_requests_by_source_total{source_tag, status_class}` — counter classed by + `$status`. Use it to spot per-source error spikes without exploding cardinality. **UDP ingest counters** — lets operators distinguish parse failures from back-pressure drops: @@ -365,10 +374,23 @@ histogram_quantile(0.95, sum by (host, le) (rate(nginx_http_request_duration_seconds_bucket[5m])) ) -# Median response body size per host -histogram_quantile(0.50, - sum by (host, le) (rate(nginx_http_response_body_bytes_bucket[5m])) +# 95th percentile response time per source_tag (drill in further as needed) +histogram_quantile(0.95, + sum by (source_tag, le) (rate(nginx_http_request_duration_seconds_bucket[5m])) ) + +# Median response size per host +histogram_quantile(0.50, + sum by (host, le) (rate(nginx_http_bytes_sent_bucket[5m])) +) + +# v2-only: upstream P95, split out from nginx overhead +histogram_quantile(0.95, + sum by (host, le) (rate(nginx_http_upstream_duration_seconds_bucket[5m])) +) + +# v2-only: upstream 5xx rate per source_tag +sum by (source_tag) (rate(nginx_http_upstream_requests_total{status_class="5xx"}[5m])) ``` ### Memory usage