Add ASN to logtail, collector, aggregator, frontend and CLI

This commit is contained in:
2026-03-24 02:28:29 +01:00
parent a798bb1d1d
commit 30c8c40157
17 changed files with 566 additions and 157 deletions

View File

@@ -6,6 +6,7 @@ import (
"container/heap"
"log"
"regexp"
"strconv"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
@@ -20,13 +21,14 @@ const (
CoarseEvery = 5 // fine ticks between coarse writes
)
// Tuple5 is the aggregation key (website, prefix, URI, status, is_tor).
type Tuple5 struct {
// Tuple6 is the aggregation key (website, prefix, URI, status, is_tor, asn).
type Tuple6 struct {
Website string
Prefix string
URI string
Status string
IsTor bool
ASN int32
}
// Entry is a labelled count used in snapshots and query results.
@@ -74,28 +76,33 @@ func BucketsForWindow(window pb.Window, fine, coarse RingView, fineFilled, coars
}
}
// --- label encoding: "website\x00prefix\x00uri\x00status\x00is_tor" ---
// --- label encoding: "website\x00prefix\x00uri\x00status\x00is_tor\x00asn" ---
// EncodeTuple encodes a Tuple5 as a NUL-separated string suitable for use
// EncodeTuple encodes a Tuple6 as a NUL-separated string suitable for use
// as a map key in snapshots.
func EncodeTuple(t Tuple5) string {
func EncodeTuple(t Tuple6) string {
tor := "0"
if t.IsTor {
tor = "1"
}
return t.Website + "\x00" + t.Prefix + "\x00" + t.URI + "\x00" + t.Status + "\x00" + tor
return t.Website + "\x00" + t.Prefix + "\x00" + t.URI + "\x00" + t.Status + "\x00" + tor + "\x00" + strconv.Itoa(int(t.ASN))
}
// LabelTuple decodes a NUL-separated snapshot label back into a Tuple5.
func LabelTuple(label string) Tuple5 {
parts := splitN(label, '\x00', 5)
// LabelTuple decodes a NUL-separated snapshot label back into a Tuple6.
func LabelTuple(label string) Tuple6 {
parts := splitN(label, '\x00', 6)
if len(parts) < 4 {
return Tuple5{}
return Tuple6{}
}
t := Tuple5{Website: parts[0], Prefix: parts[1], URI: parts[2], Status: parts[3]}
if len(parts) == 5 {
t := Tuple6{Website: parts[0], Prefix: parts[1], URI: parts[2], Status: parts[3]}
if len(parts) >= 5 {
t.IsTor = parts[4] == "1"
}
if len(parts) == 6 {
if n, err := strconv.Atoi(parts[5]); err == nil {
t.ASN = int32(n)
}
}
return t
}
@@ -159,7 +166,7 @@ func CompileFilter(f *pb.Filter) *CompiledFilter {
// MatchesFilter returns true if t satisfies all constraints in f.
// A nil filter matches everything.
func MatchesFilter(t Tuple5, f *CompiledFilter) bool {
func MatchesFilter(t Tuple6, f *CompiledFilter) bool {
if f == nil || f.Proto == nil {
return true
}
@@ -199,9 +206,30 @@ func MatchesFilter(t Tuple5, f *CompiledFilter) bool {
return false
}
}
if p.AsnNumber != nil && !matchesAsnOp(t.ASN, p.GetAsnNumber(), p.AsnOp) {
return false
}
return true
}
// matchesAsnOp applies op(asn, want) directly on int32 values.
func matchesAsnOp(asn, want int32, op pb.StatusOp) bool {
switch op {
case pb.StatusOp_NE:
return asn != want
case pb.StatusOp_GT:
return asn > want
case pb.StatusOp_GE:
return asn >= want
case pb.StatusOp_LT:
return asn < want
case pb.StatusOp_LE:
return asn <= want
default: // EQ
return asn == want
}
}
// matchesStatusOp applies op(statusStr, want), parsing statusStr as an integer.
// Returns false if statusStr is not a valid integer.
func matchesStatusOp(statusStr string, want int32, op pb.StatusOp) bool {
@@ -229,7 +257,7 @@ func matchesStatusOp(statusStr string, want int32, op pb.StatusOp) bool {
}
// DimensionLabel returns the string value of t for the given group-by dimension.
func DimensionLabel(t Tuple5, g pb.GroupBy) string {
func DimensionLabel(t Tuple6, g pb.GroupBy) string {
switch g {
case pb.GroupBy_WEBSITE:
return t.Website
@@ -239,6 +267,8 @@ func DimensionLabel(t Tuple5, g pb.GroupBy) string {
return t.URI
case pb.GroupBy_HTTP_RESPONSE:
return t.Status
case pb.GroupBy_ASN_NUMBER:
return strconv.Itoa(int(t.ASN))
default:
return t.Website
}
@@ -318,9 +348,9 @@ func TopKFromMap(m map[string]int64, k int) []Entry {
return result
}
// TopKFromTupleMap encodes a Tuple5 map and returns the top-k as a Snapshot.
// TopKFromTupleMap encodes a Tuple6 map and returns the top-k as a Snapshot.
// Used by the collector to snapshot its live map.
func TopKFromTupleMap(m map[Tuple5]int64, k int, ts time.Time) Snapshot {
func TopKFromTupleMap(m map[Tuple6]int64, k int, ts time.Time) Snapshot {
flat := make(map[string]int64, len(m))
for t, c := range m {
flat[EncodeTuple(t)] = c