Files

311 lines
8.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package store provides the shared ring-buffer, label-encoding and query
// helpers used by both the collector and the aggregator.
package store
import (
	"container/heap"
	"log"
	"regexp"
	"strings"
	"time"

	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
)
// Ring-buffer dimensions — shared between collector and aggregator.
//
// Two rings are described here: a fine ring with one bucket per minute and a
// coarse ring with one bucket per five minutes. The *RingSize constants give
// the number of buckets in each ring, the *TopK constants bound how many
// entries a single snapshot in that ring retains, and CoarseEvery is the
// number of fine ticks that elapse between two coarse writes.
const (
FineRingSize = 60 // 60 × 1-min buckets → 1 hour
CoarseRingSize = 288 // 288 × 5-min buckets → 24 hours
FineTopK = 50_000 // entries kept per fine snapshot
CoarseTopK = 5_000 // entries kept per coarse snapshot
CoarseEvery = 5 // fine ticks between coarse writes
)
// Tuple4 is the four-dimensional aggregation key.
//
// Every field is a plain string, so a Tuple4 is comparable and can be used
// directly as a map key. EncodeTuple and LabelTuple convert between a Tuple4
// and its NUL-separated label form.
type Tuple4 struct {
Website string // matched against the filter's website and website_regex
Prefix string // matched against the filter's client prefix
URI string // matched against the filter's request URI and uri_regex
Status string // parsed as a decimal integer when a status filter is applied
}
// Entry is a labelled count used in snapshots and query results.
type Entry struct {
Label string // key being counted, e.g. an EncodeTuple string in collector snapshots
Count int64 // count accumulated for Label
}
// Snapshot is one sorted (desc) slice of top-K entries for a time bucket.
type Snapshot struct {
Timestamp time.Time // time associated with the bucket, as supplied by the producer
Entries []Entry // top-K entries, highest Count first
}
// TrendPoint is a (timestamp, total-count) pair for sparkline queries.
type TrendPoint struct {
Timestamp time.Time // time of the data point
Count int64 // total count at Timestamp
}
// RingView is a read-only snapshot of a ring buffer for iteration.
//
// The newest snapshot sits at the slot just before Head; older snapshots are
// found by walking backwards from there.
type RingView struct {
Ring []Snapshot // backing slots of the ring
Head int // index of next write slot (one past the latest entry)
Size int // NOTE(review): presumably the ring capacity (len(Ring)) — confirm against the producers
}
// BucketsForWindow selects the ring that serves window and reports how many
// of its buckets should be summed.
//
// Windows of up to one hour are answered from the fine ring (one bucket per
// minute); the 6 h and 24 h windows are answered from the coarse ring (one
// bucket per five minutes). The returned count is capped at fineFilled or
// coarseFilled respectively, so a ring that is still filling never reports
// more buckets than it holds. An unrecognised window is treated like W5M.
func BucketsForWindow(window pb.Window, fine, coarse RingView, fineFilled, coarseFilled int) (RingView, int) {
	// Start from the fallback (five fine buckets, which is also W5M) and
	// override only what differs for the other windows.
	view, filled, want := fine, fineFilled, 5
	switch window {
	case pb.Window_W1M:
		want = 1
	case pb.Window_W15M:
		want = 15
	case pb.Window_W60M:
		want = 60
	case pb.Window_W6H:
		view, filled, want = coarse, coarseFilled, 72 // 72 × 5-min = 6 h
	case pb.Window_W24H:
		view, filled, want = coarse, coarseFilled, 288 // 288 × 5-min = 24 h
	}
	return view, min(want, filled)
}
// --- label encoding: "website\x00prefix\x00uri\x00status" ---
// EncodeTuple flattens t into a single string of the form
// "website\x00prefix\x00uri\x00status", suitable for use as a map key in
// snapshots. LabelTuple performs the reverse conversion.
func EncodeTuple(t Tuple4) string {
	const nul = "\x00"
	return t.Website + nul + t.Prefix + nul + t.URI + nul + t.Status
}
// LabelTuple is the inverse of EncodeTuple: it splits a NUL-separated
// snapshot label into its four fields.
//
// A label with fewer than four fields yields the zero Tuple4. The label is
// split at most three times, so any further NUL bytes remain part of Status.
func LabelTuple(label string) Tuple4 {
	var t Tuple4
	if fields := splitN(label, '\x00', 4); len(fields) == 4 {
		t.Website, t.Prefix, t.URI, t.Status = fields[0], fields[1], fields[2], fields[3]
	}
	return t
}
// splitN splits s around the byte sep into at most n substrings; the last
// substring holds the unsplit remainder. The result always contains at least
// one element: when sep does not occur in s, or when n is 1 or less, it is
// the single-element slice []string{s}.
func splitN(s string, sep byte, n int) []string {
	if n <= 1 {
		// strings.SplitN would return nil for n == 0 and split without
		// limit for n < 0; callers of this helper expect "no split".
		return []string{s}
	}
	// Build the separator from a byte slice so that bytes >= 0x80 are used
	// verbatim instead of being UTF-8 encoded as a rune.
	return strings.SplitN(s, string([]byte{sep}), n)
}

// indexOf returns the index of the first occurrence of b in s, or -1 if b is
// not present.
func indexOf(s string, b byte) int {
	return strings.IndexByte(s, b)
}
// --- filtering and grouping ---
// CompiledFilter wraps a pb.Filter with pre-compiled regular expressions.
// Use CompileFilter to construct one before a query loop.
//
// A nil WebsiteRe or URIRe is ambiguous on its own: MatchesFilter consults
// Proto to tell "no regex requested" (the field is unconstrained) apart from
// "regex requested but failed to compile" (the field matches nothing).
type CompiledFilter struct {
Proto *pb.Filter // original filter; nil means "match everything"
WebsiteRe *regexp.Regexp // nil if no website_regex or compilation failed
URIRe *regexp.Regexp // nil if no uri_regex or compilation failed
}
// CompileFilter builds a CompiledFilter for f, compiling website_regex and
// uri_regex once so that MatchesFilter does not have to.
//
// The result is never nil, even for a nil f. A regex that fails to compile is
// logged and its compiled field is left nil; because the regex is still set
// on the wrapped filter, MatchesFilter then treats that field as "match
// nothing".
func CompileFilter(f *pb.Filter) *CompiledFilter {
	compiled := &CompiledFilter{Proto: f}
	if f == nil {
		return compiled
	}

	if f.WebsiteRegex != nil {
		pattern := f.GetWebsiteRegex()
		if re, err := regexp.Compile(pattern); err == nil {
			compiled.WebsiteRe = re
		} else {
			log.Printf("store: invalid website_regex %q: %v", pattern, err)
		}
	}

	if f.UriRegex != nil {
		pattern := f.GetUriRegex()
		if re, err := regexp.Compile(pattern); err == nil {
			compiled.URIRe = re
		} else {
			log.Printf("store: invalid uri_regex %q: %v", pattern, err)
		}
	}

	return compiled
}
// MatchesFilter reports whether t satisfies every constraint in f.
//
// A nil f, or an f without a wrapped filter, matches everything. Constraints
// that are unset on the wrapped filter are skipped. A regex that is set on
// the wrapped filter but has no compiled form (compilation failed) rejects
// every tuple.
func MatchesFilter(t Tuple4, f *CompiledFilter) bool {
	if f == nil {
		return true
	}
	p := f.Proto
	if p == nil {
		return true
	}

	// Each case is one way to be rejected. Cases are evaluated top to bottom
	// and the first one that holds decides the result.
	switch {
	case p.Website != nil && p.GetWebsite() != t.Website:
		return false
	case f.WebsiteRe != nil && !f.WebsiteRe.MatchString(t.Website):
		return false
	case p.WebsiteRegex != nil && f.WebsiteRe == nil:
		// website_regex set but failed to compile → match nothing
		return false
	case p.ClientPrefix != nil && p.GetClientPrefix() != t.Prefix:
		return false
	case p.HttpRequestUri != nil && p.GetHttpRequestUri() != t.URI:
		return false
	case f.URIRe != nil && !f.URIRe.MatchString(t.URI):
		return false
	case p.UriRegex != nil && f.URIRe == nil:
		// uri_regex set but failed to compile → match nothing
		return false
	case p.HttpResponse != nil && !matchesStatusOp(t.Status, p.GetHttpResponse(), p.StatusOp):
		return false
	default:
		return true
	}
}
// matchesStatusOp reports whether the numeric value of statusStr satisfies
// "status <op> want".
//
// statusStr must be a non-empty run of ASCII digits whose value fits in an
// int32. Anything else — an empty string, a sign, whitespace, or a value that
// would overflow — returns false for every operator, so an unparseable status
// (such as the empty Status of a zero Tuple4) never matches a status filter.
// An unrecognised op is treated as EQ.
func matchesStatusOp(statusStr string, want int32, op pb.StatusOp) bool {
	if statusStr == "" {
		// Without this guard the empty string would be read as 0 and could
		// satisfy filters such as "<500" or "!=200".
		return false
	}

	const maxInt32 = 1<<31 - 1
	var got int32
	for i := 0; i < len(statusStr); i++ {
		c := statusStr[i]
		if c < '0' || c > '9' {
			return false
		}
		d := int32(c - '0')
		// Reject instead of silently wrapping around on overlong input.
		if got > (maxInt32-d)/10 {
			return false
		}
		got = got*10 + d
	}

	switch op {
	case pb.StatusOp_NE:
		return got != want
	case pb.StatusOp_GT:
		return got > want
	case pb.StatusOp_GE:
		return got >= want
	case pb.StatusOp_LT:
		return got < want
	case pb.StatusOp_LE:
		return got <= want
	default: // EQ
		return got == want
	}
}
// DimensionLabel extracts from t the field that corresponds to the group-by
// dimension g. Any dimension other than CLIENT_PREFIX, REQUEST_URI or
// HTTP_RESPONSE — including WEBSITE and unrecognised values — yields the
// website.
func DimensionLabel(t Tuple4, g pb.GroupBy) string {
	switch g {
	case pb.GroupBy_CLIENT_PREFIX:
		return t.Prefix
	case pb.GroupBy_REQUEST_URI:
		return t.URI
	case pb.GroupBy_HTTP_RESPONSE:
		return t.Status
	}
	// pb.GroupBy_WEBSITE and anything unknown.
	return t.Website
}
// ParseStatusExpr parses a status filter expression into a value and operator.
// Accepted syntax: 200, =200, ==200, !=200, >400, >=400, <500, <=500.
//
// The operator is optional and defaults to EQ. It must be followed
// immediately by one or more ASCII digits; no whitespace or sign is accepted.
//
// Returns ok=false (with value 0 and op EQ) if the expression is empty, has
// no digits, contains a non-digit after the operator, or denotes a number
// that does not fit in an int32.
func ParseStatusExpr(s string) (value int32, op pb.StatusOp, ok bool) {
	if s == "" {
		return 0, pb.StatusOp_EQ, false
	}

	// Split off the operator. Two-character operators are tested first so
	// that ">=" is not read as ">" followed by a stray "=".
	op, digits := pb.StatusOp_EQ, s
	switch {
	case len(s) >= 2 && s[:2] == "!=":
		op, digits = pb.StatusOp_NE, s[2:]
	case len(s) >= 2 && s[:2] == ">=":
		op, digits = pb.StatusOp_GE, s[2:]
	case len(s) >= 2 && s[:2] == "<=":
		op, digits = pb.StatusOp_LE, s[2:]
	case len(s) >= 2 && s[:2] == "==":
		op, digits = pb.StatusOp_EQ, s[2:]
	case s[0] == '>':
		op, digits = pb.StatusOp_GT, s[1:]
	case s[0] == '<':
		op, digits = pb.StatusOp_LT, s[1:]
	case s[0] == '=':
		op, digits = pb.StatusOp_EQ, s[1:]
	}

	if digits == "" {
		return 0, pb.StatusOp_EQ, false
	}

	const maxInt32 = 1<<31 - 1
	var n int32
	for i := 0; i < len(digits); i++ {
		c := digits[i]
		if c < '0' || c > '9' {
			return 0, pb.StatusOp_EQ, false
		}
		d := int32(c - '0')
		// An overlong number is unparseable; do not return a wrapped value.
		if n > (maxInt32-d)/10 {
			return 0, pb.StatusOp_EQ, false
		}
		n = n*10 + d
	}
	return n, op, true
}
// --- heap-based top-K selection ---
// entryHeap is a min-heap of entries ordered by Count. It implements
// container/heap.Interface; the entry with the smallest Count sits at index 0.
type entryHeap []Entry

// Len returns the number of entries on the heap.
func (h entryHeap) Len() int { return len(h) }

// Less orders entries by ascending Count, which makes this a min-heap.
func (h entryHeap) Less(i, j int) bool { return h[j].Count > h[i].Count }

// Swap exchanges the entries at positions i and j.
func (h entryHeap) Swap(i, j int) { h[j], h[i] = h[i], h[j] }

// Push appends x, which must be an Entry. Call it through heap.Push.
func (h *entryHeap) Push(x any) { *h = append(*h, x.(Entry)) }

// Pop removes and returns the last element. Call it through heap.Pop.
func (h *entryHeap) Pop() any {
	last := len(*h) - 1
	item := (*h)[last]
	*h = (*h)[:last]
	return item
}
// TopKFromMap selects the top-k entries from a label→count map, sorted desc.
//
// It returns nil when k <= 0 and otherwise a slice of min(k, len(m)) entries
// ordered by descending Count. Entries with equal counts appear in no
// particular order, and which of several tied entries survive at the cut-off
// is likewise unspecified, because map iteration order is not defined.
func TopKFromMap(m map[string]int64, k int) []Entry {
	if k <= 0 {
		return nil
	}
	// The heap can never hold more than len(m) entries, so do not reserve
	// room for k of them when the map is smaller. This avoids a large
	// throw-away allocation for small maps and an overflow of k+1 for huge k.
	limit := min(k, len(m))

	// Min-heap of the best entries seen so far. It may briefly hold limit+1
	// entries, after which the smallest is evicted.
	h := make(entryHeap, 0, limit+1)
	for label, count := range m {
		heap.Push(&h, Entry{Label: label, Count: count})
		if h.Len() > limit {
			heap.Pop(&h)
		}
	}

	// Popping yields ascending counts; fill from the back to get descending.
	result := make([]Entry, h.Len())
	for i := len(result) - 1; i >= 0; i-- {
		result[i] = heap.Pop(&h).(Entry)
	}
	return result
}
// TopKFromTupleMap turns a Tuple4→count map into a Snapshot stamped with ts.
// Each key is encoded with EncodeTuple and the k largest counts are kept,
// highest first. Used by the collector to snapshot its live map.
func TopKFromTupleMap(m map[Tuple4]int64, k int, ts time.Time) Snapshot {
	encoded := make(map[string]int64, len(m))
	for tuple, count := range m {
		encoded[EncodeTuple(tuple)] = count
	}

	snap := Snapshot{Timestamp: ts}
	snap.Entries = TopKFromMap(encoded, k)
	return snap
}