diff --git a/cmd/collector/udp.go b/cmd/collector/udp.go index 3cd208f..c6c88bc 100644 --- a/cmd/collector/udp.go +++ b/cmd/collector/udp.go @@ -74,21 +74,31 @@ func (u *UDPListener) Run(ctx context.Context) { if u.prom != nil { u.prom.IncUDPPacket() } - line := strings.TrimRight(string(buf[:n]), "\r\n") - rec, ok := ParseUDPLine(line, u.v4bits, u.v6bits) - if !ok { - continue - } - if u.prom != nil { - u.prom.IncUDPSuccess() - } - select { - case u.ch <- rec: - if u.prom != nil { - u.prom.IncUDPConsumed() + // nginx-ipng-stats-plugin batches log lines into a single UDP + // datagram (default buffer=64k flush=1s), so one packet may carry + // many lines. nginx's log_format always ends a rendered line with + // '\n'; split on that and process each line independently. + payload := strings.TrimRight(string(buf[:n]), "\r\n") + for _, line := range strings.Split(payload, "\n") { + line = strings.TrimSuffix(line, "\r") + if line == "" { + continue + } + rec, ok := ParseUDPLine(line, u.v4bits, u.v6bits) + if !ok { + continue + } + if u.prom != nil { + u.prom.IncUDPSuccess() + } + select { + case u.ch <- rec: + if u.prom != nil { + u.prom.IncUDPConsumed() + } + default: + // Channel full — drop rather than block the read loop. } - default: - // Channel full — drop rather than block the read loop. } } } diff --git a/cmd/collector/udp_test.go b/cmd/collector/udp_test.go index 5fc0d5a..51cc665 100644 --- a/cmd/collector/udp_test.go +++ b/cmd/collector/udp_test.go @@ -66,6 +66,76 @@ func TestUDPListenerRoundTrip(t *testing.T) { t.Fatal("no record received within 1s") } +// TestUDPListenerBatchedDatagram exercises the nginx-ipng-stats-plugin's +// buffer/flush batching: a single UDP datagram may contain many log lines +// separated by '\n'. Each line MUST be counted and parsed independently, +// so packets_received * avg_lines_per_packet ≈ loglines_success (not +// packets_received == success as the earlier single-line code assumed). 
+func TestUDPListenerBatchedDatagram(t *testing.T) { + ch := make(chan LogRecord, 16) + ps := NewPromStore() + + pc, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen probe: %v", err) + } + addr := pc.LocalAddr().(*net.UDPAddr) + pc.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + u := NewUDPListener(addr.String(), 24, 48, ch) + u.SetProm(ps) + go u.Run(ctx) + + // Three v1 lines in one datagram, '\n'-terminated like nginx's + // log_format renders them. + batch := []byte( + "v1\ta.example.com\t1.2.3.4\tGET\t/a\t200\t10\t0.001\t0\t0\tdirect\t10.0.0.1\thttps\n" + + "v1\tb.example.com\t1.2.3.5\tGET\t/b\t404\t20\t0.002\t0\t0\tdirect\t10.0.0.1\thttps\n" + + "v1\tc.example.com\t1.2.3.6\tGET\t/c\t500\t30\t0.003\t0\t0\tdirect\t10.0.0.1\thttps\n", + ) + + src, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("src listen: %v", err) + } + defer src.Close() + + // Drive the listener with retries until all three records land. + got := make(map[string]bool) + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) && len(got) < 3 { + if _, err := src.WriteTo(batch, addr); err != nil { + t.Fatalf("write: %v", err) + } + drain: + for { + select { + case rec := <-ch: + got[rec.Website] = true + case <-time.After(50 * time.Millisecond): + break drain + } + } + } + for _, want := range []string{"a.example.com", "b.example.com", "c.example.com"} { + if !got[want] { + t.Errorf("missing record for %s; got=%v", want, got) + } + } + + // Exactly one packet arrived yet three lines should have succeeded. + // Under retries the numbers will be multiples, but the ratio must + // always be success ≈ 3 × packets once we've seen each record. 
+ ps.udpMu.Lock() + pkt, suc := ps.udpPacketsReceived, ps.udpLoglinesSuccess + ps.udpMu.Unlock() + if pkt == 0 || suc < pkt*3 { + t.Errorf("packets=%d success=%d: expected success >= 3*packets", pkt, suc) + } +} + // TestUDPListenerMultipleSources exercises the nginx-reload path: a fresh // nginx worker set opens brand-new send sockets (different ephemeral source // ports) and the listener MUST keep accepting their packets without diff --git a/docs/design.md b/docs/design.md index e63db0b..6851b76 100644 --- a/docs/design.md +++ b/docs/design.md @@ -251,9 +251,10 @@ Each requirement carries a unique identifier (`FR-X.Y` or `NFR-X.Y`) so that lat `nginx_http_response_body_bytes_by_source{source_tag, le}`. These are separate metric names to avoid inconsistent label sets under a single name. - **FR-8.5** The collector MUST expose three counters that let operators distinguish UDP parse failures from back-pressure drops: - `logtail_udp_packets_received_total` (datagrams off the socket), - `logtail_udp_loglines_success_total` (parsed OK), and - `logtail_udp_loglines_consumed_total` (forwarded to the store — i.e. not dropped). + `logtail_udp_packets_received_total` (datagrams off the socket, one increment per `recvfrom`), + `logtail_udp_loglines_success_total` (log lines that parsed OK, incremented once per log line — a single batched datagram from + the nginx plugin may contribute many), and + `logtail_udp_loglines_consumed_total` (log lines forwarded to the store channel — i.e. not dropped by back-pressure). ### Non-Functional Requirements @@ -539,8 +540,8 @@ Primary channel is the collector's Prometheus endpoint (FR-8). Beyond the per-ho three UDP counters give direct visibility into the UDP ingest path: - `logtail_udp_packets_received_total` — what arrived. -- `logtail_udp_loglines_success_total` — what parsed cleanly. -- `logtail_udp_loglines_consumed_total` — what made it to the store (i.e. was not dropped by a full channel). 
+- `logtail_udp_loglines_success_total` — log lines that parsed cleanly (one datagram may contribute many). +- `logtail_udp_loglines_consumed_total` — log lines that made it to the store (i.e. were not dropped by a full channel). -`received - success` is the parse-failure rate; `success - consumed` is the back-pressure drop rate. Operators should alert on both being non-zero. +`success - consumed` is the back-pressure drop rate; operators should alert on it being non-zero. `received` counts datagrams while `success` counts log lines, so `received - success` is not a parse-failure rate: compare `success` against `received` × expected lines per datagram instead. diff --git a/docs/user-guide.md b/docs/user-guide.md index 7d40e84..693dfbb 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -322,11 +322,17 @@ The collector exposes a Prometheus-compatible `/metrics` endpoint on `--prom-lis **UDP ingest counters** — lets operators distinguish parse failures from back-pressure drops: - `logtail_udp_packets_received_total` — datagrams read off the socket. -- `logtail_udp_loglines_success_total` — parsed OK. -- `logtail_udp_loglines_consumed_total` — forwarded to the store (not dropped). +- `logtail_udp_loglines_success_total` — log lines parsed OK. +- `logtail_udp_loglines_consumed_total` — log lines forwarded to the store (not dropped). -`received - success` is the parse-failure rate; `success - consumed` is the back-pressure -drop rate. Alert on either being non-zero. +Note the unit mismatch: `packets_*` counts datagrams, `loglines_*` counts log lines. +The nginx plugin batches many log lines into a single UDP datagram (default `buffer=64k +flush=1s`), so `loglines_success ≫ packets_received` is normal — operators should see +roughly `loglines_success / packets_received ≈ avg lines per batch`. + +`loglines_success - loglines_consumed` is the back-pressure drop rate (channel full). +A large gap between `packets_received * expected_lines_per_packet` and `loglines_success` +indicates parse failures. **Prometheus scrape config:**