Files
nginx-logtail/cmd/collector/udp_test.go
Pim van Pelt 589030cb00 doc-fix: clarify UDP listener handles multi-source peers
No runtime change — the listener already uses net.ListenUDP +
ReadFromUDP, which is the unconnected-socket pattern that accepts
datagrams from any source. nginx reloads (new workers with fresh
ephemeral source ports) are handled transparently.

- udp.go: expanded comment on Run() explaining the design choice and
  contrasting with the `nc -k -u -l` latching quirk (which is an nc
  bug, not a kernel behaviour).
- udp_test.go: new TestUDPListenerMultipleSources guards against
  regressions in the multi-worker scenario by sending from three
  independent ListenPacket sockets (three different ephemeral source ports).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 10:39:34 +02:00

123 lines
3.7 KiB
Go

package main
import (
"context"
"net"
"testing"
"time"
)
// TestUDPListenerRoundTrip starts a UDP listener on a loopback port, sends it
// one well-formed and one malformed datagram, and verifies both the parsed
// record delivered on the channel and the UDP counters kept by the PromStore.
func TestUDPListenerRoundTrip(t *testing.T) {
	ch := make(chan LogRecord, 4)
	ps := NewPromStore()

	// Bind to an ephemeral port on loopback to discover a free address.
	pc, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen probe: %v", err)
	}
	addr := pc.LocalAddr().String()
	pc.Close() // release; listener will re-bind

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	u := NewUDPListener(addr, 24, 48, ch)
	u.SetProm(ps)
	go u.Run(ctx)

	// Dial the listener; every attempt below sends one valid and one
	// malformed packet.
	conn, err := net.Dial("udp", addr)
	if err != nil {
		t.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	good := "v1\twww.example.com\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps"
	bad := "not enough\tfields"

	// The listener is started asynchronously; retry for up to 1s.
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		// Write errors are deliberately ignored: this is a best-effort retry
		// loop, and a connected UDP socket may report a failure for a
		// datagram sent before the listener has bound its port.
		_, _ = conn.Write([]byte(good))
		_, _ = conn.Write([]byte(bad))

		select {
		case rec := <-ch:
			if rec.Website != "www.example.com" || rec.SourceTag != "direct" {
				t.Fatalf("bad record: %+v", rec)
			}

			// The listener updates its counters asynchronously and may not
			// have handled the malformed packet yet. Poll until the counters
			// reach the expected minimums instead of sleeping a fixed
			// amount, and only report failures once the budget is spent.
			settle := time.Now().Add(time.Second)
			for {
				ps.udpMu.Lock()
				pkt, suc, con := ps.udpPacketsReceived, ps.udpLoglinesSuccess, ps.udpLoglinesConsumed
				ps.udpMu.Unlock()

				if pkt >= 2 && suc >= 1 && con >= 1 {
					return
				}
				if time.Now().After(settle) {
					if pkt < 2 {
						t.Errorf("udpPacketsReceived=%d, want >=2", pkt)
					}
					if suc < 1 {
						t.Errorf("udpLoglinesSuccess=%d, want >=1", suc)
					}
					if con < 1 {
						t.Errorf("udpLoglinesConsumed=%d, want >=1", con)
					}
					return
				}
				time.Sleep(5 * time.Millisecond)
			}
		case <-time.After(50 * time.Millisecond):
			// No record yet; send both packets again.
		}
	}
	t.Fatal("no record received within 1s")
}
// TestUDPListenerMultipleSources exercises the nginx-reload path: a fresh
// nginx worker set opens brand-new send sockets (different ephemeral source
// ports) and the listener MUST keep accepting their packets without
// needing a restart. Guards against regressing into the `nc -k -u -l`
// latching pitfall.
//
// Every simulated worker sends a payload carrying its own website value so
// that a received record can be attributed to the socket that produced it.
// Without that, a late duplicate from an earlier worker's retry could satisfy
// a later worker and hide a listener that filters by source address.
func TestUDPListenerMultipleSources(t *testing.T) {
	ch := make(chan LogRecord, 16)

	// Probe for a free loopback port, then hand it to the listener.
	pc, err := net.ListenPacket("udp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen probe: %v", err)
	}
	addr := pc.LocalAddr().(*net.UDPAddr)
	pc.Close() // release; listener will re-bind

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewUDPListener(addr.String(), 24, 48, ch).Run(ctx)

	// known holds every website value sent so far; a record carrying one of
	// them that is not the current worker's is a stale duplicate, anything
	// else is a malformed record.
	known := make(map[string]bool)

	// sendFromNewSource opens a fresh unconnected UDP socket (new ephemeral
	// source port, one per "worker") and WriteTo's the packet. Retries until
	// this worker's own record arrives or the deadline hits. Unconnected
	// sockets ignore the asynchronous ICMP-unreachable that a connected
	// Dial() would latch, which keeps the retry loop unaffected by startup
	// races.
	sendFromNewSource := func(tag string) {
		t.Helper()

		src, err := net.ListenPacket("udp", "127.0.0.1:0")
		if err != nil {
			t.Fatalf("%s listen: %v", tag, err)
		}
		defer src.Close()

		// NOTE(review): assumes the website field of the payload is carried
		// into LogRecord.Website unchanged — confirm against the parser.
		site := tag + ".example.com"
		known[site] = true
		payload := []byte("v1\t" + site + "\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps")

		deadline := time.Now().Add(time.Second)
		for time.Now().Before(deadline) {
			if _, err := src.WriteTo(payload, addr); err != nil {
				t.Fatalf("%s write: %v", tag, err)
			}

			// Wait up to 50ms for this worker's record, skipping stale ones.
			timeout := time.After(50 * time.Millisecond)
		wait:
			for {
				select {
				case rec := <-ch:
					switch {
					case rec.Website == site:
						return
					case !known[rec.Website]:
						t.Fatalf("%s: bad record: %+v", tag, rec)
					}
					// Stale duplicate from an earlier worker; keep waiting.
				case <-timeout:
					break wait
				}
			}
		}
		t.Fatalf("%s: no record received within 1s; listener may be filtering by source addr", tag)
	}

	// Three independent sockets give three different ephemeral source ports,
	// each representing a distinct nginx worker (or a post-reload worker).
	sendFromNewSource("worker1")
	sendFromNewSource("worker2")
	sendFromNewSource("worker3")
}