Refactor docs; add 'ipng_source_tag'; add UDP listener for nginx-ipng-stats plugin

2026-04-17 09:50:54 +02:00
parent 0ecca06069
commit 577ed3dad5
26 changed files with 1319 additions and 1718 deletions


@@ -24,6 +24,7 @@ type sharedFlags struct {
uriReNeg string // RE2 regex exclusion against request URI
isTor string // "", "1" / "!=0" (TOR only), "0" / "!=1" (non-TOR only)
asn string // expression: "12345", "!=65000", ">=1000", etc.
sourceTag string // exact ipng_source_tag match
}
// bindShared registers the shared flags on fs and returns a pointer to the
@@ -42,6 +43,7 @@ func bindShared(fs *flag.FlagSet) (*sharedFlags, *string) {
fs.StringVar(&sf.uriReNeg, "uri-re-neg", "", "filter: RE2 regex exclusion against request URI")
fs.StringVar(&sf.isTor, "is-tor", "", "filter: TOR traffic (1 or !=0 = TOR only; 0 or !=1 = non-TOR only)")
fs.StringVar(&sf.asn, "asn", "", "filter: ASN expression (12345, !=65000, >=1000, <64512, …)")
fs.StringVar(&sf.sourceTag, "source-tag", "", "filter: exact ipng_source_tag match (e.g. direct, cdn, …)")
return sf, target
}
@@ -64,7 +66,7 @@ func parseTargets(s string) []string {
}
func buildFilter(sf *sharedFlags) *pb.Filter {
if sf.website == "" && sf.prefix == "" && sf.uri == "" && sf.status == "" && sf.websiteRe == "" && sf.uriRe == "" && sf.websiteReNeg == "" && sf.uriReNeg == "" && sf.isTor == "" && sf.asn == "" {
if sf.website == "" && sf.prefix == "" && sf.uri == "" && sf.status == "" && sf.websiteRe == "" && sf.uriRe == "" && sf.websiteReNeg == "" && sf.uriReNeg == "" && sf.isTor == "" && sf.asn == "" && sf.sourceTag == "" {
return nil
}
f := &pb.Filter{}
@@ -118,6 +120,9 @@ func buildFilter(sf *sharedFlags) *pb.Filter {
f.AsnNumber = &n
f.AsnOp = op
}
if sf.sourceTag != "" {
f.IpngSourceTag = &sf.sourceTag
}
return f
}
@@ -152,8 +157,12 @@ func parseGroupBy(s string) pb.GroupBy {
return pb.GroupBy_REQUEST_URI
case "status":
return pb.GroupBy_HTTP_RESPONSE
case "asn":
return pb.GroupBy_ASN_NUMBER
case "source_tag", "source-tag":
return pb.GroupBy_SOURCE_TAG
default:
fmt.Fprintf(os.Stderr, "--group-by: unknown value %q; valid: website prefix uri status\n", s)
fmt.Fprintf(os.Stderr, "--group-by: unknown value %q; valid: website prefix uri status asn source_tag\n", s)
os.Exit(1)
panic("unreachable")
}


@@ -22,11 +22,14 @@ Subcommand flags (all subcommands):
--status EXPR filter: HTTP status expression (200, !=200, >=400, <500, …)
--website-re REGEX filter: RE2 regex against website
--uri-re REGEX filter: RE2 regex against request URI
--is-tor EXPR filter: TOR (1/!=0 = only, 0/!=1 = none)
--asn EXPR filter: ASN expression (12345, !=65000, …)
--source-tag STRING filter: exact ipng_source_tag match
topn flags:
--n INT number of entries (default 10)
--window STR 1m 5m 15m 60m 6h 24h (default 5m)
--group-by STR website prefix uri status (default website)
--group-by STR website prefix uri status asn source_tag (default website)
trend flags:
--window STR 1m 5m 15m 60m 6h 24h (default 5m)


@@ -28,13 +28,17 @@ func main() {
v4prefix := flag.Int("v4prefix", envOrInt("COLLECTOR_V4PREFIX", 24), "IPv4 prefix length for client bucketing (env: COLLECTOR_V4PREFIX)")
v6prefix := flag.Int("v6prefix", envOrInt("COLLECTOR_V6PREFIX", 48), "IPv6 prefix length for client bucketing (env: COLLECTOR_V6PREFIX)")
scanInterval := flag.Duration("scan-interval", envOrDuration("COLLECTOR_SCAN_INTERVAL", 10*time.Second), "how often to rescan glob patterns for new/removed files (env: COLLECTOR_SCAN_INTERVAL)")
logtailPort := flag.Int("logtail-port", envOrInt("COLLECTOR_LOGTAIL_PORT", 0), "UDP port to receive nginx ipng_stats_logtail packets, 0 to disable (env: COLLECTOR_LOGTAIL_PORT)")
logtailBind := flag.String("logtail-bind", envOr("COLLECTOR_LOGTAIL_BIND", "127.0.0.1"), "UDP bind address for the logtail listener (env: COLLECTOR_LOGTAIL_BIND)")
flag.Parse()
patterns := collectPatterns(*logPaths, *logsFile)
if len(patterns) == 0 {
log.Fatal("collector: no log paths specified; use --logs or --logs-file")
if len(patterns) == 0 && *logtailPort == 0 {
log.Fatal("collector: no inputs configured; use --logs, --logs-file, or --logtail-port")
}
if len(patterns) > 0 {
log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval)
}
log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
@@ -57,8 +61,16 @@ func main() {
}
go store.Run(ch)
tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch)
go tailer.Run(ctx)
if len(patterns) > 0 {
tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch)
go tailer.Run(ctx)
}
if *logtailPort > 0 {
udp := NewUDPListener(net.JoinHostPort(*logtailBind, strconv.Itoa(*logtailPort)), *v4prefix, *v6prefix, ch)
udp.SetProm(store.prom)
go udp.Run(ctx)
}
lis, err := net.Listen("tcp", *listen)
if err != nil {

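The collector now accepts any mix of inputs — file tailing, the UDP listener, or both — and only aborts when neither is configured. A hypothetical UDP-only invocation (port value illustrative): collector --logtail-port 9514, which binds 127.0.0.1:9514 by default; the same knobs are available via the COLLECTOR_LOGTAIL_PORT and COLLECTOR_LOGTAIL_BIND environment variables.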

@@ -18,65 +18,104 @@ type LogRecord struct {
Method string
BodyBytesSent int64
RequestTime float64
SourceTag string
}
// ParseLine parses a tab-separated logtail log line:
// fileSourceTag is the SourceTag assigned to records read from on-disk log
// files, which pre-date the tag concept. Mirrors nginx's fallback label.
const fileSourceTag = "direct"
// ParseLine parses a tab-separated logtail log line from a file:
//
// $host \t $remote_addr \t $msec \t $request_method \t $request_uri \t $status \t $body_bytes_sent \t $request_time \t $is_tor \t $asn
//
// The is_tor (field 9) and asn (field 10) fields are optional for backward
// compatibility with older log files that omit them; they default to false/0
// when absent.
// when absent. SourceTag is always set to "direct" (file origin has no tag).
// Returns false for lines with fewer than 8 fields.
func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
// SplitN caps allocations; we need up to 10 fields.
fields := strings.SplitN(line, "\t", 10)
if len(fields) < 8 {
return LogRecord{}, false
}
uri := fields[4]
if i := strings.IndexByte(uri, '?'); i >= 0 {
uri = uri[:i]
}
prefix, ok := truncateIP(fields[1], v4bits, v6bits)
if !ok {
return LogRecord{}, false
}
isTor := len(fields) >= 9 && fields[8] == "1"
var asn int32
if len(fields) == 10 {
if n, err := strconv.ParseInt(fields[9], 10, 32); err == nil {
asn = int32(n)
}
}
var bodyBytes int64
if n, err := strconv.ParseInt(fields[6], 10, 64); err == nil {
bodyBytes = n
}
var reqTime float64
if f, err := strconv.ParseFloat(fields[7], 64); err == nil {
reqTime = f
}
return LogRecord{
Website: fields[0],
ClientPrefix: prefix,
URI: uri,
URI: stripQuery(fields[4]),
Status: fields[5],
IsTor: isTor,
ASN: asn,
Method: fields[3],
BodyBytesSent: bodyBytes,
RequestTime: reqTime,
BodyBytesSent: parseInt(fields[6]),
RequestTime: parseFloat(fields[7]),
SourceTag: fileSourceTag,
}, true
}
// ParseUDPLine parses a tab-separated logtail log line from the UDP listener:
//
// $host \t $remote_addr \t $request_method \t $request_uri \t $status \t
// $body_bytes_sent \t $request_time \t $is_tor \t $asn \t
// $ipng_source_tag \t $server_addr \t $scheme
//
// All 12 fields are required. server_addr and scheme are consumed but not
// propagated. Returns false for any malformed packet (wrong field count,
// bad IP).
func ParseUDPLine(line string, v4bits, v6bits int) (LogRecord, bool) {
fields := strings.Split(line, "\t")
if len(fields) != 12 {
return LogRecord{}, false
}
prefix, ok := truncateIP(fields[1], v4bits, v6bits)
if !ok {
return LogRecord{}, false
}
var asn int32
if n, err := strconv.ParseInt(fields[8], 10, 32); err == nil {
asn = int32(n)
}
return LogRecord{
Website: fields[0],
ClientPrefix: prefix,
URI: stripQuery(fields[3]),
Status: fields[4],
IsTor: fields[7] == "1",
ASN: asn,
Method: fields[2],
BodyBytesSent: parseInt(fields[5]),
RequestTime: parseFloat(fields[6]),
SourceTag: fields[9],
}, true
}
func stripQuery(uri string) string {
if i := strings.IndexByte(uri, '?'); i >= 0 {
return uri[:i]
}
return uri
}
func parseInt(s string) int64 {
n, _ := strconv.ParseInt(s, 10, 64)
return n
}
func parseFloat(s string) float64 {
f, _ := strconv.ParseFloat(s, 64)
return f
}
// truncateIP masks addr to the given prefix length depending on IP version.
// Returns the CIDR string (e.g. "1.2.3.0/24") and true on success.
func truncateIP(addr string, v4bits, v6bits int) (string, bool) {

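The body of truncateIP sits outside this hunk; below is a minimal sketch of its documented contract, assuming the stdlib net/netip types (an assumption — the real implementation may differ):

package main

import (
	"fmt"
	"net/netip"
)

// truncate masks addr to v4bits (IPv4) or v6bits (IPv6) and returns the
// CIDR string, mirroring truncateIP's documented behavior.
func truncate(addr string, v4bits, v6bits int) (string, bool) {
	a, err := netip.ParseAddr(addr)
	if err != nil {
		return "", false
	}
	bits := v6bits
	if a.Is4() {
		bits = v4bits
	}
	p, err := a.Prefix(bits) // zeroes the host bits below the prefix length
	if err != nil {
		return "", false
	}
	return p.String(), true
}

func main() {
	fmt.Println(truncate("1.2.3.4", 24, 48))               // 1.2.3.0/24 true
	fmt.Println(truncate("2001:db8:cafe:babe::1", 24, 48)) // 2001:db8:cafe::/48 true
}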

@@ -25,6 +25,7 @@ func TestParseLine(t *testing.T) {
Method: "GET",
BodyBytesSent: 1452,
RequestTime: 0.043,
SourceTag: "direct",
},
},
{
@@ -38,6 +39,7 @@ func TestParseLine(t *testing.T) {
Status: "201",
Method: "POST",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -46,11 +48,12 @@ func TestParseLine(t *testing.T) {
wantOK: true,
want: LogRecord{
Website: "host",
ClientPrefix: "2001:db8:cafe::/48", // /48 = 3 full 16-bit groups intact
ClientPrefix: "2001:db8:cafe::/48",
URI: "/",
Status: "200",
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -79,6 +82,7 @@ func TestParseLine(t *testing.T) {
Status: "429",
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -93,6 +97,7 @@ func TestParseLine(t *testing.T) {
IsTor: true,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -107,6 +112,7 @@ func TestParseLine(t *testing.T) {
IsTor: false,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -121,6 +127,7 @@ func TestParseLine(t *testing.T) {
IsTor: false,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -136,6 +143,7 @@ func TestParseLine(t *testing.T) {
ASN: 12345,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -151,6 +159,7 @@ func TestParseLine(t *testing.T) {
ASN: 65535,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -166,6 +175,7 @@ func TestParseLine(t *testing.T) {
ASN: 0,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
{
@@ -181,6 +191,7 @@ func TestParseLine(t *testing.T) {
ASN: 0,
Method: "GET",
RequestTime: 0.001,
SourceTag: "direct",
},
},
}
@@ -201,6 +212,84 @@ func TestParseLine(t *testing.T) {
}
}
func TestParseUDPLine(t *testing.T) {
// host \t remote_addr \t method \t uri \t status \t body_bytes \t req_time \t
// is_tor \t asn \t source_tag \t server_addr \t scheme
good := "www.example.com\t1.2.3.4\tGET\t/api/v1/search?q=foo\t200\t1452\t0.043\t0\t12345\tcdn\t10.0.0.1\thttps"
tests := []struct {
name string
line string
wantOK bool
want LogRecord
}{
{
name: "all 12 fields parsed, query stripped, extras dropped",
line: good,
wantOK: true,
want: LogRecord{
Website: "www.example.com",
ClientPrefix: "1.2.3.0/24",
URI: "/api/v1/search",
Status: "200",
IsTor: false,
ASN: 12345,
Method: "GET",
BodyBytesSent: 1452,
RequestTime: 0.043,
SourceTag: "cdn",
},
},
{
name: "is_tor=1, tag direct, IPv6",
line: "h\t2001:db8::1\tGET\t/\t200\t0\t0\t1\t65535\tdirect\t::1\thttp",
wantOK: true,
want: LogRecord{
Website: "h",
ClientPrefix: "2001:db8::/48",
URI: "/",
Status: "200",
IsTor: true,
ASN: 65535,
Method: "GET",
BodyBytesSent: 0,
RequestTime: 0,
SourceTag: "direct",
},
},
{
name: "11 fields rejected",
line: "h\t1.2.3.4\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1",
wantOK: false,
},
{
name: "13 fields rejected",
line: good + "\textra",
wantOK: false,
},
{
name: "bad IP rejected",
line: "h\tnope\tGET\t/\t200\t0\t0\t0\t0\ttag\t10.0.0.1\thttp",
wantOK: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got, ok := ParseUDPLine(tc.line, 24, 48)
if ok != tc.wantOK {
t.Fatalf("ParseUDPLine ok=%v, want %v; got=%+v", ok, tc.wantOK, got)
}
if !tc.wantOK {
return
}
if got != tc.want {
t.Errorf("got %+v, want %+v", got, tc.want)
}
})
}
}
func TestTruncateIP(t *testing.T) {
tests := []struct {
addr string
@@ -208,8 +297,8 @@ func TestTruncateIP(t *testing.T) {
}{
{"1.2.3.4", "1.2.3.0/24"},
{"192.168.100.200", "192.168.100.0/24"},
{"2001:db8:cafe:babe::1", "2001:db8:cafe::/48"}, // /48 = 3 full groups intact
{"::1", "::/48"}, // loopback — first 48 bits are all zero
{"2001:db8:cafe:babe::1", "2001:db8:cafe::/48"},
{"::1", "::/48"},
}
for _, tc := range tests {


@@ -19,7 +19,7 @@ const promNumTimeBounds = 11
var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
const promCounterCap = 100_000 // safety cap on {host,method,status} counter entries
const promCounterCap = 250_000 // safety cap on {host,method,status} counter entries
// promCounterKey is the label set for per-request counters.
type promCounterKey struct {
@@ -49,14 +49,26 @@ type PromStore struct {
counters map[promCounterKey]int64
body map[string]*promBodyEntry // keyed by host
reqTime map[string]*promTimeEntry // keyed by host
// per-source_tag rollups (parallel series, not a cross-product with host)
sourceCounters map[string]int64 // keyed by source_tag
sourceBody map[string]*promBodyEntry // keyed by source_tag
// UDP ingest counters — guarded by a separate mutex so the listener's hot
// read loop never contends with the main ingest lock.
udpMu sync.Mutex
udpPacketsReceived int64 // datagrams read off the socket
udpLoglinesSuccess int64 // successfully parsed
udpLoglinesConsumed int64 // successfully forwarded to the store channel
}
// NewPromStore returns an empty PromStore ready for use.
func NewPromStore() *PromStore {
return &PromStore{
counters: make(map[promCounterKey]int64, 1024),
body: make(map[string]*promBodyEntry, 64),
reqTime: make(map[string]*promTimeEntry, 64),
counters: make(map[promCounterKey]int64, 1024),
body: make(map[string]*promBodyEntry, 64),
reqTime: make(map[string]*promTimeEntry, 64),
sourceCounters: make(map[string]int64, 32),
sourceBody: make(map[string]*promBodyEntry, 32),
}
}
@@ -74,18 +86,7 @@ func (p *PromStore) Ingest(r LogRecord) {
}
// --- body_bytes_sent histogram (keyed by host only) ---
be, ok := p.body[r.Website]
if !ok {
be = &promBodyEntry{}
p.body[r.Website] = be
}
for i, bound := range promBodyBounds {
if r.BodyBytesSent <= bound {
be.buckets[i]++
}
}
be.buckets[promNumBodyBounds]++ // +Inf
be.sum += r.BodyBytesSent
observeBody(p.body, r.Website, r.BodyBytesSent)
// --- request_time histogram (keyed by host only) ---
te, ok := p.reqTime[r.Website]
@@ -101,9 +102,34 @@ func (p *PromStore) Ingest(r LogRecord) {
te.buckets[promNumTimeBounds]++ // +Inf
te.sum += r.RequestTime
// --- per-source_tag rollups ---
p.sourceCounters[r.SourceTag]++
observeBody(p.sourceBody, r.SourceTag, r.BodyBytesSent)
p.mu.Unlock()
}
// IncUDPPacket, IncUDPSuccess, and IncUDPConsumed bump their respective
// UDP ingest counters. They are called from the UDP listener goroutine.
func (p *PromStore) IncUDPPacket() { p.udpMu.Lock(); p.udpPacketsReceived++; p.udpMu.Unlock() }
func (p *PromStore) IncUDPSuccess() { p.udpMu.Lock(); p.udpLoglinesSuccess++; p.udpMu.Unlock() }
func (p *PromStore) IncUDPConsumed() { p.udpMu.Lock(); p.udpLoglinesConsumed++; p.udpMu.Unlock() }
func observeBody(m map[string]*promBodyEntry, key string, bytes int64) {
e, ok := m[key]
if !ok {
e = &promBodyEntry{}
m[key] = e
}
for i, bound := range promBodyBounds {
if bytes <= bound {
e.buckets[i]++
}
}
e.buckets[promNumBodyBounds]++ // +Inf
e.sum += bytes
}
// ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4).
func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
// Snapshot everything under the lock, then render without holding it.
@@ -119,8 +145,8 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
}
type bodySnap struct {
host string
e promBodyEntry
label string
e promBodyEntry
}
bodySnaps := make([]bodySnap, 0, len(p.body))
for h, e := range p.body {
@@ -136,8 +162,27 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
timeSnaps = append(timeSnaps, timeSnap{h, *e})
}
type sourceCounterSnap struct {
tag string
v int64
}
sourceCounters := make([]sourceCounterSnap, 0, len(p.sourceCounters))
for t, v := range p.sourceCounters {
sourceCounters = append(sourceCounters, sourceCounterSnap{t, v})
}
sourceBodySnaps := make([]bodySnap, 0, len(p.sourceBody))
for t, e := range p.sourceBody {
sourceBodySnaps = append(sourceBodySnaps, bodySnap{t, *e})
}
p.mu.Unlock()
p.udpMu.Lock()
udpPackets := p.udpPacketsReceived
udpSuccess := p.udpLoglinesSuccess
udpConsumed := p.udpLoglinesConsumed
p.udpMu.Unlock()
// Sort for stable, human-readable output.
sort.Slice(counters, func(i, j int) bool {
a, b := counters[i].k, counters[j].k
@@ -149,8 +194,10 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
}
return a.Status < b.Status
})
sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].host < bodySnaps[j].host })
sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].label < bodySnaps[j].label })
sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host })
sort.Slice(sourceCounters, func(i, j int) bool { return sourceCounters[i].tag < sourceCounters[j].tag })
sort.Slice(sourceBodySnaps, func(i, j int) bool { return sourceBodySnaps[i].label < sourceBodySnaps[j].label })
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
bw := bufio.NewWriterSize(w, 256*1024)
@@ -167,16 +214,7 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.")
fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram")
for _, s := range bodySnaps {
for i, bound := range promBodyBounds {
fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=%q} %d\n",
s.host, fmt.Sprintf("%d", bound), s.e.buckets[i])
}
fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=\"+Inf\"} %d\n",
s.host, s.e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "nginx_http_response_body_bytes_count{host=%q} %d\n",
s.host, s.e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "nginx_http_response_body_bytes_sum{host=%q} %d\n",
s.host, s.e.sum)
writeBodyHistogram(bw, "nginx_http_response_body_bytes", "host", s.label, s.e)
}
// nginx_http_request_duration_seconds (histogram, labeled by host)
@@ -195,9 +233,48 @@ func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
s.host, s.e.sum)
}
// nginx_http_requests_by_source_total (counter, labeled by source_tag)
fmt.Fprintln(bw, "# HELP nginx_http_requests_by_source_total HTTP requests rolled up by nginx source tag.")
fmt.Fprintln(bw, "# TYPE nginx_http_requests_by_source_total counter")
for _, c := range sourceCounters {
fmt.Fprintf(bw, "nginx_http_requests_by_source_total{source_tag=%q} %d\n", c.tag, c.v)
}
// nginx_http_response_body_bytes_by_source (histogram, labeled by source_tag)
fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes_by_source HTTP response body size distribution by nginx source tag.")
fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes_by_source histogram")
for _, s := range sourceBodySnaps {
writeBodyHistogram(bw, "nginx_http_response_body_bytes_by_source", "source_tag", s.label, s.e)
}
// UDP ingest counters — lets operators distinguish parse failures
// (received - success) from channel-full drops (success - consumed).
fmt.Fprintln(bw, "# HELP logtail_udp_packets_received_total Datagrams read from the UDP socket.")
fmt.Fprintln(bw, "# TYPE logtail_udp_packets_received_total counter")
fmt.Fprintf(bw, "logtail_udp_packets_received_total %d\n", udpPackets)
fmt.Fprintln(bw, "# HELP logtail_udp_loglines_success_total UDP loglines that parsed successfully.")
fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_success_total counter")
fmt.Fprintf(bw, "logtail_udp_loglines_success_total %d\n", udpSuccess)
fmt.Fprintln(bw, "# HELP logtail_udp_loglines_consumed_total UDP loglines forwarded to the store (not dropped).")
fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_consumed_total counter")
fmt.Fprintf(bw, "logtail_udp_loglines_consumed_total %d\n", udpConsumed)
bw.Flush()
}
func writeBodyHistogram(bw *bufio.Writer, metric, labelName, labelValue string, e promBodyEntry) {
for i, bound := range promBodyBounds {
fmt.Fprintf(bw, "%s_bucket{%s=%q,le=%q} %d\n",
metric, labelName, labelValue, fmt.Sprintf("%d", bound), e.buckets[i])
}
fmt.Fprintf(bw, "%s_bucket{%s=%q,le=\"+Inf\"} %d\n",
metric, labelName, labelValue, e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "%s_count{%s=%q} %d\n",
metric, labelName, labelValue, e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "%s_sum{%s=%q} %d\n",
metric, labelName, labelValue, e.sum)
}
// formatFloat renders a float64 bucket bound without trailing zeros,
// matching Prometheus convention (e.g. "0.5", "10").
func formatFloat(f float64) string {

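Usage note: the new series make per-tag traffic directly graphable in PromQL, e.g. rate(nginx_http_requests_by_source_total{source_tag="cdn"}[5m]) (window illustrative), and the UDP counters encode the invariants described above — received minus success counts parse failures, success minus consumed counts channel-full drops.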

@@ -110,6 +110,61 @@ func TestPromStoreServeHTTP(t *testing.T) {
}
}
func TestPromStoreSourceTagRollup(t *testing.T) {
ps := NewPromStore()
// same host, two tags; each tag should appear with its own series.
ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 100, SourceTag: "direct"})
ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 300, SourceTag: "cdn"})
ps.Ingest(LogRecord{Website: "h", Method: "GET", Status: "200", BodyBytesSent: 100, SourceTag: "cdn"})
req := httptest.NewRequest("GET", "/metrics", nil)
rec := httptest.NewRecorder()
ps.ServeHTTP(rec, req)
body := rec.Body.String()
checks := []string{
"# TYPE nginx_http_requests_by_source_total counter",
`nginx_http_requests_by_source_total{source_tag="direct"} 1`,
`nginx_http_requests_by_source_total{source_tag="cdn"} 2`,
"# TYPE nginx_http_response_body_bytes_by_source histogram",
`nginx_http_response_body_bytes_by_source_sum{source_tag="direct"} 100`,
`nginx_http_response_body_bytes_by_source_sum{source_tag="cdn"} 400`,
// host-series totals are unchanged (one row, counting 3 requests).
`nginx_http_requests_total{host="h",method="GET",status="200"} 3`,
}
for _, want := range checks {
if !strings.Contains(body, want) {
t.Errorf("missing %q in output:\n%s", want, body)
}
}
}
func TestPromStoreUDPCounters(t *testing.T) {
ps := NewPromStore()
ps.IncUDPPacket()
ps.IncUDPPacket()
ps.IncUDPPacket()
ps.IncUDPSuccess()
ps.IncUDPSuccess()
ps.IncUDPConsumed()
req := httptest.NewRequest("GET", "/metrics", nil)
rec := httptest.NewRecorder()
ps.ServeHTTP(rec, req)
body := rec.Body.String()
checks := []string{
"logtail_udp_packets_received_total 3",
"logtail_udp_loglines_success_total 2",
"logtail_udp_loglines_consumed_total 1",
}
for _, want := range checks {
if !strings.Contains(body, want) {
t.Errorf("missing %q in output:\n%s", want, body)
}
}
}
func TestPromStoreCounterCap(t *testing.T) {
ps := NewPromStore()
// Fill to cap with distinct {host,method,status} combos


@@ -48,7 +48,7 @@ func (s *Store) ingest(r LogRecord) {
if s.prom != nil {
s.prom.Ingest(r)
}
key := st.Tuple6{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor, ASN: r.ASN}
key := st.Tuple6{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor, ASN: r.ASN, SourceTag: r.SourceTag}
if _, exists := s.live[key]; !exists {
if s.liveLen >= liveMapCap {
return

cmd/collector/udp.go (new file, 86 lines)

@@ -0,0 +1,86 @@
package main
import (
"context"
"log"
"net"
"strings"
)
// udpReadBufBytes is the SO_RCVBUF size requested. A one-second backlog of
// ~10K lines at ~200 B each (≈2 MB) fits comfortably; the kernel may cap the
// buffer below this.
const udpReadBufBytes = 4 << 20
// udpPacketBuf is the per-read buffer. A single nginx log line easily fits in
// a few KB; 64K is the practical UDP datagram ceiling.
const udpPacketBuf = 64 << 10
// UDPListener receives nginx_ipng_stats_logtail datagrams on a local socket,
// parses each packet as one log line, and forwards LogRecords to ch.
type UDPListener struct {
addr string
v4bits int
v6bits int
ch chan<- LogRecord
prom *PromStore // optional; bumps UDP ingest counters
}
func NewUDPListener(addr string, v4bits, v6bits int, ch chan<- LogRecord) *UDPListener {
return &UDPListener{addr: addr, v4bits: v4bits, v6bits: v6bits, ch: ch}
}
// SetProm wires a PromStore so the listener can report received/success/consumed counts.
func (u *UDPListener) SetProm(p *PromStore) { u.prom = p }
// Run listens until ctx is cancelled.
func (u *UDPListener) Run(ctx context.Context) {
laddr, err := net.ResolveUDPAddr("udp", u.addr)
if err != nil {
log.Fatalf("udp: resolve %s: %v", u.addr, err)
}
conn, err := net.ListenUDP("udp", laddr)
if err != nil {
log.Fatalf("udp: listen %s: %v", u.addr, err)
}
defer conn.Close()
if err := conn.SetReadBuffer(udpReadBufBytes); err != nil {
log.Printf("udp: SetReadBuffer(%d): %v", udpReadBufBytes, err)
}
log.Printf("udp: listening on %s", conn.LocalAddr())
go func() {
<-ctx.Done()
conn.Close()
}()
buf := make([]byte, udpPacketBuf)
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
if ctx.Err() != nil {
return
}
log.Printf("udp: read: %v", err)
continue
}
if u.prom != nil {
u.prom.IncUDPPacket()
}
line := strings.TrimRight(string(buf[:n]), "\r\n")
rec, ok := ParseUDPLine(line, u.v4bits, u.v6bits)
if !ok {
continue
}
if u.prom != nil {
u.prom.IncUDPSuccess()
}
select {
case u.ch <- rec:
if u.prom != nil {
u.prom.IncUDPConsumed()
}
default:
// Channel full — drop rather than block the read loop.
}
}
}
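For smoke-testing the listener by hand, a minimal Go sender speaking the 12-field format (the address, port, and field values below are illustrative, not part of this commit):

package main

import (
	"log"
	"net"
	"strings"
)

func main() {
	// host, remote_addr, method, uri, status, body_bytes, req_time,
	// is_tor, asn, source_tag, server_addr, scheme — all 12 required.
	fields := []string{
		"www.example.com", "1.2.3.4", "GET", "/index.html", "200",
		"1452", "0.043", "0", "12345", "direct", "10.0.0.1", "https",
	}
	conn, err := net.Dial("udp", "127.0.0.1:9514")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(strings.Join(fields, "\t"))); err != nil {
		log.Fatal(err)
	}
}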

cmd/collector/udp_test.go (new file, 67 lines)

@@ -0,0 +1,67 @@
package main
import (
"context"
"net"
"testing"
"time"
)
func TestUDPListenerRoundTrip(t *testing.T) {
ch := make(chan LogRecord, 4)
ps := NewPromStore()
// Bind to an ephemeral port on loopback.
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen probe: %v", err)
}
addr := pc.LocalAddr().String()
pc.Close() // release; listener will re-bind
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
u := NewUDPListener(addr, 24, 48, ch)
u.SetProm(ps)
go u.Run(ctx)
// Dial the listener and send one valid and one malformed packet.
conn, err := net.Dial("udp", addr)
if err != nil {
t.Fatalf("dial: %v", err)
}
defer conn.Close()
// The listener is started asynchronously; retry for up to 1s.
good := "www.example.com\t1.2.3.4\tGET\t/\t200\t42\t0.010\t0\t12345\tdirect\t10.0.0.1\thttps"
bad := "not enough\tfields"
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
conn.Write([]byte(good))
conn.Write([]byte(bad))
select {
case rec := <-ch:
if rec.Website != "www.example.com" || rec.SourceTag != "direct" {
t.Fatalf("bad record: %+v", rec)
}
// Give the listener a moment to process the malformed packet too.
time.Sleep(50 * time.Millisecond)
ps.udpMu.Lock()
pkt, suc, con := ps.udpPacketsReceived, ps.udpLoglinesSuccess, ps.udpLoglinesConsumed
ps.udpMu.Unlock()
if pkt < 2 {
t.Errorf("udpPacketsReceived=%d, want >=2", pkt)
}
if suc < 1 {
t.Errorf("udpLoglinesSuccess=%d, want >=1", suc)
}
if con < 1 {
t.Errorf("udpLoglinesConsumed=%d, want >=1", con)
}
return
case <-time.After(50 * time.Millisecond):
}
}
t.Fatal("no record received within 1s")
}


@@ -146,8 +146,13 @@ func applyTerm(term string, fs *filterState) error {
return fmt.Errorf("invalid asn expression %q", expr)
}
fs.ASN = expr
case "source_tag":
if op != "=" {
return fmt.Errorf("source_tag only supports =, not %q", op)
}
fs.SourceTag = value
default:
return fmt.Errorf("unknown field %q; valid: status, website, uri, prefix, is_tor, asn", field)
return fmt.Errorf("unknown field %q; valid: status, website, uri, prefix, is_tor, asn, source_tag", field)
}
return nil
}
@@ -196,6 +201,9 @@ func FilterExprString(f filterState) string {
if f.ASN != "" {
parts = append(parts, asnTermStr(f.ASN))
}
if f.SourceTag != "" {
parts = append(parts, "source_tag="+quoteMaybe(f.SourceTag))
}
return strings.Join(parts, " AND ")
}
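Example: a drill-down expression such as status>=400 AND source_tag=cdn now round-trips — applyTerm accepts the source_tag term (equality only, per the check above), and FilterExprString re-emits it, quoting the value through quoteMaybe when needed.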


@@ -229,8 +229,17 @@ func TestDrillURL(t *testing.T) {
if !strings.Contains(u, "f_asn=12345") {
t.Errorf("drill from asn: missing f_asn in %q", u)
}
if !strings.Contains(u, "by=source_tag") {
t.Errorf("drill from asn: expected next by=source_tag in %q", u)
}
p.GroupByS = "source_tag"
u = p.drillURL("direct")
if !strings.Contains(u, "f_source_tag=direct") {
t.Errorf("drill from source_tag: missing f_source_tag in %q", u)
}
if !strings.Contains(u, "by=website") {
t.Errorf("drill from asn: expected cycle back to by=website in %q", u)
t.Errorf("drill from source_tag: expected cycle back to by=website in %q", u)
}
}


@@ -58,6 +58,7 @@ type filterState struct {
URIReNeg string // RE2 regex exclusion against request URI
IsTor string // "", "1" (TOR only), "0" (non-TOR only)
ASN string // expression: "12345", "!=65000", ">=1000", etc.
SourceTag string // exact ipng_source_tag match
}
// QueryParams holds all parsed URL parameters for one page request.
@@ -95,7 +96,7 @@ var windowSpecs = []struct{ s, label string }{
}
var groupBySpecs = []struct{ s, label string }{
{"website", "website"}, {"asn", "asn"}, {"prefix", "prefix"}, {"status", "status"}, {"uri", "uri"},
{"website", "website"}, {"asn", "asn"}, {"prefix", "prefix"}, {"status", "status"}, {"uri", "uri"}, {"source_tag", "source"},
}
func parseWindowString(s string) (pb.Window, string) {
@@ -127,6 +128,8 @@ func parseGroupByString(s string) (pb.GroupBy, string) {
return pb.GroupBy_HTTP_RESPONSE, "status"
case "asn":
return pb.GroupBy_ASN_NUMBER, "asn"
case "source_tag":
return pb.GroupBy_SOURCE_TAG, "source_tag"
default:
return pb.GroupBy_WEBSITE, "website"
}
@@ -168,12 +171,13 @@ func (h *Handler) parseParams(r *http.Request) QueryParams {
URIReNeg: q.Get("f_uri_re_neg"),
IsTor: q.Get("f_is_tor"),
ASN: q.Get("f_asn"),
SourceTag: q.Get("f_source_tag"),
},
}
}
func buildFilter(f filterState) *pb.Filter {
if f.Website == "" && f.Prefix == "" && f.URI == "" && f.Status == "" && f.WebsiteRe == "" && f.URIRe == "" && f.WebsiteReNeg == "" && f.URIReNeg == "" && f.IsTor == "" && f.ASN == "" {
if f.Website == "" && f.Prefix == "" && f.URI == "" && f.Status == "" && f.WebsiteRe == "" && f.URIRe == "" && f.WebsiteReNeg == "" && f.URIReNeg == "" && f.IsTor == "" && f.ASN == "" && f.SourceTag == "" {
return nil
}
out := &pb.Filter{}
@@ -216,6 +220,9 @@ func buildFilter(f filterState) *pb.Filter {
out.AsnOp = op
}
}
if f.SourceTag != "" {
out.IpngSourceTag = &f.SourceTag
}
return out
}
@@ -256,6 +263,9 @@ func (p QueryParams) toValues() url.Values {
if p.Filter.ASN != "" {
v.Set("f_asn", p.Filter.ASN)
}
if p.Filter.SourceTag != "" {
v.Set("f_source_tag", p.Filter.SourceTag)
}
return v
}
@@ -278,7 +288,7 @@ func (p QueryParams) clearFilterURL() string {
return p.buildURL(map[string]string{
"f_website": "", "f_prefix": "", "f_uri": "", "f_status": "",
"f_website_re": "", "f_uri_re": "", "f_website_re_neg": "", "f_uri_re_neg": "",
"f_is_tor": "", "f_asn": "",
"f_is_tor": "", "f_asn": "", "f_source_tag": "",
})
}
@@ -293,7 +303,9 @@ func nextGroupBy(s string) string {
return "status"
case "status":
return "asn"
default: // asn → back to website
case "asn":
return "source_tag"
default: // source_tag → back to website
return "website"
}
}
@@ -311,6 +323,8 @@ func groupByFilterKey(s string) string {
return "f_status"
case "asn":
return "f_asn"
case "source_tag":
return "f_source_tag"
default:
return "f_website"
}
@@ -391,6 +405,12 @@ func buildCrumbs(p QueryParams) []Crumb {
RemoveURL: p.buildURL(map[string]string{"f_asn": ""}),
})
}
if p.Filter.SourceTag != "" {
crumbs = append(crumbs, Crumb{
Text: "source_tag=" + p.Filter.SourceTag,
RemoveURL: p.buildURL(map[string]string{"f_source_tag": ""}),
})
}
return crumbs
}