package main

import (
	"sync"

	st "git.ipng.ch/ipng/nginx-logtail/internal/store"
	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)

// Merger maintains per-collector maps and a running merged map using a delta
// strategy: on each new snapshot from collector X, subtract X's old entries
// and add the new ones. This is O(snapshot_size) rather than
// O(N_collectors × snapshot_size).
//
// Every method acquires mu for its whole duration. Use NewMerger to create a
// Merger: the zero value has nil maps, and Apply would panic writing to them.
type Merger struct {
	mu           sync.Mutex
	perCollector map[string]map[string]int64 // source → label → count
	merged       map[string]int64            // label → total across all collectors
}

// NewMerger returns an empty Merger with both of its maps allocated.
func NewMerger() *Merger {
	return &Merger{
		perCollector: make(map[string]map[string]int64),
		merged:       make(map[string]int64),
	}
}

// subtractLocked removes the contribution currently recorded for addr from
// the merged totals, dropping every label whose total falls to zero or below.
// It does not modify perCollector. The caller must hold m.mu.
func (m *Merger) subtractLocked(addr string) {
	// Ranging over a missing (nil) inner map is a no-op, so an unknown addr
	// needs no special case.
	for label, count := range m.perCollector[addr] {
		total := m.merged[label] - count
		if total <= 0 {
			delete(m.merged, label)
			continue
		}
		m.merged[label] = total
	}
}

// Apply incorporates a snapshot from a collector, replacing that collector's
// previous contribution in the merged map. The collector is identified by
// snap.Source. Counts of entries that share a label within one snapshot are
// summed.
//
// A nil snapshot is ignored and leaves the collector's existing contribution
// in place; use Zero to remove a collector.
func (m *Merger) Apply(snap *pb.Snapshot) {
	if snap == nil {
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	addr := snap.Source

	// Subtract the old contribution.
	m.subtractLocked(addr)

	// Build the new per-collector map and add to merged.
	newMap := make(map[string]int64, len(snap.Entries))
	for _, e := range snap.Entries {
		newMap[e.Label] += e.Count
		m.merged[e.Label] += e.Count
	}
	m.perCollector[addr] = newMap
}

// Zero removes a degraded collector's entire contribution from the merged map
// and forgets its per-collector map. Calling it for an unknown addr is a no-op.
func (m *Merger) Zero(addr string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.subtractLocked(addr)
	delete(m.perCollector, addr)
}

// TopK returns the top-k entries from the current merged view, as computed by
// st.TopKFromMap while the lock is held.
func (m *Merger) TopK(k int) []st.Entry {
	m.mu.Lock()
	defer m.mu.Unlock()

	return st.TopKFromMap(m.merged, k)
}