Collector implementation

This commit is contained in:
2026-03-14 20:07:22 +01:00
parent 4393ae2726
commit 6ca296b2e8
16 changed files with 3052 additions and 0 deletions

130
cmd/collector/main.go Normal file
View File

@@ -0,0 +1,130 @@
package main
import (
"bufio"
"context"
"flag"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
)
// main wires together the collector: flag parsing, log tailing, in-memory
// aggregation, and the gRPC query/stream server. It blocks until SIGINT or
// SIGTERM, then shuts down gracefully.
func main() {
	listen := flag.String("listen", ":9090", "gRPC listen address")
	logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
	logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
	source := flag.String("source", hostname(), "name for this collector (default: hostname)")
	v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing")
	v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing")
	flag.Parse()
	patterns := collectPatterns(*logPaths, *logsFile)
	if len(patterns) == 0 {
		log.Fatal("collector: no log paths specified; use --logs or --logs-file")
	}
	paths := expandGlobs(patterns)
	if len(paths) == 0 {
		log.Fatal("collector: no log files matched the specified patterns")
	}
	log.Printf("collector: tailing %d file(s)", len(paths))
	// Cancelled on SIGINT/SIGTERM; drives shutdown of the tailer.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	// Shared channel: tailer → store. Buffer absorbs ~20s of peak load.
	ch := make(chan LogRecord, 200_000)
	store := NewStore(*source)
	go store.Run(ch)
	tailer := NewMultiTailer(paths, *v4prefix, *v6prefix, ch)
	go tailer.Run(ctx)
	lis, err := net.Listen("tcp", *listen)
	if err != nil {
		log.Fatalf("collector: failed to listen on %s: %v", *listen, err)
	}
	grpcServer := grpc.NewServer()
	pb.RegisterLogtailServiceServer(grpcServer, NewServer(store, *source))
	go func() {
		log.Printf("collector: gRPC server listening on %s (source=%s)", *listen, *source)
		if err := grpcServer.Serve(lis); err != nil {
			log.Printf("collector: gRPC server stopped: %v", err)
		}
	}()
	<-ctx.Done()
	log.Printf("collector: shutting down")
	grpcServer.GracefulStop()
	// Closing ch lets store.Run drain remaining records and exit.
	// NOTE(review): tailer.Run may not have returned yet when ch is closed;
	// a send racing this close would panic — confirm the tailer has exited
	// (or add a done signal) before closing.
	close(ch)
}
// collectPatterns merges patterns from --logs (comma-separated) and --logs-file.
// Blank entries are dropped; in the file, lines starting with "#" are comments.
// Exits the process via log.Fatalf if the file cannot be opened or read.
func collectPatterns(logPaths, logsFile string) []string {
	var patterns []string
	for _, p := range strings.Split(logPaths, ",") {
		if p = strings.TrimSpace(p); p != "" {
			patterns = append(patterns, p)
		}
	}
	if logsFile != "" {
		f, err := os.Open(logsFile)
		if err != nil {
			log.Fatalf("collector: cannot open --logs-file %s: %v", logsFile, err)
		}
		defer f.Close()
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			if p := strings.TrimSpace(sc.Text()); p != "" && !strings.HasPrefix(p, "#") {
				patterns = append(patterns, p)
			}
		}
		// A read error would otherwise be silently swallowed and the pattern
		// list silently truncated; fail loudly instead.
		if err := sc.Err(); err != nil {
			log.Fatalf("collector: error reading --logs-file %s: %v", logsFile, err)
		}
	}
	return patterns
}
// expandGlobs expands any glob patterns and returns deduplicated concrete
// paths, preserving first-seen order.
func expandGlobs(patterns []string) []string {
	var (
		out  []string
		seen = make(map[string]struct{})
	)
	add := func(p string) {
		if _, dup := seen[p]; dup {
			return
		}
		seen[p] = struct{}{}
		out = append(out, p)
	}
	for _, pat := range patterns {
		matches, err := filepath.Glob(pat)
		switch {
		case err != nil:
			log.Printf("collector: invalid glob %q: %v", pat, err)
		case len(matches) == 0:
			// Keep the path even if it doesn't exist yet; the tailer will retry.
			log.Printf("collector: pattern %q matched no files, will watch for creation", pat)
			add(pat)
		default:
			for _, m := range matches {
				add(m)
			}
		}
	}
	return out
}
// hostname returns the OS hostname, or "unknown" if it cannot be determined.
func hostname() string {
	if h, err := os.Hostname(); err == nil {
		return h
	}
	return "unknown"
}

71
cmd/collector/parser.go Normal file
View File

@@ -0,0 +1,71 @@
package main
import (
"fmt"
"net"
"strings"
)
// LogRecord holds the four dimensions extracted from a single nginx log line.
type LogRecord struct {
	Website      string
	ClientPrefix string
	URI          string
	Status       string
}

// ParseLine parses a tab-separated logtail log line:
//
//	$host \t $remote_addr \t $msec \t $request_method \t $request_uri \t $status \t $body_bytes_sent \t $request_time
//
// Returns false for lines with fewer than 8 fields or an unparseable
// client address.
func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
	// SplitN caps allocations: exactly 8 fields are required, and any tabs
	// past the 7th separator stay inside the final field.
	cols := strings.SplitN(line, "\t", 8)
	if len(cols) < 8 {
		return LogRecord{}, false
	}
	prefix, ok := truncateIP(cols[1], v4bits, v6bits)
	if !ok {
		return LogRecord{}, false
	}
	// Drop the query string so URIs aggregate by path only.
	path := cols[4]
	if q := strings.IndexByte(path, '?'); q >= 0 {
		path = path[:q]
	}
	return LogRecord{
		Website:      cols[0],
		ClientPrefix: prefix,
		URI:          path,
		Status:       cols[5],
	}, true
}

// truncateIP masks addr to the given prefix length depending on IP version.
// Returns the CIDR string (e.g. "1.2.3.0/24") and true on success.
func truncateIP(addr string, v4bits, v6bits int) (string, bool) {
	ip := net.ParseIP(addr)
	if ip == nil {
		return "", false
	}
	bits := v6bits
	if v4 := ip.To4(); v4 != nil {
		// 4-byte form so the /v4bits mask lines up with a 32-bit address.
		ip, bits = v4, v4bits
	} else {
		ip = ip.To16()
	}
	masked := ip.Mask(net.CIDRMask(bits, 8*len(ip)))
	return fmt.Sprintf("%s/%d", masked, bits), true
}

View File

@@ -0,0 +1,114 @@
package main
import (
"testing"
)
// TestParseLine covers the happy path (query-string stripping, IPv4/IPv6
// bucketing) and the malformed-input rejections of ParseLine.
func TestParseLine(t *testing.T) {
	good := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo&x=1\t200\t1452\t0.043"
	tests := []struct {
		name   string
		line   string
		wantOK bool
		want   LogRecord
	}{
		{
			name:   "normal IPv4 line strips query string",
			line:   good,
			wantOK: true,
			want: LogRecord{
				Website:      "www.example.com",
				ClientPrefix: "1.2.3.0/24",
				URI:          "/api/v1/search",
				Status:       "200",
			},
		},
		{
			name:   "URI with no query string",
			line:   "host\t10.0.0.1\t0\tPOST\t/submit\t201\t0\t0.001",
			wantOK: true,
			want: LogRecord{
				Website:      "host",
				ClientPrefix: "10.0.0.0/24",
				URI:          "/submit",
				Status:       "201",
			},
		},
		{
			name:   "IPv6 address truncated to /48",
			line:   "host\t2001:db8:cafe::1\t0\tGET\t/\t200\t0\t0.001",
			wantOK: true,
			want: LogRecord{
				Website:      "host",
				ClientPrefix: "2001:db8:cafe::/48", // /48 = 3 full 16-bit groups intact
				URI:          "/",
				Status:       "200",
			},
		},
		{
			name:   "too few fields returns false",
			line:   "host\t1.2.3.4\t0\tGET\t/",
			wantOK: false,
		},
		{
			name:   "empty line returns false",
			line:   "",
			wantOK: false,
		},
		{
			name:   "invalid IP returns false",
			line:   "host\tnot-an-ip\t0\tGET\t/\t200\t0\t0.001",
			wantOK: false,
		},
		{
			name:   "status 429",
			line:   "api.example.com\t5.6.7.8\t0\tGET\t/rate-limited\t429\t0\t0.001",
			wantOK: true,
			want: LogRecord{
				Website:      "api.example.com",
				ClientPrefix: "5.6.7.0/24",
				URI:          "/rate-limited",
				Status:       "429",
			},
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			got, ok := ParseLine(tc.line, 24, 48)
			if ok != tc.wantOK {
				t.Fatalf("ParseLine ok=%v, want %v", ok, tc.wantOK)
			}
			if !tc.wantOK {
				// Rejected lines carry no meaningful record to compare.
				return
			}
			if got != tc.want {
				t.Errorf("got %+v, want %+v", got, tc.want)
			}
		})
	}
}
// TestTruncateIP checks CIDR bucketing for IPv4 (/24) and IPv6 (/48) inputs.
func TestTruncateIP(t *testing.T) {
	tests := []struct {
		addr string
		want string
	}{
		{"1.2.3.4", "1.2.3.0/24"},
		{"192.168.100.200", "192.168.100.0/24"},
		{"2001:db8:cafe:babe::1", "2001:db8:cafe::/48"}, // /48 = 3 full groups intact
		{"::1", "::/48"},                                // loopback — first 48 bits are all zero
	}
	for _, tc := range tests {
		got, ok := truncateIP(tc.addr, 24, 48)
		if !ok {
			t.Errorf("truncateIP(%q) returned not-ok", tc.addr)
			continue
		}
		if got != tc.want {
			t.Errorf("truncateIP(%q) = %q, want %q", tc.addr, got, tc.want)
		}
	}
}

91
cmd/collector/server.go Normal file
View File

@@ -0,0 +1,91 @@
package main
import (
"context"
"log"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Server implements pb.LogtailServiceServer backed by a Store.
type Server struct {
	pb.UnimplementedLogtailServiceServer // forward-compat: unknown RPCs return Unimplemented

	store  *Store // aggregated counters queried by the RPC handlers
	source string // collector name stamped into every response
}

// NewServer returns a Server that answers queries from store, labelling
// responses with the given source name.
func NewServer(store *Store, source string) *Server {
	return &Server{store: store, source: source}
}
// TopN returns the top-N grouped counts matching the request's filter and
// window. A non-positive N defaults to 10.
func (srv *Server) TopN(_ context.Context, req *pb.TopNRequest) (*pb.TopNResponse, error) {
	if req == nil {
		return nil, status.Error(codes.InvalidArgument, "request is nil")
	}
	limit := int(req.N)
	if limit <= 0 {
		limit = 10
	}
	var out []*pb.TopNEntry
	for _, r := range srv.store.QueryTopN(req.Filter, req.GroupBy, limit, req.Window) {
		out = append(out, &pb.TopNEntry{Label: r.Label, Count: r.Count})
	}
	return &pb.TopNResponse{Source: srv.source, Entries: out}, nil
}
// Trend returns one aggregate count per time bucket for the requested window.
func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResponse, error) {
	if req == nil {
		return nil, status.Error(codes.InvalidArgument, "request is nil")
	}
	var pts []*pb.TrendPoint
	for _, p := range srv.store.QueryTrend(req.Filter, req.Window) {
		pts = append(pts, &pb.TrendPoint{
			TimestampUnix: p.Timestamp.Unix(),
			Count:         p.Count,
		})
	}
	return &pb.TrendResponse{Source: srv.source, Points: pts}, nil
}
// StreamSnapshots pushes every fine (1-minute) snapshot to the client until
// the client disconnects or the store shuts down the subscription.
func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
	ch := srv.store.Subscribe()
	defer srv.store.Unsubscribe(ch)
	// NOTE: the previous stream.Context().Value("peer") always logged <nil> —
	// gRPC does not store peer info under a string key; use
	// google.golang.org/grpc/peer.FromContext if the remote address is needed.
	log.Printf("server: new StreamSnapshots subscriber")
	// One reusable ticker instead of a fresh time.After timer per loop
	// iteration; it exists only to wake the select when the store is quiet.
	idle := time.NewTicker(30 * time.Second)
	defer idle.Stop()
	for {
		select {
		case <-stream.Context().Done():
			log.Printf("server: StreamSnapshots subscriber disconnected")
			return nil
		case snap, ok := <-ch:
			if !ok {
				// Subscription channel closed by the store; end the stream.
				return nil
			}
			msg := &pb.Snapshot{
				Source:    srv.source,
				Timestamp: snap.Timestamp.Unix(),
			}
			for _, e := range snap.Entries {
				msg.Entries = append(msg.Entries, &pb.TopNEntry{
					Label: e.Label,
					Count: e.Count,
				})
			}
			if err := stream.Send(msg); err != nil {
				log.Printf("server: send error: %v", err)
				return err
			}
		case <-idle.C:
			// unblock select when server is quiet; gRPC keepalives handle the rest
		}
	}
}

205
cmd/collector/smoke_test.go Normal file
View File

@@ -0,0 +1,205 @@
package main
import (
"context"
"fmt"
"net"
"runtime"
"testing"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// BenchmarkIngest measures how fast the store can process log records.
// At 10K lines/s we need ~10µs budget per record; we should be well under 1µs.
func BenchmarkIngest(b *testing.B) {
	s := NewStore("bench")
	r := LogRecord{
		Website:      "www.example.com",
		ClientPrefix: "1.2.3.0/24",
		URI:          "/api/v1/search",
		Status:       "200",
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Vary the key slightly to avoid the "existing key" fast path
		// (the Sprintf cost is included in the measured time).
		r.ClientPrefix = fmt.Sprintf("%d.%d.%d.0/24", i%200, (i/200)%256, (i/51200)%256)
		s.ingest(r)
	}
}

// BenchmarkParseLine measures parser throughput on a representative line.
func BenchmarkParseLine(b *testing.B) {
	line := "www.example.com\t1.2.3.4\t1741954800.123\tGET\t/api/v1/search?q=foo\t200\t1452\t0.043"
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ParseLine(line, 24, 48)
	}
}
// TestMemoryBudget fills the store to capacity and checks RSS stays within budget.
func TestMemoryBudget(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping memory test in short mode")
	}
	s := NewStore("memtest")
	now := time.Now()
	// Fill the live map to cap
	for i := 0; i < liveMapCap; i++ {
		s.ingest(LogRecord{
			Website:      fmt.Sprintf("site%d.com", i%1000),
			ClientPrefix: fmt.Sprintf("%d.%d.%d.0/24", i%256, (i/256)%256, (i/65536)%256),
			URI:          fmt.Sprintf("/path/%d", i%100),
			Status:       fmt.Sprintf("%d", 200+i%4*100),
		})
	}
	// Rotate 60 fine buckets to fill the fine ring
	for i := 0; i < fineRingSize; i++ {
		for j := 0; j < 1000; j++ {
			s.ingest(LogRecord{
				Website:      fmt.Sprintf("site%d.com", j%1000),
				ClientPrefix: fmt.Sprintf("%d.%d.%d.0/24", j%256, j/256, 0),
				URI:          fmt.Sprintf("/p/%d", j%100),
				Status:       "200",
			})
		}
		s.rotate(now.Add(time.Duration(i) * time.Minute))
	}
	// Rotate enough to fill the coarse ring (288 coarse buckets × 5 fine each)
	for i := 0; i < coarseRingSize*coarseEvery; i++ {
		for j := 0; j < 100; j++ {
			s.ingest(LogRecord{
				Website:      fmt.Sprintf("site%d.com", j%1000),
				ClientPrefix: fmt.Sprintf("%d.%d.%d.0/24", j%256, j/256, 0),
				URI:          "/",
				Status:       "200",
			})
		}
		s.rotate(now.Add(time.Duration(fineRingSize+i) * time.Minute))
	}
	var ms runtime.MemStats
	// Force a GC first so HeapInuse reflects live data, not pending garbage.
	runtime.GC()
	runtime.ReadMemStats(&ms)
	heapMB := ms.HeapInuse / 1024 / 1024
	t.Logf("heap in-use after full ring fill: %d MB", heapMB)
	// Generous ceiling; this guards against gross regressions only.
	const budgetMB = 1024
	if heapMB > budgetMB {
		t.Errorf("heap %d MB exceeds budget of %d MB", heapMB, budgetMB)
	}
}
// TestGRPCEndToEnd spins up a real gRPC server, injects data, and queries it.
func TestGRPCEndToEnd(t *testing.T) {
	store := NewStore("e2e-test")
	// Pre-populate with known data then rotate so it's queryable
	for i := 0; i < 500; i++ {
		store.ingest(LogRecord{"busy.com", "1.2.3.0/24", "/api", "200"})
	}
	for i := 0; i < 200; i++ {
		store.ingest(LogRecord{"quiet.com", "5.6.7.0/24", "/", "429"})
	}
	store.rotate(time.Now())
	// Start gRPC server on a random free port
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	grpcSrv := grpc.NewServer()
	pb.RegisterLogtailServiceServer(grpcSrv, NewServer(store, "e2e-test"))
	go grpcSrv.Serve(lis)
	defer grpcSrv.Stop()
	// Dial it
	conn, err := grpc.NewClient(lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()
	client := pb.NewLogtailServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// TopN by website
	resp, err := client.TopN(ctx, &pb.TopNRequest{
		GroupBy: pb.GroupBy_WEBSITE,
		N:       10,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN error: %v", err)
	}
	if len(resp.Entries) != 2 {
		t.Fatalf("got %d entries, want 2", len(resp.Entries))
	}
	if resp.Entries[0].Label != "busy.com" {
		t.Errorf("top site = %q, want busy.com", resp.Entries[0].Label)
	}
	if resp.Entries[0].Count != 500 {
		t.Errorf("top count = %d, want 500", resp.Entries[0].Count)
	}
	t.Logf("TopN result: source=%s entries=%v", resp.Source, resp.Entries)
	// TopN filtered to 429s
	status429 := int32(429)
	resp, err = client.TopN(ctx, &pb.TopNRequest{
		Filter:  &pb.Filter{HttpResponse: &status429},
		GroupBy: pb.GroupBy_WEBSITE,
		N:       10,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN filtered error: %v", err)
	}
	if len(resp.Entries) != 1 || resp.Entries[0].Label != "quiet.com" {
		t.Errorf("filtered result unexpected: %v", resp.Entries)
	}
	// Trend
	tresp, err := client.Trend(ctx, &pb.TrendRequest{Window: pb.Window_W5M})
	if err != nil {
		t.Fatalf("Trend error: %v", err)
	}
	if len(tresp.Points) != 1 {
		t.Fatalf("got %d trend points, want 1", len(tresp.Points))
	}
	if tresp.Points[0].Count != 700 {
		t.Errorf("trend count = %d, want 700", tresp.Points[0].Count)
	}
	t.Logf("Trend result: %v points", len(tresp.Points))
	// StreamSnapshots — inject a new rotation and check we receive it
	// NOTE(review): subCh is subscribed but never read from; the assertions
	// below do not depend on it (broadcast drops when its buffer fills) —
	// looks removable, confirm.
	subCh := store.Subscribe()
	defer store.Unsubscribe(subCh)
	streamCtx, streamCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer streamCancel()
	stream, err := client.StreamSnapshots(streamCtx, &pb.SnapshotRequest{})
	if err != nil {
		t.Fatalf("StreamSnapshots error: %v", err)
	}
	// NOTE(review): StreamSnapshots returning on the client does not
	// guarantee the server handler has subscribed yet; if the rotate below
	// broadcasts first, Recv waits out the 5s deadline — potential flake.
	store.ingest(LogRecord{"new.com", "9.9.9.0/24", "/new", "200"})
	store.rotate(time.Now())
	snap, err := stream.Recv()
	if err != nil {
		t.Fatalf("stream Recv error: %v", err)
	}
	if snap.Source != "e2e-test" {
		t.Errorf("snapshot source = %q, want e2e-test", snap.Source)
	}
	t.Logf("StreamSnapshots: received snapshot with %d entries from %s", len(snap.Entries), snap.Source)
}

393
cmd/collector/store.go Normal file
View File

@@ -0,0 +1,393 @@
package main
import (
"container/heap"
"fmt"
"sync"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)
// Sizing constants for the in-memory store. Together they bound worst-case
// memory: one live map plus two fixed-size rings of top-K snapshots.
const (
	liveMapCap     = 100_000 // hard cap on live map entries
	fineRingSize   = 60      // 60 × 1-min buckets → 1 hour
	coarseRingSize = 288     // 288 × 5-min buckets → 24 hours
	fineTopK       = 50_000  // entries kept per fine snapshot
	coarseTopK     = 5_000   // entries kept per coarse snapshot
	coarseEvery    = 5       // merge every N fine ticks into one coarse bucket
)
// Tuple4 is the four-dimensional key.
type Tuple4 struct {
	Website string
	Prefix  string
	URI     string
	Status  string
}

// Entry is a labelled count used in snapshots and query results.
type Entry struct {
	Label string
	Count int64
}

// snapshot is one sorted (desc) slice of top-K entries for a time bucket.
type snapshot struct {
	Timestamp time.Time
	Entries   []Entry // sorted descending by Count
}

// Store holds the live map and both ring buffers.
//
// Concurrency model: the Run goroutine is the sole writer (ingest/rotate);
// query methods take mu.RLock to read the rings. The live map is never
// touched outside the Run goroutine.
type Store struct {
	source string

	// live map — written only by Run goroutine, no locking needed for writes
	live    map[Tuple4]int64
	liveLen int // tracked separately to avoid map len() call in hot path

	// ring buffers — protected by mu for reads (Run goroutine writes)
	mu         sync.RWMutex
	fineRing   [fineRingSize]snapshot
	fineHead   int // index of next write slot
	fineFilled int // how many slots are populated

	coarseRing   [coarseRingSize]snapshot
	coarseHead   int
	coarseFilled int

	fineTick int // counts fine ticks mod coarseEvery

	// fan-out to StreamSnapshots subscribers
	subMu sync.Mutex
	subs  map[chan snapshot]struct{}
}

// NewStore returns an empty Store identified by source. The live map is
// pre-sized to its cap so the first minute of ingest does not rehash.
func NewStore(source string) *Store {
	return &Store{
		source: source,
		live:   make(map[Tuple4]int64, liveMapCap),
		subs:   make(map[chan snapshot]struct{}),
	}
}
// ingest records one log record into the live map, refusing brand-new keys
// once the map has hit liveMapCap (existing keys still increment).
// Must only be called from the Run goroutine.
func (s *Store) ingest(r LogRecord) {
	key := Tuple4{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status}
	n, known := s.live[key]
	if !known {
		if s.liveLen >= liveMapCap {
			// At cap: drop new keys rather than grow without bound.
			return
		}
		s.liveLen++
	}
	s.live[key] = n + 1
}
// rotate snapshots the live map into the fine ring, and every coarseEvery ticks
// also merges into the coarse ring. Called once per minute by Run.
func (s *Store) rotate(now time.Time) {
	// Extract top-K before taking mu, so readers are only blocked for the
	// cheap ring-pointer updates below.
	fine := topK(s.live, fineTopK, now)
	s.mu.Lock()
	s.fineRing[s.fineHead] = fine
	s.fineHead = (s.fineHead + 1) % fineRingSize
	if s.fineFilled < fineRingSize {
		s.fineFilled++
	}
	s.fineTick++
	if s.fineTick >= coarseEvery {
		s.fineTick = 0
		// Merge the most recent coarseEvery fine buckets (including the one
		// just written) into a single coarse bucket.
		coarse := s.mergeFineBuckets(coarseTopK, now)
		s.coarseRing[s.coarseHead] = coarse
		s.coarseHead = (s.coarseHead + 1) % coarseRingSize
		if s.coarseFilled < coarseRingSize {
			s.coarseFilled++
		}
	}
	s.mu.Unlock()
	// reset live map — a fresh allocation lets the GC reclaim the old map
	// wholesale rather than clearing it entry by entry
	s.live = make(map[Tuple4]int64, liveMapCap)
	s.liveLen = 0
	// notify subscribers — must be outside mu to avoid deadlock
	s.broadcast(fine)
}
// mergeFineBuckets merges the last coarseEvery fine snapshots into one
// top-k snapshot stamped with now. Called with mu held.
func (s *Store) mergeFineBuckets(k int, now time.Time) snapshot {
	n := min(coarseEvery, s.fineFilled)
	sums := make(map[string]int64)
	// Walk backwards from the most recently written slot.
	for back := 0; back < n; back++ {
		slot := (s.fineHead - 1 - back + fineRingSize) % fineRingSize
		for _, e := range s.fineRing[slot].Entries {
			sums[e.Label] += e.Count
		}
	}
	return snapshot{Timestamp: now, Entries: topKFromMap(sums, k)}
}
// QueryTopN answers a TopN request from the ring buffers: it sums the
// filter-matching entries across the window's buckets, grouped by the
// requested dimension, and returns the n largest groups.
func (s *Store) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []Entry {
	s.mu.RLock()
	defer s.mu.RUnlock()
	view, nBuckets := s.bucketsForWindow(window)
	totals := make(map[string]int64)
	for back := 0; back < nBuckets; back++ {
		slot := (view.head - 1 - back + view.size) % view.size
		for _, e := range view.ring[slot].Entries {
			tup := labelTuple(e.Label)
			if matchesFilter(tup, filter) {
				totals[dimensionLabel(tup, groupBy)] += e.Count
			}
		}
	}
	return topKFromMap(totals, n)
}
// QueryTrend answers a Trend request from the ring buffers, producing one
// filter-matching total per bucket, ordered oldest-first.
func (s *Store) QueryTrend(filter *pb.Filter, window pb.Window) []trendPoint {
	s.mu.RLock()
	defer s.mu.RUnlock()
	view, nBuckets := s.bucketsForWindow(window)
	out := make([]trendPoint, nBuckets)
	for i := range out {
		// Index so that out[0] is the oldest selected bucket.
		slot := (view.head - nBuckets + i + view.size) % view.size
		snap := view.ring[slot]
		var sum int64
		for _, e := range snap.Entries {
			if matchesFilter(labelTuple(e.Label), filter) {
				sum += e.Count
			}
		}
		out[i] = trendPoint{Timestamp: snap.Timestamp, Count: sum}
	}
	return out
}

// trendPoint is one (bucket timestamp, matching-request count) pair.
type trendPoint struct {
	Timestamp time.Time
	Count     int64
}
// ringView is a helper to treat fine and coarse rings uniformly.
// ring is a copy of the underlying slot array; head and size mirror the
// source ring's write cursor and capacity.
type ringView struct {
	ring []snapshot
	head int
	size int
}
// bucketsForWindow maps a query window onto the ring that backs it and the
// number of most-recent buckets to read, clamped to how many are filled.
// Fine buckets are 1 minute wide, coarse buckets 5 minutes.
func (s *Store) bucketsForWindow(window pb.Window) (ringView, int) {
	switch window {
	case pb.Window_W1M:
		return s.fineView(), min(1, s.fineFilled)
	case pb.Window_W5M:
		return s.fineView(), min(5, s.fineFilled)
	case pb.Window_W15M:
		return s.fineView(), min(15, s.fineFilled)
	case pb.Window_W60M:
		return s.fineView(), min(60, s.fineFilled)
	case pb.Window_W6H:
		return s.coarseView(), min(72, s.coarseFilled) // 72 × 5-min = 6h
	case pb.Window_W24H:
		return s.coarseView(), min(288, s.coarseFilled)
	default:
		// Unrecognized window values fall back to the 5-minute view.
		return s.fineView(), min(5, s.fineFilled)
	}
}
// fineView returns the fine ring as a ringView. The slot array is shallow-
// copied; the Entry slices inside remain shared with the ring.
// NOTE(review): callers currently hold mu for the whole query, so the copy
// may be defensive rather than required — confirm before removing it.
func (s *Store) fineView() ringView {
	view := append([]snapshot(nil), s.fineRing[:]...)
	return ringView{ring: view, head: s.fineHead, size: fineRingSize}
}

// coarseView is the coarse-ring counterpart of fineView.
func (s *Store) coarseView() ringView {
	view := append([]snapshot(nil), s.coarseRing[:]...)
	return ringView{ring: view, head: s.coarseHead, size: coarseRingSize}
}
// Subscribe returns a channel that receives a copy of each fine snapshot
// after rotation. Buffer of 4 so a slow subscriber doesn't block rotation.
func (s *Store) Subscribe() chan snapshot {
	sub := make(chan snapshot, 4)
	s.subMu.Lock()
	defer s.subMu.Unlock()
	s.subs[sub] = struct{}{}
	return sub
}

// Unsubscribe removes and closes the subscriber channel.
func (s *Store) Unsubscribe(ch chan snapshot) {
	s.subMu.Lock()
	delete(s.subs, ch)
	s.subMu.Unlock()
	// Closing after the delete is safe: once removed from subs under subMu,
	// broadcast can no longer pick this channel up, so no send races the close.
	close(ch)
}
// broadcast fans snap out to every subscriber with a non-blocking send:
// slow subscribers lose snapshots rather than stalling rotation.
func (s *Store) broadcast(snap snapshot) {
	s.subMu.Lock()
	defer s.subMu.Unlock()
	for sub := range s.subs {
		select {
		case sub <- snap:
		default:
			// subscriber is slow; drop rather than block rotation
		}
	}
}
// Run is the single goroutine that reads from ch, ingests records, and rotates
// the ring buffer every minute. Exits when ch is closed.
func (s *Store) Run(ch <-chan LogRecord) {
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case rec, open := <-ch:
			if !open {
				return
			}
			s.ingest(rec)
		case now := <-tick.C:
			s.rotate(now)
		}
	}
}
// --- heap-based top-K helpers ---

// entryHeap is a min-heap of Entry ordered by Count: the root is always the
// smallest of the currently retained entries, ready for eviction.
type entryHeap []Entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].Count < h[j].Count } // min-heap
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push appends x; intended to be called only via container/heap.
func (h *entryHeap) Push(x any) { *h = append(*h, x.(Entry)) }

// Pop removes and returns the last element; intended to be called only via
// container/heap, which has already swapped the root there.
func (h *entryHeap) Pop() any {
	old := *h
	last := len(old) - 1
	item := old[last]
	*h = old[:last]
	return item
}
// topK extracts the top-k entries from a Tuple4 map, with each entry's label
// being the NUL-separated encoding produced by encodeTuple, stamped with ts.
func topK(m map[Tuple4]int64, k int, ts time.Time) snapshot {
	// Re-key by encoded label so topKFromMap can do the heavy lifting.
	flat := make(map[string]int64, len(m))
	for tup, n := range m {
		flat[encodeTuple(tup)] = n
	}
	return snapshot{Timestamp: ts, Entries: topKFromMap(flat, k)}
}
// topKFromMap selects the top-k entries from a string→count map, sorted
// descending by count. Returns nil for k <= 0.
func topKFromMap(m map[string]int64, k int) []Entry {
	if k <= 0 {
		return nil
	}
	// Maintain a size-k min-heap: the root is the smallest of the current
	// best k, so any larger candidate displaces it.
	h := make(entryHeap, 0, k+1)
	for label, n := range m {
		heap.Push(&h, Entry{Label: label, Count: n})
		if h.Len() > k {
			heap.Pop(&h) // evict smallest
		}
	}
	// Draining a min-heap yields ascending order; fill the result slice
	// back-to-front to end up descending.
	out := make([]Entry, h.Len())
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = heap.Pop(&h).(Entry)
	}
	return out
}
// --- label encoding: "website\x00prefix\x00uri\x00status" ---

// encodeTuple flattens t into a single NUL-separated string key.
// NOTE(review): assumes none of the four fields can contain a NUL byte —
// confirm against what the parser can emit.
func encodeTuple(t Tuple4) string {
	return t.Website + "\x00" + t.Prefix + "\x00" + t.URI + "\x00" + t.Status
}
// labelTuple decodes a NUL-separated label produced by encodeTuple back into
// a Tuple4. Labels with the wrong field count decode to the zero Tuple4.
func labelTuple(label string) Tuple4 {
	p := splitN(label, '\x00', 4)
	if len(p) != 4 {
		return Tuple4{}
	}
	return Tuple4{Website: p[0], Prefix: p[1], URI: p[2], Status: p[3]}
}
// splitN splits s on the byte sep into at most n pieces; the final piece
// keeps any remaining separators (mirrors strings.SplitN for a byte sep).
func splitN(s string, sep byte, n int) []string {
	parts := make([]string, 0, n)
	rest := s
	for len(parts) < n-1 {
		cut := indexOf(rest, sep)
		if cut < 0 {
			break
		}
		parts = append(parts, rest[:cut])
		rest = rest[cut+1:]
	}
	return append(parts, rest)
}

// indexOf returns the index of the first occurrence of b in s, or -1.
func indexOf(s string, b byte) int {
	for i, c := range []byte(s) {
		if c == b {
			return i
		}
	}
	return -1
}
// matchesFilter reports whether tuple t passes every populated field of f.
// A nil filter matches everything.
func matchesFilter(t Tuple4, f *pb.Filter) bool {
	switch {
	case f == nil:
		return true
	case f.Website != nil && t.Website != f.GetWebsite():
		return false
	case f.ClientPrefix != nil && t.Prefix != f.GetClientPrefix():
		return false
	case f.HttpRequestUri != nil && t.URI != f.GetHttpRequestUri():
		return false
	case f.HttpResponse != nil && t.Status != fmt.Sprint(f.GetHttpResponse()):
		return false
	default:
		return true
	}
}
// dimensionLabel projects the tuple onto the dimension selected by g.
// Unknown values of g fall back to the website dimension.
func dimensionLabel(t Tuple4, g pb.GroupBy) string {
	switch g {
	case pb.GroupBy_CLIENT_PREFIX:
		return t.Prefix
	case pb.GroupBy_REQUEST_URI:
		return t.URI
	case pb.GroupBy_HTTP_RESPONSE:
		return t.Status
	case pb.GroupBy_WEBSITE:
		return t.Website
	default:
		return t.Website
	}
}
// min returns the smaller of a and b. (Shadows the Go 1.21 builtin of the
// same name; behavior is identical for int arguments.)
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}

179
cmd/collector/store_test.go Normal file
View File

@@ -0,0 +1,179 @@
package main
import (
"fmt"
"testing"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)
// makeStore returns a fresh Store for tests.
func makeStore() *Store {
	return NewStore("test")
}

// ingestN ingests n identical records into s.
func ingestN(s *Store, website, prefix, uri, status string, n int) {
	for i := 0; i < n; i++ {
		s.ingest(LogRecord{website, prefix, uri, status})
	}
}

// TestIngestAndRotate checks that one rotation produces a single fine bucket
// whose entries are sorted descending by count.
func TestIngestAndRotate(t *testing.T) {
	s := makeStore()
	ingestN(s, "example.com", "1.2.3.0/24", "/", "200", 100)
	ingestN(s, "other.com", "5.6.7.0/24", "/api", "429", 50)
	s.rotate(time.Now())
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.fineFilled != 1 {
		t.Fatalf("fineFilled = %d, want 1", s.fineFilled)
	}
	// The most recent write sits one slot behind the head cursor.
	snap := s.fineRing[(s.fineHead-1+fineRingSize)%fineRingSize]
	if len(snap.Entries) != 2 {
		t.Fatalf("snapshot has %d entries, want 2", len(snap.Entries))
	}
	if snap.Entries[0].Count != 100 {
		t.Errorf("top entry count = %d, want 100", snap.Entries[0].Count)
	}
}
// TestLiveMapCap checks that the live map stops accepting new keys once it
// reaches liveMapCap.
func TestLiveMapCap(t *testing.T) {
	s := makeStore()
	// Ingest liveMapCap+100 distinct keys; only liveMapCap should be tracked
	for i := 0; i < liveMapCap+100; i++ {
		s.ingest(LogRecord{
			Website:      fmt.Sprintf("site%d.com", i),
			ClientPrefix: "1.2.3.0/24",
			URI:          "/",
			Status:       "200",
		})
	}
	if s.liveLen != liveMapCap {
		t.Errorf("liveLen = %d, want %d", s.liveLen, liveMapCap)
	}
}

// TestQueryTopN checks grouping by website, descending ordering, and a limit
// smaller than the number of distinct sites.
func TestQueryTopN(t *testing.T) {
	s := makeStore()
	ingestN(s, "busy.com", "1.0.0.0/24", "/", "200", 300)
	ingestN(s, "medium.com", "2.0.0.0/24", "/", "200", 100)
	ingestN(s, "quiet.com", "3.0.0.0/24", "/", "200", 10)
	s.rotate(time.Now())
	entries := s.QueryTopN(nil, pb.GroupBy_WEBSITE, 2, pb.Window_W1M)
	if len(entries) != 2 {
		t.Fatalf("got %d entries, want 2", len(entries))
	}
	if entries[0].Label != "busy.com" {
		t.Errorf("top entry = %q, want busy.com", entries[0].Label)
	}
	if entries[0].Count != 300 {
		t.Errorf("top count = %d, want 300", entries[0].Count)
	}
}
// TestQueryTopNWithFilter checks that an HttpResponse filter restricts the
// aggregation to matching status codes across sites.
func TestQueryTopNWithFilter(t *testing.T) {
	s := makeStore()
	ingestN(s, "example.com", "1.0.0.0/24", "/api", "429", 200)
	ingestN(s, "example.com", "2.0.0.0/24", "/api", "200", 500)
	ingestN(s, "other.com", "3.0.0.0/24", "/", "429", 100)
	s.rotate(time.Now())
	status429 := int32(429)
	entries := s.QueryTopN(&pb.Filter{HttpResponse: &status429}, pb.GroupBy_WEBSITE, 10, pb.Window_W1M)
	if len(entries) != 2 {
		t.Fatalf("got %d entries, want 2", len(entries))
	}
	// example.com has 200 × 429, other.com has 100 × 429
	if entries[0].Label != "example.com" || entries[0].Count != 200 {
		t.Errorf("unexpected top: %+v", entries[0])
	}
}

// TestQueryTrend checks that trend points come back oldest-first, one point
// per rotated bucket.
func TestQueryTrend(t *testing.T) {
	s := makeStore()
	now := time.Now()
	// Rotate 3 buckets with different counts
	ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 10)
	s.rotate(now.Add(-2 * time.Minute))
	ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 20)
	s.rotate(now.Add(-1 * time.Minute))
	ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 30)
	s.rotate(now)
	points := s.QueryTrend(nil, pb.Window_W5M)
	if len(points) != 3 {
		t.Fatalf("got %d points, want 3", len(points))
	}
	// Points are oldest-first; counts should be 10, 20, 30
	if points[0].Count != 10 || points[1].Count != 20 || points[2].Count != 30 {
		t.Errorf("unexpected counts: %v", points)
	}
}
// TestCoarseRingPopulated checks that coarseEvery fine rotations produce one
// coarse bucket whose count is the sum of the fine buckets.
func TestCoarseRingPopulated(t *testing.T) {
	s := makeStore()
	now := time.Now()
	// Rotate coarseEvery fine buckets to trigger one coarse bucket
	for i := 0; i < coarseEvery; i++ {
		ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 10)
		s.rotate(now.Add(time.Duration(i) * time.Minute))
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.coarseFilled != 1 {
		t.Fatalf("coarseFilled = %d, want 1", s.coarseFilled)
	}
	coarse := s.coarseRing[(s.coarseHead-1+coarseRingSize)%coarseRingSize]
	if len(coarse.Entries) == 0 {
		t.Fatal("coarse snapshot is empty")
	}
	// 5 fine buckets × 10 counts = 50
	if coarse.Entries[0].Count != 50 {
		t.Errorf("coarse count = %d, want 50", coarse.Entries[0].Count)
	}
}

// TestSubscribeBroadcast checks that a subscriber receives the fine snapshot
// produced by a rotation.
func TestSubscribeBroadcast(t *testing.T) {
	s := makeStore()
	ch := s.Subscribe()
	ingestN(s, "x.com", "1.0.0.0/24", "/", "200", 5)
	s.rotate(time.Now())
	select {
	case snap := <-ch:
		if len(snap.Entries) != 1 {
			t.Errorf("got %d entries, want 1", len(snap.Entries))
		}
	case <-time.After(time.Second):
		t.Fatal("no snapshot received within 1s")
	}
	s.Unsubscribe(ch)
}
// TestTopKOrdering checks that topKFromMap returns exactly k entries sorted
// descending by count.
func TestTopKOrdering(t *testing.T) {
	m := map[string]int64{
		"a": 5,
		"b": 100,
		"c": 1,
		"d": 50,
	}
	entries := topKFromMap(m, 3)
	if len(entries) != 3 {
		t.Fatalf("got %d entries, want 3", len(entries))
	}
	if entries[0].Label != "b" || entries[0].Count != 100 {
		t.Errorf("wrong top: %+v", entries[0])
	}
	if entries[1].Label != "d" || entries[1].Count != 50 {
		t.Errorf("wrong second: %+v", entries[1])
	}
}

179
cmd/collector/tailer.go Normal file
View File

@@ -0,0 +1,179 @@
package main
import (
"bufio"
"context"
"io"
"log"
"os"
"time"
"github.com/fsnotify/fsnotify"
)
// fileState holds the open file handle and buffered reader for one log file.
type fileState struct {
	f      *os.File
	reader *bufio.Reader // buffers partial lines between Write events
}

// reopenMsg is sent by a retry goroutine back to the Run loop when a rotated
// file has reappeared and is ready to be watched again.
type reopenMsg struct {
	path string
	f    *os.File
}

// MultiTailer tails any number of log files using a single shared
// fsnotify.Watcher (one inotify instance). This scales to hundreds of files
// without hitting the kernel limit on inotify instances per user.
type MultiTailer struct {
	paths  []string // file paths (possibly not existing yet) to tail
	v4bits int      // IPv4 prefix length passed to ParseLine
	v6bits int      // IPv6 prefix length passed to ParseLine
	// parsed records are emitted here with a non-blocking send (see readLines)
	ch chan<- LogRecord
}

// NewMultiTailer builds a MultiTailer; call Run to start tailing.
func NewMultiTailer(paths []string, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer {
	return &MultiTailer{paths: paths, v4bits: v4bits, v6bits: v6bits, ch: ch}
}
// Run tails all configured files until ctx is cancelled.
// All files share one fsnotify.Watcher. Log rotation is handled per-file:
// on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens
// the original path and hands it back via a channel.
func (mt *MultiTailer) Run(ctx context.Context) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("tailer: failed to create watcher: %v", err)
	}
	defer watcher.Close()
	// files maps path → open state for every file currently being tailed.
	files := make(map[string]*fileState, len(mt.paths))
	// Buffered to len(paths): at most one outstanding reopen per path, so a
	// retry goroutine's send can never block forever.
	reopenCh := make(chan reopenMsg, len(mt.paths))
	// Open all files and seek to EOF.
	for _, path := range mt.paths {
		fs, err := openAndSeekEOF(path, watcher)
		if err != nil {
			log.Printf("tailer: %s not found, will retry: %v", path, err)
			go retryOpen(ctx, path, watcher, reopenCh)
			continue
		}
		files[path] = fs
		log.Printf("tailer: watching %s", path)
	}
	for {
		select {
		case <-ctx.Done():
			// NOTE(review): retryOpen goroutines that are mid-open may still
			// deposit an fd into reopenCh after this returns; that fd is
			// never closed — confirm acceptable at shutdown.
			for _, fs := range files {
				fs.f.Close()
			}
			return
		case msg, ok := <-reopenCh:
			if !ok {
				return
			}
			files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)}
			if err := watcher.Add(msg.path); err != nil {
				log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err)
			}
			log.Printf("tailer: re-opened %s after rotation", msg.path)
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			// Ignore events for paths we are not (or no longer) tracking.
			fs, known := files[event.Name]
			if !known {
				continue
			}
			if event.Has(fsnotify.Write) {
				mt.readLines(fs.reader)
			}
			if event.Has(fsnotify.Rename) || event.Has(fsnotify.Remove) {
				// Drain remaining bytes in the old fd before it disappears.
				mt.readLines(fs.reader)
				fs.f.Close()
				delete(files, event.Name)
				_ = watcher.Remove(event.Name)
				go retryOpen(ctx, event.Name, watcher, reopenCh)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Printf("tailer: watcher error: %v", err)
		}
	}
}
// openAndSeekEOF opens path, positions the read offset at the current end of
// the file (so only newly appended lines are tailed), and registers path
// with watcher. On any failure the file is closed and the error returned.
func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	if _, err = f.Seek(0, io.SeekEnd); err == nil {
		err = watcher.Add(path)
	}
	if err != nil {
		f.Close()
		return nil, err
	}
	return &fileState{f: f, reader: bufio.NewReader(f)}, nil
}
// retryOpen polls with exponential backoff (100ms doubling, capped at 5s)
// until path can be opened again (after log rotation), then sends the open
// file back on ch for Run to adopt. Exits if ctx is cancelled; if the file
// was already opened but not yet handed off, it is closed here so the
// descriptor does not leak past shutdown.
//
// The watcher parameter is unused: Run re-registers the path with the
// watcher itself once it receives the reopenMsg. It is kept (blank) so the
// call sites remain unchanged.
func retryOpen(ctx context.Context, path string, _ *fsnotify.Watcher, ch chan<- reopenMsg) {
	backoff := 100 * time.Millisecond
	const maxBackoff = 5 * time.Second
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		f, err := os.Open(path)
		if err != nil {
			// Path still missing — back off exponentially, capped.
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		// Hand the file to Run unless shutdown raced us; in that case close
		// it ourselves, since nobody will ever receive it.
		select {
		case ch <- reopenMsg{path: path, f: f}:
		case <-ctx.Done():
			f.Close()
		}
		return
	}
}
// readLines reads all lines currently available from reader, strips the
// trailing LF (and optional CR), and emits each parsed record on mt.ch.
// NOTE(review): a write without a trailing newline is consumed and parsed
// as-is; the remainder of that line arrives as a separate record on a later
// event — presumably fine for line-buffered nginx logs, but worth confirming.
func (mt *MultiTailer) readLines(reader *bufio.Reader) {
	for {
		raw, err := reader.ReadString('\n')
		if end := len(raw); end > 0 {
			// Trim "\n" and then "\r" by shrinking the slice bound.
			if raw[end-1] == '\n' {
				end--
			}
			if end > 0 && raw[end-1] == '\r' {
				end--
			}
			if rec, ok := ParseLine(raw[:end], mt.v4bits, mt.v6bits); ok {
				select {
				case mt.ch <- rec:
				default:
					// Channel full — drop rather than block the event loop.
				}
			}
		}
		if err != nil {
			// No more complete data buffered right now.
			return
		}
	}
}

View File

@@ -0,0 +1,178 @@
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"
)
func writeLine(t *testing.T, f *os.File, website string) {
t.Helper()
_, err := fmt.Fprintf(f, "%s\t1.2.3.4\t0\tGET\t/path\t200\t0\t0.001\n", website)
if err != nil {
t.Fatalf("writeLine: %v", err)
}
}
// TestMultiTailerReadsLines verifies that lines appended to a watched file
// are parsed and delivered on the record channel.
func TestMultiTailerReadsLines(t *testing.T) {
	dir := t.TempDir()
	logPath := filepath.Join(dir, "access.log")
	f, err := os.Create(logPath)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()
	records := make(chan LogRecord, 100)
	tailer := NewMultiTailer([]string{logPath}, 24, 48, records)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go tailer.Run(ctx)
	// Give the tailer time to open and seek to EOF.
	time.Sleep(50 * time.Millisecond)
	writeLine(t, f, "www.example.com")
	writeLine(t, f, "api.example.com")
	got := collectN(t, records, 2, 2*time.Second)
	seen := make(map[string]bool, len(got))
	for _, rec := range got {
		seen[rec.Website] = true
	}
	if !seen["www.example.com"] || !seen["api.example.com"] {
		t.Errorf("unexpected records: %v", got)
	}
}
// TestMultiTailerMultipleFiles verifies that one tailer delivers records from
// several watched files.
func TestMultiTailerMultipleFiles(t *testing.T) {
	dir := t.TempDir()
	const numFiles = 5
	handles := make([]*os.File, numFiles)
	paths := make([]string, numFiles)
	for i := 0; i < numFiles; i++ {
		paths[i] = filepath.Join(dir, fmt.Sprintf("access%d.log", i))
		f, err := os.Create(paths[i])
		if err != nil {
			t.Fatal(err)
		}
		defer f.Close()
		handles[i] = f
	}
	records := make(chan LogRecord, 200)
	tailer := NewMultiTailer(paths, 24, 48, records)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go tailer.Run(ctx)
	time.Sleep(50 * time.Millisecond)
	// Write one line per file
	for i, f := range handles {
		writeLine(t, f, fmt.Sprintf("site%d.com", i))
	}
	got := collectN(t, records, numFiles, 2*time.Second)
	if len(got) != numFiles {
		t.Errorf("got %d records, want %d", len(got), numFiles)
	}
}
// TestMultiTailerLogRotation simulates logrotate: rename the active file,
// create a fresh one at the original path, and check that lines written to
// the replacement are still picked up.
func TestMultiTailerLogRotation(t *testing.T) {
	dir := t.TempDir()
	logPath := filepath.Join(dir, "access.log")
	orig, err := os.Create(logPath)
	if err != nil {
		t.Fatal(err)
	}
	records := make(chan LogRecord, 100)
	tailer := NewMultiTailer([]string{logPath}, 24, 48, records)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go tailer.Run(ctx)
	time.Sleep(50 * time.Millisecond)
	// Write a line to the original file
	writeLine(t, orig, "before.rotation.com")
	collectN(t, records, 1, 2*time.Second)
	// Simulate logrotate: rename the old file, create a new one
	orig.Close()
	if err := os.Rename(logPath, filepath.Join(dir, "access.log.1")); err != nil {
		t.Fatal(err)
	}
	// Give the tailer a moment to detect the rename and start retrying
	time.Sleep(50 * time.Millisecond)
	// Create the new log file (as nginx would after logrotate)
	replacement, err := os.Create(logPath)
	if err != nil {
		t.Fatal(err)
	}
	defer replacement.Close()
	// Allow retry goroutine to pick it up
	time.Sleep(300 * time.Millisecond)
	writeLine(t, replacement, "after.rotation.com")
	got := collectN(t, records, 1, 3*time.Second)
	if len(got) == 0 || got[0].Website != "after.rotation.com" {
		t.Errorf("expected after.rotation.com, got %v", got)
	}
}
// TestExpandGlobs verifies that a glob pattern matches only the intended
// files (the two .log files, not other.txt).
func TestExpandGlobs(t *testing.T) {
	dir := t.TempDir()
	for _, name := range []string{"a.log", "b.log", "other.txt"} {
		// Check the create error so a fixture failure does not surface as a
		// confusing glob-count mismatch further down.
		f, err := os.Create(filepath.Join(dir, name))
		if err != nil {
			t.Fatal(err)
		}
		f.Close()
	}
	pattern := filepath.Join(dir, "*.log")
	paths := expandGlobs([]string{pattern})
	if len(paths) != 2 {
		t.Errorf("glob expanded to %d paths, want 2: %v", len(paths), paths)
	}
}
// TestExpandGlobsDeduplication verifies that a path matched both explicitly
// and via a glob is returned only once.
func TestExpandGlobsDeduplication(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "access.log")
	// Check the create error so a fixture failure does not surface as a
	// confusing dedup-count mismatch further down.
	f, err := os.Create(p)
	if err != nil {
		t.Fatal(err)
	}
	f.Close()
	// Same file listed twice via explicit path and glob
	paths := expandGlobs([]string{p, filepath.Join(dir, "*.log")})
	if len(paths) != 1 {
		t.Errorf("expected 1 deduplicated path, got %d: %v", len(paths), paths)
	}
}
// collectN reads up to n records from ch, returning early (with whatever was
// collected) once timeout elapses. The timeout covers the whole collection,
// not each individual record.
func collectN(t *testing.T, ch <-chan LogRecord, n int, timeout time.Duration) []LogRecord {
	t.Helper()
	timedOut := time.After(timeout)
	var got []LogRecord
	for {
		if len(got) >= n {
			return got
		}
		select {
		case rec := <-ch:
			got = append(got, rec)
		case <-timedOut:
			t.Logf("collectN: timeout waiting for record %d/%d", len(got)+1, n)
			return got
		}
	}
}