package main

import (
	"context"
	"io"
	"log"
	"sort"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"

	st "git.ipng.ch/ipng/nginx-logtail/internal/store"
	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)

// Backfill calls DumpSnapshots on all collectors concurrently, merges their
// data per timestamp, and loads the result into the cache. It blocks until all
// collectors have responded or the context is cancelled.
func Backfill(ctx context.Context, collectorAddrs []string, cache *Cache) {
	type result struct {
		fine   []st.Snapshot
		coarse []st.Snapshot
	}
	// Buffered so every goroutine can send exactly once without blocking,
	// even if the receive loop below has not reached it yet.
	results := make(chan result, len(collectorAddrs))

	for _, addr := range collectorAddrs {
		addr := addr // pre-Go 1.22 loop-variable capture
		go func() {
			began := time.Now()
			fine, coarse, err := dumpCollector(ctx, addr)
			if err != nil {
				// Old collectors simply lack the RPC; anything else is a real failure.
				if status.Code(err) == codes.Unimplemented {
					log.Printf("backfill: %s: collector does not support DumpSnapshots (old binary), skipping", addr)
				} else {
					log.Printf("backfill: %s: failed after %s: %v", addr, time.Since(began).Round(time.Millisecond), err)
				}
				results <- result{}
				return
			}
			log.Printf("backfill: %s: %d fine buckets (%d entries) + %d coarse buckets (%d entries) in %s",
				addr, len(fine), countEntries(fine), len(coarse), countEntries(coarse),
				time.Since(began).Round(time.Millisecond))
			results <- result{fine, coarse}
		}()
	}

	// Collect per-timestamp maps: unix-minute → label → total count.
	fineByTS := make(map[int64]map[string]int64)
	coarseByTS := make(map[int64]map[string]int64)
	for range collectorAddrs {
		res := <-results
		mergeDump(res.fine, fineByTS)
		mergeDump(res.coarse, coarseByTS)
	}

	mergeStart := time.Now()
	fine := buildSnapshots(fineByTS, st.FineTopK, st.FineRingSize)
	coarse := buildSnapshots(coarseByTS, st.CoarseTopK, st.CoarseRingSize)
	log.Printf("backfill: merge+topk took %s", time.Since(mergeStart).Round(time.Microsecond))

	if len(fine) == 0 && len(coarse) == 0 {
		log.Printf("backfill: no data received from any collector")
		return
	}

	loadStart := time.Now()
	cache.LoadHistorical(fine, coarse)
	log.Printf("backfill: loaded %d fine + %d coarse buckets in %s",
		len(fine), len(coarse), time.Since(loadStart).Round(time.Microsecond))
}

// countEntries returns the total number of entries across all snapshots.
func countEntries(snaps []st.Snapshot) int {
	total := 0
	for _, s := range snaps {
		total += len(s.Entries)
	}
	return total
}

// dumpCollector calls DumpSnapshots on one collector and returns the fine and
// coarse ring snapshots as separate slices.
func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot, err error) {
	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, nil, err
	}
	defer conn.Close()

	stream, err := pb.NewLogtailServiceClient(conn).DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{})
	if err != nil {
		return nil, nil, err
	}

	// Drain the server stream; io.EOF marks a clean end of dump.
	for {
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return fine, coarse, nil
		case recvErr != nil:
			// Return whatever was received so far alongside the error.
			return fine, coarse, recvErr
		}
		snap := st.Snapshot{
			Timestamp: time.Unix(msg.Timestamp, 0),
			Entries:   pbEntriesToStore(msg.Entries),
		}
		if msg.IsCoarse {
			coarse = append(coarse, snap)
		} else {
			fine = append(fine, snap)
		}
	}
}

// mergeDump adds all snapshots from one collector's dump into the per-timestamp
// accumulator map. Multiple collectors' entries for the same timestamp are summed.
func mergeDump(snaps []st.Snapshot, byTS map[int64]map[string]int64) { for _, snap := range snaps { ts := snap.Timestamp.Unix() m := byTS[ts] if m == nil { m = make(map[string]int64, len(snap.Entries)) byTS[ts] = m } for _, e := range snap.Entries { m[e.Label] += e.Count } } } // buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each // bucket, and returns a slice capped to ringSize oldest-first snapshots. func buildSnapshots(byTS map[int64]map[string]int64, topK, ringSize int) []st.Snapshot { if len(byTS) == 0 { return nil } timestamps := make([]int64, 0, len(byTS)) for ts := range byTS { timestamps = append(timestamps, ts) } sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) // Keep only the most recent ringSize buckets. if len(timestamps) > ringSize { timestamps = timestamps[len(timestamps)-ringSize:] } snaps := make([]st.Snapshot, len(timestamps)) for i, ts := range timestamps { snaps[i] = st.Snapshot{ Timestamp: time.Unix(ts, 0), Entries: st.TopKFromMap(byTS[ts], topK), } } return snaps } func pbEntriesToStore(entries []*pb.TopNEntry) []st.Entry { out := make([]st.Entry, len(entries)) for i, e := range entries { out[i] = st.Entry{Label: e.Label, Count: e.Count} } return out }