From 589030cb00b1bd0e230f981d078223b425d24455 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Fri, 17 Apr 2026 10:39:34 +0200 Subject: [PATCH] doc-fix: clarify UDP listener handles multi-source peers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No runtime change — the listener already uses net.ListenUDP + ReadFromUDP, which is the unconnected-socket pattern that accepts datagrams from any source. nginx reloads (new workers with fresh ephemeral source ports) are handled transparently. - udp.go: expanded comment on Run() explaining the design choice and contrasting with the `nc -k -u -l` latching quirk (which is an nc bug, not a kernel behaviour). - udp_test.go: new TestUDPListenerMultipleSources regresses against the multi-worker scenario by sending from three independent ListenPacket sockets (three different ephemeral source ports). Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/collector/udp.go | 8 ++++++ cmd/collector/udp_test.go | 55 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/cmd/collector/udp.go b/cmd/collector/udp.go index 9ad6404..3cd208f 100644 --- a/cmd/collector/udp.go +++ b/cmd/collector/udp.go @@ -33,6 +33,14 @@ func NewUDPListener(addr string, v4bits, v6bits int, ch chan<- LogRecord) *UDPLi func (u *UDPListener) SetProm(p *PromStore) { u.prom = p } // Run listens until ctx is cancelled. +// +// The socket is unconnected (ListenUDP + ReadFromUDP), so every datagram is +// accepted regardless of its source address. This matters across nginx +// reloads: the old worker processes hold their own ephemeral send sockets, +// and the fresh worker set opens brand-new ones. The listener reads them +// all. (Contrast with `nc -k -u -l`, which latches onto the first peer's +// address and silently drops packets from anyone else — that is an `nc` +// quirk, not a kernel behaviour, and does not apply here.) func (u *UDPListener) Run(ctx context.Context) { laddr, err := net.ResolveUDPAddr("udp", u.addr) if err != nil { diff --git a/cmd/collector/udp_test.go b/cmd/collector/udp_test.go index 94ccc52..5fc0d5a 100644 --- a/cmd/collector/udp_test.go +++ b/cmd/collector/udp_test.go @@ -65,3 +65,58 @@ func TestUDPListenerRoundTrip(t *testing.T) { } t.Fatal("no record received within 1s") } + +// TestUDPListenerMultipleSources exercises the nginx-reload path: a fresh +// nginx worker set opens brand-new send sockets (different ephemeral source +// ports) and the listener MUST keep accepting their packets without +// needing a restart. Regresses against the `nc -k -u -l` latching pitfall. +func TestUDPListenerMultipleSources(t *testing.T) { + ch := make(chan LogRecord, 16) + + pc, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen probe: %v", err) + } + addr := pc.LocalAddr().(*net.UDPAddr) + pc.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go NewUDPListener(addr.String(), 24, 48, ch).Run(ctx) + + good := []byte("v1\twww.example.com\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps") + + // sendFromNewSource opens a fresh unconnected UDP socket (new ephemeral + // source port, one per "worker") and WriteTo's the packet. Retries until + // a record arrives or the deadline hits. Unconnected sockets ignore the + // asynchronous ICMP-unreachable that a connected Dial() would latch, + // which keeps the retry loop unaffected by startup races. + sendFromNewSource := func(tag string) { + src, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatalf("%s listen: %v", tag, err) + } + defer src.Close() + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if _, err := src.WriteTo(good, addr); err != nil { + t.Fatalf("%s write: %v", tag, err) + } + select { + case rec := <-ch: + if rec.Website != "www.example.com" { + t.Fatalf("%s: bad record: %+v", tag, rec) + } + return + case <-time.After(50 * time.Millisecond): + } + } + t.Fatalf("%s: no record received within 1s; listener may be filtering by source addr", tag) + } + + // Three independent sockets give three different ephemeral source ports, + // each representing a distinct nginx worker (or a post-reload worker). + sendFromNewSource("worker1") + sendFromNewSource("worker2") + sendFromNewSource("worker3") +}