Files
nginx-logtail/cmd/aggregator/backfill.go

166 lines
4.9 KiB
Go

package main
import (
"context"
"io"
"log"
"sort"
"time"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// Backfill calls DumpSnapshots on all collectors concurrently, merges their
// data per timestamp, and loads the result into the cache. It blocks until all
// collectors have responded or the context is cancelled.
// Backfill calls DumpSnapshots on all collectors concurrently, merges their
// data per timestamp, and loads the result into the cache. It blocks until all
// collectors have responded or the context is cancelled.
//
// Failures are best-effort: a collector that errors (or predates the
// DumpSnapshots RPC) contributes nothing, and the merge proceeds with
// whatever the remaining collectors returned.
func Backfill(ctx context.Context, collectorAddrs []string, cache *Cache) {
	// result holds one collector's dump; the zero value signals "no data"
	// (used for failed or unsupported collectors so the receive loop below
	// still gets exactly one message per collector).
	type result struct {
		fine   []st.Snapshot
		coarse []st.Snapshot
	}
	// Buffered to len(collectorAddrs) so every goroutine's single send can
	// complete without blocking, regardless of receiver progress.
	ch := make(chan result, len(collectorAddrs))
	for _, addr := range collectorAddrs {
		addr := addr // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		go func() {
			start := time.Now()
			fine, coarse, err := dumpCollector(ctx, addr)
			if err != nil {
				// Unimplemented means the collector runs an older binary
				// without the DumpSnapshots RPC — expected, log and skip.
				if status.Code(err) == codes.Unimplemented {
					log.Printf("backfill: %s: collector does not support DumpSnapshots (old binary), skipping", addr)
				} else {
					log.Printf("backfill: %s: failed after %s: %v", addr, time.Since(start).Round(time.Millisecond), err)
				}
				// Send the sentinel so the per-collector receive count balances.
				ch <- result{}
				return
			}
			// Entry counts are only for the log line below.
			var fineEntries, coarseEntries int
			for _, s := range fine {
				fineEntries += len(s.Entries)
			}
			for _, s := range coarse {
				coarseEntries += len(s.Entries)
			}
			log.Printf("backfill: %s: %d fine buckets (%d entries) + %d coarse buckets (%d entries) in %s",
				addr, len(fine), fineEntries, len(coarse), coarseEntries, time.Since(start).Round(time.Millisecond))
			ch <- result{fine, coarse}
		}()
	}
	// Collect per-timestamp maps: unix-minute → label → total count.
	// Merging happens on this goroutine only, so the maps need no locking.
	fineByTS := make(map[int64]map[string]int64)
	coarseByTS := make(map[int64]map[string]int64)
	for range collectorAddrs {
		r := <-ch
		mergeDump(r.fine, fineByTS, time.Minute)
		mergeDump(r.coarse, coarseByTS, 5*time.Minute)
	}
	mergeStart := time.Now()
	fine := buildSnapshots(fineByTS, st.FineTopK, st.FineRingSize)
	coarse := buildSnapshots(coarseByTS, st.CoarseTopK, st.CoarseRingSize)
	log.Printf("backfill: merge+topk took %s", time.Since(mergeStart).Round(time.Microsecond))
	if len(fine)+len(coarse) == 0 {
		log.Printf("backfill: no data received from any collector")
		return
	}
	loadStart := time.Now()
	cache.LoadHistorical(fine, coarse)
	log.Printf("backfill: loaded %d fine + %d coarse buckets in %s",
		len(fine), len(coarse), time.Since(loadStart).Round(time.Microsecond))
}
// dumpCollector calls DumpSnapshots on one collector and returns the fine and
// coarse ring snapshots as separate slices.
// dumpCollector calls DumpSnapshots on one collector and returns the fine and
// coarse ring snapshots as separate slices.
//
// On a mid-stream receive error the snapshots accumulated so far are returned
// alongside the error; a clean io.EOF ends the stream with a nil error.
func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot, err error) {
	cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, nil, err
	}
	defer cc.Close()

	stream, err := pb.NewLogtailServiceClient(cc).DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{})
	if err != nil {
		return nil, nil, err
	}
	for {
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// Normal end of stream.
			return fine, coarse, nil
		case recvErr != nil:
			return fine, coarse, recvErr
		}
		snap := st.Snapshot{
			Timestamp: time.Unix(msg.Timestamp, 0),
			Entries:   pbEntriesToStore(msg.Entries),
		}
		// Each message is tagged with the ring it came from.
		if msg.IsCoarse {
			coarse = append(coarse, snap)
		} else {
			fine = append(fine, snap)
		}
	}
}
// mergeDump adds all snapshots from one collector's dump into the per-timestamp
// accumulator map. Multiple collectors' entries for the same timestamp are summed.
// granularity should match the ring bucket size (time.Minute for fine, 5*time.Minute for coarse).
// mergeDump adds all snapshots from one collector's dump into the per-timestamp
// accumulator map. Multiple collectors' entries for the same timestamp are summed.
// granularity should match the ring bucket size (time.Minute for fine, 5*time.Minute for coarse).
func mergeDump(snaps []st.Snapshot, byTS map[int64]map[string]int64, granularity time.Duration) {
	for _, snap := range snaps {
		// Align the snapshot onto its ring bucket boundary.
		key := snap.Timestamp.Truncate(granularity).Unix()
		bucket, ok := byTS[key]
		if !ok {
			bucket = make(map[string]int64, len(snap.Entries))
			byTS[key] = bucket
		}
		for _, e := range snap.Entries {
			bucket[e.Label] += e.Count
		}
	}
}
// buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each
// bucket, and returns a slice capped to ringSize oldest-first snapshots.
// buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each
// bucket, and returns a slice capped to ringSize oldest-first snapshots.
func buildSnapshots(byTS map[int64]map[string]int64, topK, ringSize int) []st.Snapshot {
	if len(byTS) == 0 {
		return nil
	}
	keys := make([]int64, 0, len(byTS))
	for ts := range byTS {
		keys = append(keys, ts)
	}
	sort.Slice(keys, func(a, b int) bool { return keys[a] < keys[b] })
	// Drop the oldest buckets so at most ringSize remain.
	if extra := len(keys) - ringSize; extra > 0 {
		keys = keys[extra:]
	}
	out := make([]st.Snapshot, 0, len(keys))
	for _, ts := range keys {
		out = append(out, st.Snapshot{
			Timestamp: time.Unix(ts, 0),
			Entries:   st.TopKFromMap(byTS[ts], topK),
		})
	}
	return out
}
// pbEntriesToStore converts protobuf top-N entries into their store
// representation, preserving order.
func pbEntriesToStore(entries []*pb.TopNEntry) []st.Entry {
	out := make([]st.Entry, 0, len(entries))
	for _, e := range entries {
		out = append(out, st.Entry{Label: e.Label, Count: e.Count})
	}
	return out
}