Files
nginx-logtail/cmd/collector/udp_test.go
Pim van Pelt e1f8bc5eb4 fix: UDP listener parses batched datagrams
nginx-ipng-stats-plugin's ipng_stats_logtail directive buffers many log
lines into a single UDP datagram (default buffer=64k flush=1s). The
listener was treating each datagram as exactly one log line, so any
datagram with N>1 lines failed the v1 field-count check and dropped
silently. In production this showed up as logtail_udp_packets_received_total
roughly 4x logtail_udp_loglines_success_total — matching typical
burst-coalesced 4-lines-per-batch ratios.

Fix: strip trailing CRLF, split the payload on '\n', parse each
non-empty line independently. Counter semantics now match the names:

  packets_received  — datagrams off the socket (one per recvfrom)
  loglines_success  — log lines parsed OK (may be many per datagram)
  loglines_consumed — log lines forwarded to the store (not dropped)

After the fix, loglines_success ≈ packets_received × avg_lines_per_batch.

Regression test TestUDPListenerBatchedDatagram sends one datagram with
three '\n'-separated v1 lines and asserts all three LogRecords arrive,
plus loglines_success >= 3 * packets_received.

Docs (user-guide.md, design.md) now explain the datagram-vs-line unit
distinction so operators don't misread the ratio.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 11:59:43 +02:00

193 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"net"
"testing"
"time"
)
// TestUDPListenerRoundTrip starts a listener on a loopback port, sends it one
// well-formed v1 line and one malformed line (each in its own datagram), and
// checks that the well-formed line shows up on the record channel and that
// the PromStore UDP counters account for both datagrams.
func TestUDPListenerRoundTrip(t *testing.T) {
	ch := make(chan LogRecord, 4)
	ps := NewPromStore()

	// Bind to an ephemeral port on loopback to discover a free address.
	pc, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen probe: %v", err)
	}
	addr := pc.LocalAddr().(*net.UDPAddr)
	pc.Close() // release; listener will re-bind

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	u := NewUDPListener(addr.String(), 24, 48, ch)
	u.SetProm(ps)
	go u.Run(ctx)

	// Send from an unconnected socket. A connected socket (net.Dial) latches
	// the asynchronous ICMP-unreachable produced by datagrams sent before the
	// listener has bound and reports it on a later Write; an unconnected
	// socket is unaffected, so every write error here is a real failure.
	src, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("src listen: %v", err)
	}
	defer src.Close()

	good := []byte("v1\twww.example.com\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps")
	bad := []byte("not enough\tfields")

	// The listener is started asynchronously; retry for up to 1s.
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		for _, payload := range [][]byte{good, bad} {
			if _, err := src.WriteTo(payload, addr); err != nil {
				t.Fatalf("write: %v", err)
			}
		}
		select {
		case rec := <-ch:
			if rec.Website != "www.example.com" || rec.SourceTag != "direct" {
				t.Fatalf("bad record: %+v", rec)
			}
			// Give the listener a moment to process the malformed packet too.
			time.Sleep(50 * time.Millisecond)
			ps.udpMu.Lock()
			pkt, suc, con := ps.udpPacketsReceived, ps.udpLoglinesSuccess, ps.udpLoglinesConsumed
			ps.udpMu.Unlock()
			if pkt < 2 {
				t.Errorf("udpPacketsReceived=%d, want >=2", pkt)
			}
			if suc < 1 {
				t.Errorf("udpLoglinesSuccess=%d, want >=1", suc)
			}
			if con < 1 {
				t.Errorf("udpLoglinesConsumed=%d, want >=1", con)
			}
			return
		case <-time.After(50 * time.Millisecond):
			// Nothing yet; the listener may not have bound. Send again.
		}
	}
	t.Fatal("no record received within 1s")
}
// TestUDPListenerBatchedDatagram covers the batching behaviour of
// nginx-ipng-stats-plugin's buffer/flush mode, where one UDP datagram can
// carry several '\n'-separated log lines. The test sends three v1 lines in a
// single datagram and requires that each line yields its own LogRecord and
// that the success counter is at least three times the packet counter.
func TestUDPListenerBatchedDatagram(t *testing.T) {
	records := make(chan LogRecord, 16)
	store := NewPromStore()

	// Grab a free loopback port, then release it so the listener can bind it.
	probe, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen probe: %v", err)
	}
	target := probe.LocalAddr().(*net.UDPAddr)
	probe.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	listener := NewUDPListener(target.String(), 24, 48, records)
	listener.SetProm(store)
	go listener.Run(ctx)

	// One datagram holding three newline-terminated v1 lines.
	const (
		lineA = "v1\ta.example.com\t1.2.3.4\tGET\t/a\t200\t10\t0.001\t0\t0\tdirect\t10.0.0.1\thttps\n"
		lineB = "v1\tb.example.com\t1.2.3.5\tGET\t/b\t404\t20\t0.002\t0\t0\tdirect\t10.0.0.1\thttps\n"
		lineC = "v1\tc.example.com\t1.2.3.6\tGET\t/c\t500\t30\t0.003\t0\t0\tdirect\t10.0.0.1\thttps\n"
	)
	payload := []byte(lineA + lineB + lineC)

	sender, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("src listen: %v", err)
	}
	defer sender.Close()

	// Resend the batch until all three hosts have been seen or 1s elapses.
	hosts := []string{"a.example.com", "b.example.com", "c.example.com"}
	seen := make(map[string]bool)
	stopAt := time.Now().Add(time.Second)
	for len(seen) < 3 && time.Now().Before(stopAt) {
		if _, err := sender.WriteTo(payload, target); err != nil {
			t.Fatalf("write: %v", err)
		}
		// Collect records until the channel stays quiet for 50ms.
		for idle := false; !idle; {
			select {
			case rec := <-records:
				seen[rec.Website] = true
			case <-time.After(50 * time.Millisecond):
				idle = true
			}
		}
	}

	for _, host := range hosts {
		if seen[host] {
			continue
		}
		t.Errorf("missing record for %s; got=%v", host, seen)
	}

	// Retries multiply both counters, but every datagram that was received
	// carries three lines, so success must be at least 3x packets.
	store.udpMu.Lock()
	pkt := store.udpPacketsReceived
	suc := store.udpLoglinesSuccess
	store.udpMu.Unlock()
	switch {
	case pkt == 0, suc < pkt*3:
		t.Errorf("packets=%d success=%d: expected success >= 3*packets", pkt, suc)
	}
}
// TestUDPListenerMultipleSources exercises the nginx-reload path: a fresh
// nginx worker set opens brand-new send sockets (different ephemeral source
// ports) and the listener MUST keep accepting their packets without
// needing a restart. Regresses against the `nc -k -u -l` latching pitfall.
//
// Each worker sends a line with its own host name in the website field, so a
// record can be attributed to the worker that produced it.
func TestUDPListenerMultipleSources(t *testing.T) {
	ch := make(chan LogRecord, 16)

	pc, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen probe: %v", err)
	}
	addr := pc.LocalAddr().(*net.UDPAddr)
	pc.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewUDPListener(addr.String(), 24, 48, ch).Run(ctx)

	// sent holds the host name of every worker started so far. A record for
	// an earlier worker's host is a late arrival from that worker's retries.
	sent := make(map[string]bool)

	// sendFromNewSource opens a fresh unconnected UDP socket (new ephemeral
	// source port, one per "worker") and WriteTo's the packet. Retries until
	// a record for this worker arrives or the deadline hits. Unconnected
	// sockets ignore the asynchronous ICMP-unreachable that a connected
	// Dial() would latch, which keeps the retry loop unaffected by startup
	// races.
	sendFromNewSource := func(tag string) {
		host := tag + ".example.com"
		sent[host] = true
		line := []byte("v1\t" + host + "\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps")

		src, err := net.ListenPacket("udp", "127.0.0.1:0")
		if err != nil {
			t.Fatalf("%s listen: %v", tag, err)
		}
		defer src.Close()

		deadline := time.Now().Add(time.Second)
		for time.Now().Before(deadline) {
			if _, err := src.WriteTo(line, addr); err != nil {
				t.Fatalf("%s write: %v", tag, err)
			}
			retry := time.After(50 * time.Millisecond)
		wait:
			for {
				select {
				case rec := <-ch:
					switch {
					case rec.Website == host:
						return
					case sent[rec.Website]:
						// Late record from an earlier worker's retry. It
						// says nothing about this source port, so keep
						// waiting instead of treating it as success.
					default:
						t.Fatalf("%s: bad record: %+v", tag, rec)
					}
				case <-retry:
					break wait
				}
			}
		}
		t.Fatalf("%s: no record received within 1s; listener may be filtering by source addr", tag)
	}

	// Three independent sockets give three different ephemeral source ports,
	// each representing a distinct nginx worker (or a post-reload worker).
	sendFromNewSource("worker1")
	sendFromNewSource("worker2")
	sendFromNewSource("worker3")
}