Add aggregator backfill, pulling fine+coarse buckets from collectors
This commit is contained in:
162
cmd/aggregator/backfill.go
Normal file
162
cmd/aggregator/backfill.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
|
||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// Backfill calls DumpSnapshots on all collectors concurrently, merges their
|
||||
// data per timestamp, and loads the result into the cache. It blocks until all
|
||||
// collectors have responded or the context is cancelled.
|
||||
func Backfill(ctx context.Context, collectorAddrs []string, cache *Cache) {
|
||||
type result struct {
|
||||
fine []st.Snapshot
|
||||
coarse []st.Snapshot
|
||||
}
|
||||
|
||||
ch := make(chan result, len(collectorAddrs))
|
||||
for _, addr := range collectorAddrs {
|
||||
addr := addr
|
||||
go func() {
|
||||
start := time.Now()
|
||||
fine, coarse, err := dumpCollector(ctx, addr)
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.Unimplemented {
|
||||
log.Printf("backfill: %s: collector does not support DumpSnapshots (old binary), skipping", addr)
|
||||
} else {
|
||||
log.Printf("backfill: %s: failed after %s: %v", addr, time.Since(start).Round(time.Millisecond), err)
|
||||
}
|
||||
ch <- result{}
|
||||
return
|
||||
}
|
||||
var fineEntries, coarseEntries int
|
||||
for _, s := range fine {
|
||||
fineEntries += len(s.Entries)
|
||||
}
|
||||
for _, s := range coarse {
|
||||
coarseEntries += len(s.Entries)
|
||||
}
|
||||
log.Printf("backfill: %s: %d fine buckets (%d entries) + %d coarse buckets (%d entries) in %s",
|
||||
addr, len(fine), fineEntries, len(coarse), coarseEntries, time.Since(start).Round(time.Millisecond))
|
||||
ch <- result{fine, coarse}
|
||||
}()
|
||||
}
|
||||
|
||||
// Collect per-timestamp maps: unix-minute → label → total count.
|
||||
fineByTS := make(map[int64]map[string]int64)
|
||||
coarseByTS := make(map[int64]map[string]int64)
|
||||
|
||||
for range collectorAddrs {
|
||||
r := <-ch
|
||||
mergeDump(r.fine, fineByTS)
|
||||
mergeDump(r.coarse, coarseByTS)
|
||||
}
|
||||
|
||||
fine := buildSnapshots(fineByTS, st.FineTopK, st.FineRingSize)
|
||||
coarse := buildSnapshots(coarseByTS, st.CoarseTopK, st.CoarseRingSize)
|
||||
|
||||
if len(fine)+len(coarse) == 0 {
|
||||
log.Printf("backfill: no data received from any collector")
|
||||
return
|
||||
}
|
||||
loadStart := time.Now()
|
||||
cache.LoadHistorical(fine, coarse)
|
||||
log.Printf("backfill: loaded %d fine + %d coarse buckets in %s total",
|
||||
len(fine), len(coarse), time.Since(loadStart).Round(time.Microsecond))
|
||||
}
|
||||
|
||||
// dumpCollector calls DumpSnapshots on one collector and returns the fine and
|
||||
// coarse ring snapshots as separate slices.
|
||||
func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot, err error) {
|
||||
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := pb.NewLogtailServiceClient(conn)
|
||||
stream, err := client.DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return fine, coarse, nil
|
||||
}
|
||||
if err != nil {
|
||||
return fine, coarse, err
|
||||
}
|
||||
snap := st.Snapshot{
|
||||
Timestamp: time.Unix(msg.Timestamp, 0),
|
||||
Entries: pbEntriesToStore(msg.Entries),
|
||||
}
|
||||
if msg.IsCoarse {
|
||||
coarse = append(coarse, snap)
|
||||
} else {
|
||||
fine = append(fine, snap)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mergeDump adds all snapshots from one collector's dump into the per-timestamp
|
||||
// accumulator map. Multiple collectors' entries for the same timestamp are summed.
|
||||
func mergeDump(snaps []st.Snapshot, byTS map[int64]map[string]int64) {
|
||||
for _, snap := range snaps {
|
||||
ts := snap.Timestamp.Unix()
|
||||
m := byTS[ts]
|
||||
if m == nil {
|
||||
m = make(map[string]int64, len(snap.Entries))
|
||||
byTS[ts] = m
|
||||
}
|
||||
for _, e := range snap.Entries {
|
||||
m[e.Label] += e.Count
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each
|
||||
// bucket, and returns a slice capped to ringSize oldest-first snapshots.
|
||||
func buildSnapshots(byTS map[int64]map[string]int64, topK, ringSize int) []st.Snapshot {
|
||||
if len(byTS) == 0 {
|
||||
return nil
|
||||
}
|
||||
timestamps := make([]int64, 0, len(byTS))
|
||||
for ts := range byTS {
|
||||
timestamps = append(timestamps, ts)
|
||||
}
|
||||
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
|
||||
|
||||
// Keep only the most recent ringSize buckets.
|
||||
if len(timestamps) > ringSize {
|
||||
timestamps = timestamps[len(timestamps)-ringSize:]
|
||||
}
|
||||
|
||||
snaps := make([]st.Snapshot, len(timestamps))
|
||||
for i, ts := range timestamps {
|
||||
snaps[i] = st.Snapshot{
|
||||
Timestamp: time.Unix(ts, 0),
|
||||
Entries: st.TopKFromMap(byTS[ts], topK),
|
||||
}
|
||||
}
|
||||
return snaps
|
||||
}
|
||||
|
||||
func pbEntriesToStore(entries []*pb.TopNEntry) []st.Entry {
|
||||
out := make([]st.Entry, len(entries))
|
||||
for i, e := range entries {
|
||||
out[i] = st.Entry{Label: e.Label, Count: e.Count}
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -90,6 +90,26 @@ func (c *Cache) mergeFineBuckets(now time.Time) st.Snapshot {
|
||||
return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(merged, st.CoarseTopK)}
|
||||
}
|
||||
|
||||
// LoadHistorical pre-populates the ring buffers from backfill data before live
|
||||
// streaming begins. fine and coarse must be sorted oldest-first; each slice
|
||||
// must not exceed the respective ring size. Called once at startup, before Run.
|
||||
func (c *Cache) LoadHistorical(fine, coarse []st.Snapshot) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for i, snap := range fine {
|
||||
c.fineRing[i] = snap
|
||||
}
|
||||
c.fineFilled = len(fine)
|
||||
c.fineHead = len(fine) % st.FineRingSize
|
||||
|
||||
for i, snap := range coarse {
|
||||
c.coarseRing[i] = snap
|
||||
}
|
||||
c.coarseFilled = len(coarse)
|
||||
c.coarseHead = len(coarse) % st.CoarseRingSize
|
||||
}
|
||||
|
||||
// QueryTopN answers a TopN request from the ring buffers.
|
||||
func (c *Cache) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry {
|
||||
cf := st.CompileFilter(filter)
|
||||
|
||||
@@ -38,13 +38,6 @@ func main() {
|
||||
merger := NewMerger()
|
||||
cache := NewCache(merger, *source)
|
||||
registry := NewTargetRegistry(collectorAddrs)
|
||||
go cache.Run(ctx)
|
||||
|
||||
for _, addr := range collectorAddrs {
|
||||
sub := NewCollectorSub(addr, merger, registry)
|
||||
go sub.Run(ctx)
|
||||
log.Printf("aggregator: subscribing to collector %s", addr)
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", *listen)
|
||||
if err != nil {
|
||||
@@ -60,6 +53,17 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
go cache.Run(ctx)
|
||||
|
||||
for _, addr := range collectorAddrs {
|
||||
sub := NewCollectorSub(addr, merger, registry)
|
||||
go sub.Run(ctx)
|
||||
log.Printf("aggregator: subscribing to collector %s", addr)
|
||||
}
|
||||
|
||||
log.Printf("aggregator: backfilling from %d collector(s)", len(collectorAddrs))
|
||||
go Backfill(ctx, collectorAddrs, cache)
|
||||
|
||||
<-ctx.Done()
|
||||
log.Printf("aggregator: shutting down")
|
||||
grpcServer.GracefulStop()
|
||||
|
||||
Reference in New Issue
Block a user