This commit is contained in:
2026-03-24 02:30:18 +01:00
parent 30c8c40157
commit c7f8455188
10 changed files with 29 additions and 26 deletions

View File

@@ -163,7 +163,7 @@ func TestCacheCoarseRing(t *testing.T) {
func TestCacheQueryTopN(t *testing.T) { func TestCacheQueryTopN(t *testing.T) {
m := NewMerger() m := NewMerger()
m.Apply(makeSnap("c1", map[string]int64{ m.Apply(makeSnap("c1", map[string]int64{
st.EncodeTuple(st.Tuple6{Website: "busy.com", Prefix: "1.0.0.0/24", URI: "/", Status: "200"}): 300, st.EncodeTuple(st.Tuple6{Website: "busy.com", Prefix: "1.0.0.0/24", URI: "/", Status: "200"}): 300,
st.EncodeTuple(st.Tuple6{Website: "quiet.com", Prefix: "2.0.0.0/24", URI: "/", Status: "200"}): 50, st.EncodeTuple(st.Tuple6{Website: "quiet.com", Prefix: "2.0.0.0/24", URI: "/", Status: "200"}): 50,
})) }))

View File

@@ -15,9 +15,9 @@ import (
) )
func main() { func main() {
listen := flag.String("listen", ":9091", "gRPC listen address") listen := flag.String("listen", ":9091", "gRPC listen address")
collectors := flag.String("collectors", "", "comma-separated collector host:port addresses") collectors := flag.String("collectors", "", "comma-separated collector host:port addresses")
source := flag.String("source", hostname(), "name for this aggregator in responses") source := flag.String("source", hostname(), "name for this aggregator in responses")
flag.Parse() flag.Parse()
if *collectors == "" { if *collectors == "" {

View File

@@ -89,7 +89,10 @@ func TestBuildFilterNil(t *testing.T) {
} }
func TestFmtCount(t *testing.T) { func TestFmtCount(t *testing.T) {
cases := []struct{ n int64; want string }{ cases := []struct {
n int64
want string
}{
{0, "0"}, {0, "0"},
{999, "999"}, {999, "999"},
{1000, "1 000"}, {1000, "1 000"},

View File

@@ -18,12 +18,12 @@ import (
) )
func main() { func main() {
listen := flag.String("listen", ":9090", "gRPC listen address") listen := flag.String("listen", ":9090", "gRPC listen address")
logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail") logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
logsFile := flag.String("logs-file", "", "file containing one log path/glob per line") logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
source := flag.String("source", hostname(), "name for this collector (default: hostname)") source := flag.String("source", hostname(), "name for this collector (default: hostname)")
v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing") v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing")
v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing") v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing")
scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files") scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files")
flag.Parse() flag.Parse()

View File

@@ -8,10 +8,10 @@ func TestParseLine(t *testing.T) {
good := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo&x=1\t200\t1452\t0.043" good := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo&x=1\t200\t1452\t0.043"
tests := []struct { tests := []struct {
name string name string
line string line string
wantOK bool wantOK bool
want LogRecord want LogRecord
}{ }{
{ {
name: "normal IPv4 line strips query string", name: "normal IPv4 line strips query string",
@@ -186,7 +186,7 @@ func TestTruncateIP(t *testing.T) {
{"1.2.3.4", "1.2.3.0/24"}, {"1.2.3.4", "1.2.3.0/24"},
{"192.168.100.200", "192.168.100.0/24"}, {"192.168.100.200", "192.168.100.0/24"},
{"2001:db8:cafe:babe::1", "2001:db8:cafe::/48"}, // /48 = 3 full groups intact {"2001:db8:cafe:babe::1", "2001:db8:cafe::/48"}, // /48 = 3 full groups intact
{"::1", "::/48"}, // loopback — first 48 bits are all zero {"::1", "::/48"}, // loopback — first 48 bits are all zero
} }
for _, tc := range tests { for _, tc := range tests {

View File

@@ -4,8 +4,8 @@ import (
"sync" "sync"
"time" "time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
st "git.ipng.ch/ipng/nginx-logtail/internal/store" st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
) )
const liveMapCap = 100_000 // hard cap on live map entries const liveMapCap = 100_000 // hard cap on live map entries

View File

@@ -5,8 +5,8 @@ import (
"testing" "testing"
"time" "time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
st "git.ipng.ch/ipng/nginx-logtail/internal/store" st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
) )
func makeStore() *Store { func makeStore() *Store {

View File

@@ -54,7 +54,7 @@ func (mt *MultiTailer) Run(ctx context.Context) {
} }
defer watcher.Close() defer watcher.Close()
files := make(map[string]*fileState) files := make(map[string]*fileState)
retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine
reopenCh := make(chan reopenMsg, 32) reopenCh := make(chan reopenMsg, 32)

View File

@@ -16,9 +16,9 @@ import (
var templatesFS embed.FS var templatesFS embed.FS
func main() { func main() {
listen := flag.String("listen", ":8080", "HTTP listen address") listen := flag.String("listen", ":8080", "HTTP listen address")
target := flag.String("target", "localhost:9091", "default gRPC endpoint (aggregator or collector)") target := flag.String("target", "localhost:9091", "default gRPC endpoint (aggregator or collector)")
n := flag.Int("n", 25, "default number of table rows") n := flag.Int("n", 25, "default number of table rows")
refresh := flag.Int("refresh", 30, "meta-refresh interval in seconds (0 = disabled)") refresh := flag.Int("refresh", 30, "meta-refresh interval in seconds (0 = disabled)")
flag.Parse() flag.Parse()

View File

@@ -14,11 +14,11 @@ import (
// Ring-buffer dimensions — shared between collector and aggregator. // Ring-buffer dimensions — shared between collector and aggregator.
const ( const (
FineRingSize = 60 // 60 × 1-min buckets → 1 hour FineRingSize = 60 // 60 × 1-min buckets → 1 hour
CoarseRingSize = 288 // 288 × 5-min buckets → 24 hours CoarseRingSize = 288 // 288 × 5-min buckets → 24 hours
FineTopK = 50_000 // entries kept per fine snapshot FineTopK = 50_000 // entries kept per fine snapshot
CoarseTopK = 5_000 // entries kept per coarse snapshot CoarseTopK = 5_000 // entries kept per coarse snapshot
CoarseEvery = 5 // fine ticks between coarse writes CoarseEvery = 5 // fine ticks between coarse writes
) )
// Tuple6 is the aggregation key (website, prefix, URI, status, is_tor, asn). // Tuple6 is the aggregation key (website, prefix, URI, status, is_tor, asn).