// nginx-logtail/cmd/aggregator/aggregator_test.go
package main
import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// --- Merger tests ---
// makeSnap builds a pb.Snapshot for the given source, stamped with the
// current Unix time, containing one TopNEntry per label/count pair.
func makeSnap(source string, entries map[string]int64) *pb.Snapshot {
	out := &pb.Snapshot{
		Source:    source,
		Timestamp: time.Now().Unix(),
		Entries:   make([]*pb.TopNEntry, 0, len(entries)),
	}
	for label, n := range entries {
		out.Entries = append(out.Entries, &pb.TopNEntry{Label: label, Count: n})
	}
	return out
}
// TestMergerApply verifies that snapshots from different collectors are
// summed label by label.
func TestMergerApply(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10, "b": 20}))
	m.Apply(makeSnap("c2", map[string]int64{"a": 5, "c": 15}))
	got := make(map[string]int64)
	for _, e := range m.TopK(10) {
		got[e.Label] = e.Count
	}
	// "a" appears in both snapshots: 10 + 5.
	if got["a"] != 15 {
		t.Errorf("a = %d, want 15", got["a"])
	}
	if got["b"] != 20 {
		t.Errorf("b = %d, want 20", got["b"])
	}
	if got["c"] != 15 {
		t.Errorf("c = %d, want 15", got["c"])
	}
}
// TestMergerReplacement verifies that a later snapshot from the same
// collector replaces the earlier one instead of accumulating on top of it.
func TestMergerReplacement(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100}))
	m.Apply(makeSnap("c1", map[string]int64{"a": 50, "b": 30}))
	got := make(map[string]int64)
	for _, e := range m.TopK(10) {
		got[e.Label] = e.Count
	}
	if got["a"] != 50 {
		t.Errorf("a = %d, want 50 (not 150)", got["a"])
	}
	if got["b"] != 30 {
		t.Errorf("b = %d, want 30", got["b"])
	}
}
// TestMergerZero verifies that zeroing one collector removes only that
// collector's contribution.
func TestMergerZero(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100}))
	m.Apply(makeSnap("c2", map[string]int64{"a": 50}))
	m.Zero("c1")
	// Only c2's contribution should remain.
	got := m.TopK(10)
	if len(got) != 1 || got[0].Label != "a" || got[0].Count != 50 {
		t.Errorf("after Zero(c1): %v", got)
	}
}
// TestMergerZeroNonexistent verifies that zeroing an unknown collector
// address is a harmless no-op.
func TestMergerZeroNonexistent(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10}))
	m.Zero("unknown") // must not panic or disturb existing data
	got := m.TopK(10)
	if len(got) != 1 || got[0].Count != 10 {
		t.Errorf("unexpected: %v", got)
	}
}
// TestMergerConcurrent hammers Apply and Zero from many goroutines at once.
// There are no assertions; run with -race to surface data races.
func TestMergerConcurrent(t *testing.T) {
	m := NewMerger()
	var wg sync.WaitGroup
	// 20 writers share three collector addresses.
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			source := fmt.Sprintf("c%d", id%3)
			for n := 0; n < 100; n++ {
				m.Apply(makeSnap(source, map[string]int64{"x": int64(n)}))
			}
		}(i)
	}
	// One goroutine zeroes the same addresses while the writers run.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 30; i++ {
			m.Zero(fmt.Sprintf("c%d", i%3))
		}
	}()
	wg.Wait()
}
// --- Cache tests ---
// TestCacheRotation verifies that a single rotate() fills one fine-ring
// slot with the merger's current contents.
func TestCacheRotation(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 100, "b": 50}))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	if cache.fineFilled != 1 {
		t.Fatalf("fineFilled = %d, want 1", cache.fineFilled)
	}
	// The most recent fine slot sits one position behind the head pointer.
	latest := cache.fineRing[(cache.fineHead-1+st.FineRingSize)%st.FineRingSize]
	if n := len(latest.Entries); n != 2 {
		t.Fatalf("got %d entries, want 2", n)
	}
	if latest.Entries[0].Count != 100 {
		t.Errorf("top count = %d, want 100", latest.Entries[0].Count)
	}
}
// TestCacheCoarseRing verifies that CoarseEvery fine rotations produce one
// coarse-ring slot aggregating the fine ticks.
func TestCacheCoarseRing(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"a": 10}))
	cache := NewCache(m, "test")
	base := time.Now()
	for tick := 0; tick < st.CoarseEvery; tick++ {
		cache.rotate(base.Add(time.Duration(tick) * time.Minute))
	}
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	if cache.coarseFilled != 1 {
		t.Fatalf("coarseFilled = %d, want 1", cache.coarseFilled)
	}
	// The most recent coarse slot sits one position behind its head pointer.
	latest := cache.coarseRing[(cache.coarseHead-1+st.CoarseRingSize)%st.CoarseRingSize]
	if len(latest.Entries) == 0 {
		t.Fatal("coarse snapshot is empty")
	}
	// CoarseEvery fine ticks of 10 each, i.e. 5 x 10 = 50 with CoarseEvery == 5.
	if latest.Entries[0].Count != 50 {
		t.Errorf("coarse count = %d, want 50", latest.Entries[0].Count)
	}
}
// TestCacheQueryTopN verifies that QueryTopN grouped by website returns
// entries ordered by count.
func TestCacheQueryTopN(t *testing.T) {
	m := NewMerger()
	busy := st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"})
	quiet := st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "200"})
	m.Apply(makeSnap("c1", map[string]int64{busy: 300, quiet: 50}))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 2, pb.Window_W1M)
	if len(entries) != 2 {
		t.Fatalf("got %d entries, want 2", len(entries))
	}
	if entries[0].Label != "busy.com" || entries[0].Count != 300 {
		t.Errorf("top = %+v, want {busy.com 300}", entries[0])
	}
}
// TestCacheQueryTopNWithFilter verifies that an HTTP-status filter keeps
// only the matching bucket.
func TestCacheQueryTopNWithFilter(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{
		st.EncodeTuple(st.Tuple4{"example.com", "1.0.0.0/24", "/api", "429"}): 200,
		st.EncodeTuple(st.Tuple4{"example.com", "2.0.0.0/24", "/api", "200"}): 500,
	}))
	cache := NewCache(m, "test")
	cache.rotate(time.Now())
	// Only the 429 bucket should survive the filter.
	status := int32(429)
	entries := cache.QueryTopN(&pb.Filter{HttpResponse: &status}, pb.GroupBy_WEBSITE, 10, pb.Window_W1M)
	if len(entries) != 1 || entries[0].Label != "example.com" || entries[0].Count != 200 {
		t.Errorf("filtered result: %v", entries)
	}
}
// TestCacheQueryTrend verifies that successive rotations yield one trend
// point each, in order.
func TestCacheQueryTrend(t *testing.T) {
	m := NewMerger()
	cache := NewCache(m, "test")
	base := time.Now()
	tuple := st.EncodeTuple(st.Tuple4{"x.com", "1.0.0.0/24", "/", "200"})
	for i, count := range []int64{10, 20, 30} {
		m.Apply(makeSnap("c1", map[string]int64{tuple: count}))
		cache.rotate(base.Add(time.Duration(i) * time.Minute))
	}
	points := cache.QueryTrend(nil, pb.Window_W5M)
	if len(points) != 3 {
		t.Fatalf("got %d points, want 3", len(points))
	}
	if points[0].Count != 10 || points[1].Count != 20 || points[2].Count != 30 {
		t.Errorf("counts: %v %v %v", points[0].Count, points[1].Count, points[2].Count)
	}
}
// TestCacheSubscribe verifies that a subscriber receives a non-empty
// snapshot when the cache rotates.
func TestCacheSubscribe(t *testing.T) {
	m := NewMerger()
	m.Apply(makeSnap("c1", map[string]int64{"x": 5}))
	cache := NewCache(m, "test")
	sub := cache.Subscribe()
	cache.rotate(time.Now()) // should broadcast to sub
	select {
	case got := <-sub:
		if len(got.Entries) == 0 {
			t.Error("received empty snapshot")
		}
	case <-time.After(time.Second):
		t.Fatal("no snapshot received")
	}
	cache.Unsubscribe(sub)
}
// --- gRPC end-to-end test ---
// fakeCollector is an in-process gRPC collector that streams a fixed set of
// snapshots then blocks until the context is cancelled.
type fakeCollector struct {
pb.UnimplementedLogtailServiceServer
// snaps holds the canned snapshots replayed to each StreamSnapshots caller.
snaps []*pb.Snapshot
}
// StreamSnapshots replays the canned snapshots to the client, then parks
// until the stream's context is cancelled.
func (f *fakeCollector) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
	for _, snap := range f.snaps {
		if err := stream.Send(snap); err != nil {
			return err
		}
	}
	// Keep the stream open so the client controls its lifetime.
	<-stream.Context().Done()
	return nil
}
// startFakeCollector serves the given snapshots from an in-process gRPC
// server on a random loopback port and returns its address. The server is
// stopped automatically when the test finishes.
func startFakeCollector(t *testing.T, snaps []*pb.Snapshot) string {
	t.Helper()
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	srv := grpc.NewServer()
	pb.RegisterLogtailServiceServer(srv, &fakeCollector{snaps: snaps})
	t.Cleanup(srv.Stop)
	go func() { _ = srv.Serve(lis) }() // Serve's return value is irrelevant once Stop runs
	return lis.Addr().String()
}
// TestGRPCEndToEnd wires two fake collectors into a merger + cache, fronts
// the cache with a real gRPC server, and exercises the full query surface:
// TopN, filtered TopN, Trend, and StreamSnapshots.
//
// Fix: the poll loop waiting for collector data previously timed out
// silently, so a slow or broken subscription surfaced as a cascade of
// confusing assertion failures further down. It now fails fast with a
// clear message.
func TestGRPCEndToEnd(t *testing.T) {
	// Two fake collectors with overlapping labels.
	snap1 := makeSnap("col1", map[string]int64{
		st.EncodeTuple(st.Tuple4{"busy.com", "1.0.0.0/24", "/", "200"}):  500,
		st.EncodeTuple(st.Tuple4{"quiet.com", "2.0.0.0/24", "/", "429"}): 100,
	})
	snap2 := makeSnap("col2", map[string]int64{
		st.EncodeTuple(st.Tuple4{"busy.com", "3.0.0.0/24", "/", "200"}):  300,
		st.EncodeTuple(st.Tuple4{"other.com", "4.0.0.0/24", "/", "200"}): 50,
	})
	addr1 := startFakeCollector(t, []*pb.Snapshot{snap1})
	addr2 := startFakeCollector(t, []*pb.Snapshot{snap2})
	// Start aggregator components.
	merger := NewMerger()
	cache := NewCache(merger, "agg-test")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewCollectorSub(addr1, merger).Run(ctx)
	go NewCollectorSub(addr2, merger).Run(ctx)
	// Wait for both snapshots to be applied; fail fast on timeout rather
	// than letting later assertions produce misleading secondary failures.
	applied := false
	deadline := time.Now().Add(3 * time.Second)
	for time.Now().Before(deadline) {
		top := merger.TopK(1)
		if len(top) > 0 && top[0].Count >= 800 { // busy.com: 500+300
			applied = true
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !applied {
		t.Fatalf("collector snapshots not applied within 3s: TopK(1) = %v", merger.TopK(1))
	}
	// Rotate the cache so the data is queryable.
	cache.rotate(time.Now())
	// Start a real gRPC server in front of the cache.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	grpcSrv := grpc.NewServer()
	pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test"))
	go grpcSrv.Serve(lis)
	defer grpcSrv.Stop()
	conn, err := grpc.NewClient(lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()
	client := pb.NewLogtailServiceClient(conn)
	qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer qcancel()
	// TopN — busy.com should have 800 (500+300) across both collectors.
	resp, err := client.TopN(qctx, &pb.TopNRequest{
		GroupBy: pb.GroupBy_WEBSITE,
		N:       5,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN: %v", err)
	}
	if len(resp.Entries) == 0 {
		t.Fatal("TopN returned no entries")
	}
	if resp.Entries[0].Label != "busy.com" || resp.Entries[0].Count != 800 {
		t.Errorf("top entry = %+v, want {busy.com 800}", resp.Entries[0])
	}
	t.Logf("TopN: %v", resp.Entries)
	// Filtered TopN — only 429s: quiet.com=100.
	f429 := int32(429)
	resp, err = client.TopN(qctx, &pb.TopNRequest{
		Filter:  &pb.Filter{HttpResponse: &f429},
		GroupBy: pb.GroupBy_WEBSITE,
		N:       5,
		Window:  pb.Window_W1M,
	})
	if err != nil {
		t.Fatalf("TopN filtered: %v", err)
	}
	if len(resp.Entries) != 1 || resp.Entries[0].Label != "quiet.com" {
		t.Errorf("filtered: %v", resp.Entries)
	}
	// Trend — a single rotation holding everything both collectors sent.
	tresp, err := client.Trend(qctx, &pb.TrendRequest{Window: pb.Window_W5M})
	if err != nil {
		t.Fatalf("Trend: %v", err)
	}
	if len(tresp.Points) != 1 || tresp.Points[0].Count != 950 { // 500+100+300+50
		t.Errorf("trend: %v", tresp.Points)
	}
	t.Logf("Trend: %v", tresp.Points)
	// StreamSnapshots — trigger a rotation and verify we receive a snapshot.
	streamCtx, streamCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer streamCancel()
	stream, err := client.StreamSnapshots(streamCtx, &pb.SnapshotRequest{})
	if err != nil {
		t.Fatalf("StreamSnapshots: %v", err)
	}
	// Wait for the server goroutine to call cache.Subscribe() before rotating.
	time.Sleep(50 * time.Millisecond)
	cache.rotate(time.Now()) // trigger a broadcast
	snap, err := stream.Recv()
	if err != nil {
		t.Fatalf("stream.Recv: %v", err)
	}
	if snap.Source != "agg-test" {
		t.Errorf("source = %q, want agg-test", snap.Source)
	}
	t.Logf("StreamSnapshots: %d entries from %s", len(snap.Entries), snap.Source)
}
// TestDegradedCollector verifies that one unreachable collector does not
// prevent data from a healthy collector from being served.
func TestDegradedCollector(t *testing.T) {
	// One healthy collector plus one address that refuses every connection.
	healthy := makeSnap("col1", map[string]int64{
		st.EncodeTuple(st.Tuple4{"good.com", "1.0.0.0/24", "/", "200"}): 100,
	})
	goodAddr := startFakeCollector(t, []*pb.Snapshot{healthy})
	badAddr := "127.0.0.1:1" // port 1 is always refused
	merger := NewMerger()
	cache := NewCache(merger, "test")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go NewCollectorSub(goodAddr, merger).Run(ctx)
	go NewCollectorSub(badAddr, merger).Run(ctx)
	// Poll until col1's data shows up, for at most 3 seconds.
	for deadline := time.Now().Add(3 * time.Second); time.Now().Before(deadline); {
		if len(merger.TopK(1)) > 0 {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	cache.rotate(time.Now())
	// Results should reflect col1 only.
	entries := cache.QueryTopN(nil, pb.GroupBy_WEBSITE, 5, pb.Window_W1M)
	if len(entries) == 0 {
		t.Fatal("no entries despite col1 being healthy")
	}
	if entries[0].Label != "good.com" {
		t.Errorf("top = %q, want good.com", entries[0].Label)
	}
	t.Logf("degraded test: got %d entries, top = %s %d", len(entries), entries[0].Label, entries[0].Count)
}