Files
nginx-logtail/cmd/collector/store.go

196 lines
4.9 KiB
Go

package main
import (
"sync"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
st "git.ipng.ch/ipng/nginx-logtail/internal/store"
)
// liveMapCap bounds the number of distinct keys held in the live map. Once the
// cap is reached, ingest ignores records whose key is not already present.
const liveMapCap = 100_000 // hard cap on live map entries
// Store holds the live map and both ring buffers.
//
// Records are counted into the live map by ingest; rotate periodically turns
// the live map into a snapshot, appends it to the fine ring, and every
// st.CoarseEvery rotations merges recent fine snapshots into the coarse ring.
type Store struct {
// source is the label supplied to NewStore.
source string
// live map — written only by the Run goroutine; no locking needed on writes
live map[st.Tuple4]int64
// liveLen counts the distinct keys in live; ingest increments it and rotate
// resets it to zero together with the map.
liveLen int
// ring buffers — protected by mu for reads
// (rotate takes the write lock; QueryTopN and QueryTrend take the read lock)
mu sync.RWMutex
// fineRing is a circular buffer of per-rotation snapshots. fineHead is the
// slot the next rotation writes to; fineFilled is the number of slots
// written so far, saturating at st.FineRingSize.
fineRing [st.FineRingSize]st.Snapshot
fineHead int
fineFilled int
// coarseRing, coarseHead and coarseFilled mirror the fine ring fields for the
// merged snapshots produced every st.CoarseEvery rotations.
coarseRing [st.CoarseRingSize]st.Snapshot
coarseHead int
coarseFilled int
// fineTick counts rotations since the last coarse merge; rotate resets it to
// zero when it reaches st.CoarseEvery.
fineTick int
// fan-out to StreamSnapshots subscribers
// subMu guards subs, the set of channels that broadcast sends snapshots to.
subMu sync.Mutex
subs map[chan st.Snapshot]struct{}
}
// NewStore returns an empty Store labelled with source. The live map is
// pre-sized to liveMapCap entries and the subscriber set starts out empty;
// the ring buffers and their cursors keep their zero values.
func NewStore(source string) *Store {
	s := new(Store)
	s.source = source
	s.live = make(map[st.Tuple4]int64, liveMapCap)
	s.subs = make(map[chan st.Snapshot]struct{})
	return s
}
// ingest counts one log record in the live map, keyed by its
// (website, client prefix, URI, status) tuple.
//
// A key that is already present is always incremented. A key seen for the
// first time is only admitted while fewer than liveMapCap distinct keys are
// held; beyond that the record is dropped without being counted.
//
// Must only be called from the Run goroutine.
func (s *Store) ingest(r LogRecord) {
	tuple := st.Tuple4{
		Website: r.Website,
		Prefix:  r.ClientPrefix,
		URI:     r.URI,
		Status:  r.Status,
	}

	if _, known := s.live[tuple]; known {
		s.live[tuple]++
		return
	}

	// New key: admit it only if the live map still has room.
	if s.liveLen >= liveMapCap {
		return
	}
	s.liveLen++
	s.live[tuple] = 1
}
// rotate closes the current collection interval. It builds a snapshot of the
// live map, stores it in the next fine ring slot and, on every
// st.CoarseEvery-th call, also stores a merge of the recent fine snapshots in
// the coarse ring. Afterwards the live map is replaced with a fresh empty one
// and the fine snapshot is sent to all subscribers.
//
// Run calls this on each tick of its one-minute ticker, passing the tick time
// as now.
func (s *Store) rotate(now time.Time) {
	// Built before taking the lock: only the Run goroutine touches s.live.
	snap := st.TopKFromTupleMap(s.live, st.FineTopK, now)

	s.mu.Lock()

	s.fineRing[s.fineHead] = snap
	s.fineHead = (s.fineHead + 1) % st.FineRingSize
	s.fineFilled = min(s.fineFilled+1, st.FineRingSize)

	s.fineTick++
	if s.fineTick >= st.CoarseEvery {
		s.fineTick = 0
		// mergeFineBuckets reads the fine ring, which already includes snap.
		s.coarseRing[s.coarseHead] = s.mergeFineBuckets(now)
		s.coarseHead = (s.coarseHead + 1) % st.CoarseRingSize
		s.coarseFilled = min(s.coarseFilled+1, st.CoarseRingSize)
	}

	s.mu.Unlock()

	// Start the next interval with an empty live map.
	s.liveLen = 0
	s.live = make(map[st.Tuple4]int64, liveMapCap)

	s.broadcast(snap)
}
// mergeFineBuckets sums the per-label counts of the most recent fine
// snapshots (at most st.CoarseEvery of them, fewer if the fine ring holds
// fewer) and returns the result as a snapshot stamped with now, with entries
// selected by st.TopKFromMap using st.CoarseTopK.
//
// It reads the fine ring without locking; rotate calls it while holding s.mu.
func (s *Store) mergeFineBuckets(now time.Time) st.Snapshot {
	span := min(st.CoarseEvery, s.fineFilled)
	totals := make(map[string]int64)

	// Walk backwards from the most recently written slot.
	for back := 1; back <= span; back++ {
		slot := (s.fineHead - back + st.FineRingSize) % st.FineRingSize
		for _, entry := range s.fineRing[slot].Entries {
			totals[entry.Label] += entry.Count
		}
	}

	top := st.TopKFromMap(totals, st.CoarseTopK)
	return st.Snapshot{Timestamp: now, Entries: top}
}
// QueryTopN answers a TopN request from the ring buffers.
//
// st.BucketsForWindow picks the ring and the number of buckets to scan for
// window. Every entry in those buckets that passes filter has its count added
// to the group chosen by groupBy, and the grouped totals are reduced with
// st.TopKFromMap(…, n). The read lock on s.mu is held for the whole query.
func (s *Store) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry {
	compiled := st.CompileFilter(filter)

	s.mu.RLock()
	defer s.mu.RUnlock()

	view, span := st.BucketsForWindow(window, s.fineView(), s.coarseView(), s.fineFilled, s.coarseFilled)

	totals := make(map[string]int64)
	// Walk backwards from the most recently written bucket.
	for back := 1; back <= span; back++ {
		slot := (view.Head - back + view.Size) % view.Size
		for _, entry := range view.Ring[slot].Entries {
			if tuple := st.LabelTuple(entry.Label); st.MatchesFilter(tuple, compiled) {
				totals[st.DimensionLabel(tuple, groupBy)] += entry.Count
			}
		}
	}
	return st.TopKFromMap(totals, n)
}
// QueryTrend answers a Trend request from the ring buffers.
//
// It returns one point per bucket selected by st.BucketsForWindow, ordered
// from the oldest selected bucket to the newest. Each point carries the
// bucket's timestamp and the summed count of its entries that pass filter.
// The read lock on s.mu is held for the whole query.
func (s *Store) QueryTrend(filter *pb.Filter, window pb.Window) []st.TrendPoint {
	compiled := st.CompileFilter(filter)

	s.mu.RLock()
	defer s.mu.RUnlock()

	view, span := st.BucketsForWindow(window, s.fineView(), s.coarseView(), s.fineFilled, s.coarseFilled)

	trend := make([]st.TrendPoint, span)
	// Offset of the oldest selected bucket, kept non-negative for the modulo.
	oldest := view.Head - span + view.Size
	for i := range trend {
		bucket := view.Ring[(oldest+i)%view.Size]

		var sum int64
		for _, entry := range bucket.Entries {
			if st.MatchesFilter(st.LabelTuple(entry.Label), compiled) {
				sum += entry.Count
			}
		}
		trend[i] = st.TrendPoint{Timestamp: bucket.Timestamp, Count: sum}
	}
	return trend
}
// fineView returns a st.RingView over a copy of the fine ring, together with
// the current head index and the ring size. It does no locking itself; the
// query methods call it while holding s.mu.
func (s *Store) fineView() st.RingView {
	view := st.RingView{
		Ring: make([]st.Snapshot, st.FineRingSize),
		Head: s.fineHead,
		Size: st.FineRingSize,
	}
	copy(view.Ring, s.fineRing[:])
	return view
}
// coarseView returns a st.RingView over a copy of the coarse ring, together
// with the current head index and the ring size. It does no locking itself;
// the query methods call it while holding s.mu.
func (s *Store) coarseView() st.RingView {
	view := st.RingView{
		Ring: make([]st.Snapshot, st.CoarseRingSize),
		Head: s.coarseHead,
		Size: st.CoarseRingSize,
	}
	copy(view.Ring, s.coarseRing[:])
	return view
}
// Subscribe registers and returns a new channel (buffer size 4) on which the
// caller receives the snapshots passed to broadcast. Pass the channel to
// Unsubscribe to deregister it.
func (s *Store) Subscribe() chan st.Snapshot {
	s.subMu.Lock()
	defer s.subMu.Unlock()

	sub := make(chan st.Snapshot, 4)
	s.subs[sub] = struct{}{}
	return sub
}
// Unsubscribe removes ch from the subscriber set and closes it.
//
// The channel is closed only if it was actually registered, which makes the
// call idempotent: a repeated Unsubscribe, or one for a channel that did not
// come from Subscribe, is a no-op instead of panicking on a double close or
// closing a channel this Store does not own.
//
// The close happens after the channel has been removed under subMu, so
// broadcast (which sends while holding subMu) can never send on a closed
// channel.
func (s *Store) Unsubscribe(ch chan st.Snapshot) {
	s.subMu.Lock()
	_, registered := s.subs[ch]
	delete(s.subs, ch)
	s.subMu.Unlock()

	if registered {
		close(ch)
	}
}
// broadcast offers snap to every subscribed channel without blocking: a
// subscriber whose buffer is full simply misses this snapshot. subMu is held
// for the duration of the fan-out.
func (s *Store) broadcast(snap st.Snapshot) {
	// offer performs a single non-blocking send.
	offer := func(sub chan st.Snapshot) {
		select {
		case sub <- snap:
		default:
		}
	}

	s.subMu.Lock()
	defer s.subMu.Unlock()
	for sub := range s.subs {
		offer(sub)
	}
}
// Run is the Store's single writer loop. It ingests every record received on
// ch and calls rotate on each tick of a one-minute ticker, passing the tick
// time. It returns as soon as ch is closed; no rotation is performed on exit.
func (s *Store) Run(ch <-chan LogRecord) {
	minute := time.NewTicker(time.Minute)
	defer minute.Stop()

	for {
		select {
		case now := <-minute.C:
			s.rotate(now)
		case rec, open := <-ch:
			if !open {
				return
			}
			s.ingest(rec)
		}
	}
}