nginx-ipng-stats-plugin's ipng_stats_logtail directive buffers many log lines into a single UDP datagram (default buffer=64k flush=1s). The listener was treating each datagram as exactly one log line, so any datagram with N>1 lines failed the v1 field-count check and was dropped silently. In production this showed up as logtail_udp_packets_received_total being roughly 4x logtail_udp_loglines_success_total — matching typical burst-coalesced ratios of about 4 lines per batch. Fix: strip trailing CR/LF, split the payload on '\n', and parse each non-empty line independently. Counter semantics now match the names: packets_received — datagrams off the socket (one per recvfrom); loglines_success — log lines parsed OK (may be many per datagram); loglines_consumed — log lines forwarded to the store (not dropped). After the fix, loglines_success ≈ packets_received × avg_lines_per_batch. Regression test TestUDPListenerBatchedDatagram sends one datagram with three '\n'-separated v1 lines and asserts that all three LogRecords arrive, plus loglines_success >= 3 * packets_received. Docs (user-guide.md, design.md) now explain the datagram-vs-line unit distinction so operators don't misread the ratio. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
105 lines
3.0 KiB
Go
105 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"net"
|
|
"strings"
|
|
)
|
|
|
|
const (
	// udpReadBufBytes is the socket receive-buffer size requested via
	// SetReadBuffer (SO_RCVBUF). Bursts of ~10K lines/sec at ~200B each
	// comfortably fit; the kernel may cap below this.
	udpReadBufBytes = 4 << 20

	// udpPacketBuf is the size of the buffer handed to each read. One
	// datagram may carry a single log line (a few KB at most) or a whole
	// batch of lines; 64K is the practical UDP datagram ceiling, so any
	// datagram fits.
	udpPacketBuf = 64 << 10
)
|
|
|
|
// UDPListener receives nginx_ipng_stats_logtail datagrams on a local socket,
|
|
// parses each packet as one log line, and forwards LogRecords to ch.
|
|
type UDPListener struct {
|
|
addr string
|
|
v4bits int
|
|
v6bits int
|
|
ch chan<- LogRecord
|
|
prom *PromStore // optional; bumps UDP ingest counters
|
|
}
|
|
|
|
func NewUDPListener(addr string, v4bits, v6bits int, ch chan<- LogRecord) *UDPListener {
|
|
return &UDPListener{addr: addr, v4bits: v4bits, v6bits: v6bits, ch: ch}
|
|
}
|
|
|
|
// SetProm wires a PromStore so the listener can report received/success/consumed counts.
|
|
func (u *UDPListener) SetProm(p *PromStore) { u.prom = p }
|
|
|
|
// Run listens until ctx is cancelled.
|
|
//
|
|
// The socket is unconnected (ListenUDP + ReadFromUDP), so every datagram is
|
|
// accepted regardless of its source address. This matters across nginx
|
|
// reloads: the old worker processes hold their own ephemeral send sockets,
|
|
// and the fresh worker set opens brand-new ones. The listener reads them
|
|
// all. (Contrast with `nc -k -u -l`, which latches onto the first peer's
|
|
// address and silently drops packets from anyone else — that is an `nc`
|
|
// quirk, not a kernel behaviour, and does not apply here.)
|
|
func (u *UDPListener) Run(ctx context.Context) {
|
|
laddr, err := net.ResolveUDPAddr("udp", u.addr)
|
|
if err != nil {
|
|
log.Fatalf("udp: resolve %s: %v", u.addr, err)
|
|
}
|
|
conn, err := net.ListenUDP("udp", laddr)
|
|
if err != nil {
|
|
log.Fatalf("udp: listen %s: %v", u.addr, err)
|
|
}
|
|
defer conn.Close()
|
|
if err := conn.SetReadBuffer(udpReadBufBytes); err != nil {
|
|
log.Printf("udp: SetReadBuffer(%d): %v", udpReadBufBytes, err)
|
|
}
|
|
log.Printf("udp: listening on %s", conn.LocalAddr())
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
conn.Close()
|
|
}()
|
|
|
|
buf := make([]byte, udpPacketBuf)
|
|
for {
|
|
n, _, err := conn.ReadFromUDP(buf)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
log.Printf("udp: read: %v", err)
|
|
continue
|
|
}
|
|
if u.prom != nil {
|
|
u.prom.IncUDPPacket()
|
|
}
|
|
// nginx-ipng-stats-plugin batches log lines into a single UDP
|
|
// datagram (default buffer=64k flush=1s), so one packet may carry
|
|
// many lines. nginx's log_format always ends a rendered line with
|
|
// '\n'; split on that and process each line independently.
|
|
payload := strings.TrimRight(string(buf[:n]), "\r\n")
|
|
for _, line := range strings.Split(payload, "\n") {
|
|
line = strings.TrimSuffix(line, "\r")
|
|
if line == "" {
|
|
continue
|
|
}
|
|
rec, ok := ParseUDPLine(line, u.v4bits, u.v6bits)
|
|
if !ok {
|
|
continue
|
|
}
|
|
if u.prom != nil {
|
|
u.prom.IncUDPSuccess()
|
|
}
|
|
select {
|
|
case u.ch <- rec:
|
|
if u.prom != nil {
|
|
u.prom.IncUDPConsumed()
|
|
}
|
|
default:
|
|
// Channel full — drop rather than block the read loop.
|
|
}
|
|
}
|
|
}
|
|
}
|