package main import ( "bufio" "fmt" "net/http" "sort" "strings" "sync" ) // Body-size histogram bucket upper bounds in bytes. const promNumBodyBounds = 7 var promBodyBounds = [promNumBodyBounds]int64{256, 1024, 4096, 16384, 65536, 262144, 1048576} // Request-time histogram bucket upper bounds in seconds (standard Prometheus defaults). const promNumTimeBounds = 11 var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} const promCounterCap = 250_000 // safety cap on {host,method,status} counter entries // promCounterKey is the label set for per-request counters. type promCounterKey struct { Host string Method string Status string } // promBodyEntry holds the body_bytes_sent histogram for one host. type promBodyEntry struct { buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=bound[i]; index N: le=+Inf sum int64 } // promTimeEntry holds the request_time histogram for one host. type promTimeEntry struct { buckets [promNumTimeBounds + 1]int64 sum float64 } // PromStore accumulates Prometheus metrics ingested from log records. // // Ingest must be called from exactly one goroutine (the store's Run goroutine). // ServeHTTP may be called from any number of goroutines concurrently. type PromStore struct { mu sync.Mutex counters map[promCounterKey]int64 body map[string]*promBodyEntry // keyed by host reqTime map[string]*promTimeEntry // keyed by host // per-source_tag rollups (parallel series, not a cross-product with host) sourceCounters map[string]int64 // keyed by source_tag sourceBody map[string]*promBodyEntry // keyed by source_tag // UDP ingest counters — protected by their own atomic-friendly lock. udpMu sync.Mutex udpPacketsReceived int64 // datagrams read off the socket udpLoglinesSuccess int64 // successfully parsed udpLoglinesConsumed int64 // successfully forwarded to the store channel } // NewPromStore returns an empty PromStore ready for use. 
func NewPromStore() *PromStore {
	p := new(PromStore)
	p.counters = make(map[promCounterKey]int64, 1024)
	p.body = make(map[string]*promBodyEntry, 64)
	p.reqTime = make(map[string]*promTimeEntry, 64)
	p.sourceCounters = make(map[string]int64, 32)
	p.sourceBody = make(map[string]*promBodyEntry, 32)
	return p
}

// Ingest folds one log record into every metric family the store tracks.
// It must be called from a single goroutine. It still holds mu for the whole
// update, so a concurrent ServeHTTP snapshot never sees a half-applied record.
func (p *PromStore) Ingest(r LogRecord) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Request counter keyed by {host,method,status}. Keys already present
	// always advance; a new key is admitted only while under promCounterCap.
	key := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status}
	if n, seen := p.counters[key]; seen || len(p.counters) < promCounterCap {
		p.counters[key] = n + 1
	}

	// Per-host body-size histogram.
	observeBody(p.body, r.Website, r.BodyBytesSent)

	// Per-host request-time histogram. Buckets are cumulative: every bound
	// that is >= the observed duration is bumped, plus the trailing +Inf slot.
	te := p.reqTime[r.Website]
	if te == nil {
		te = new(promTimeEntry)
		p.reqTime[r.Website] = te
	}
	for i := range promTimeBounds {
		if r.RequestTime <= promTimeBounds[i] {
			te.buckets[i]++
		}
	}
	te.buckets[len(te.buckets)-1]++ // +Inf
	te.sum += r.RequestTime

	// Per-source_tag rollups.
	p.sourceCounters[r.SourceTag]++
	observeBody(p.sourceBody, r.SourceTag, r.BodyBytesSent)
}

// IncUDPPacket, IncUDPSuccess, and IncUDPConsumed each bump one UDP ingest
// counter under udpMu, so they are safe to call from any goroutine
// (presumably the UDP listener).
func (p *PromStore) IncUDPPacket() { p.udpMu.Lock(); p.udpPacketsReceived++; p.udpMu.Unlock() } func (p *PromStore) IncUDPSuccess() { p.udpMu.Lock(); p.udpLoglinesSuccess++; p.udpMu.Unlock() } func (p *PromStore) IncUDPConsumed() { p.udpMu.Lock(); p.udpLoglinesConsumed++; p.udpMu.Unlock() } func observeBody(m map[string]*promBodyEntry, key string, bytes int64) { e, ok := m[key] if !ok { e = &promBodyEntry{} m[key] = e } for i, bound := range promBodyBounds { if bytes <= bound { e.buckets[i]++ } } e.buckets[promNumBodyBounds]++ // +Inf e.sum += bytes } // ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4). func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) { // Snapshot everything under the lock, then render without holding it. p.mu.Lock() type counterSnap struct { k promCounterKey v int64 } counters := make([]counterSnap, 0, len(p.counters)) for k, v := range p.counters { counters = append(counters, counterSnap{k, v}) } type bodySnap struct { label string e promBodyEntry } bodySnaps := make([]bodySnap, 0, len(p.body)) for h, e := range p.body { bodySnaps = append(bodySnaps, bodySnap{h, *e}) } type timeSnap struct { host string e promTimeEntry } timeSnaps := make([]timeSnap, 0, len(p.reqTime)) for h, e := range p.reqTime { timeSnaps = append(timeSnaps, timeSnap{h, *e}) } type sourceCounterSnap struct { tag string v int64 } sourceCounters := make([]sourceCounterSnap, 0, len(p.sourceCounters)) for t, v := range p.sourceCounters { sourceCounters = append(sourceCounters, sourceCounterSnap{t, v}) } sourceBodySnaps := make([]bodySnap, 0, len(p.sourceBody)) for t, e := range p.sourceBody { sourceBodySnaps = append(sourceBodySnaps, bodySnap{t, *e}) } p.mu.Unlock() p.udpMu.Lock() udpPackets := p.udpPacketsReceived udpSuccess := p.udpLoglinesSuccess udpConsumed := p.udpLoglinesConsumed p.udpMu.Unlock() // Sort for stable, human-readable output. 
sort.Slice(counters, func(i, j int) bool { a, b := counters[i].k, counters[j].k if a.Host != b.Host { return a.Host < b.Host } if a.Method != b.Method { return a.Method < b.Method } return a.Status < b.Status }) sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].label < bodySnaps[j].label }) sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host }) sort.Slice(sourceCounters, func(i, j int) bool { return sourceCounters[i].tag < sourceCounters[j].tag }) sort.Slice(sourceBodySnaps, func(i, j int) bool { return sourceBodySnaps[i].label < sourceBodySnaps[j].label }) w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") bw := bufio.NewWriterSize(w, 256*1024) // nginx_http_requests_total fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.") fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter") for _, c := range counters { fmt.Fprintf(bw, "nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n", c.k.Host, c.k.Method, c.k.Status, c.v) } // nginx_http_response_body_bytes (histogram, labeled by host) fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.") fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram") for _, s := range bodySnaps { writeBodyHistogram(bw, "nginx_http_response_body_bytes", "host", s.label, s.e) } // nginx_http_request_duration_seconds (histogram, labeled by host) fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.") fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram") for _, s := range timeSnaps { for i, bound := range promTimeBounds { fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=%q} %d\n", s.host, formatFloat(bound), s.e.buckets[i]) } fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=\"+Inf\"} %d\n", s.host, s.e.buckets[promNumTimeBounds]) fmt.Fprintf(bw, 
"nginx_http_request_duration_seconds_count{host=%q} %d\n", s.host, s.e.buckets[promNumTimeBounds]) fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%q} %g\n", s.host, s.e.sum) } // nginx_http_requests_by_source_total (counter, labeled by source_tag) fmt.Fprintln(bw, "# HELP nginx_http_requests_by_source_total HTTP requests rolled up by nginx source tag.") fmt.Fprintln(bw, "# TYPE nginx_http_requests_by_source_total counter") for _, c := range sourceCounters { fmt.Fprintf(bw, "nginx_http_requests_by_source_total{source_tag=%q} %d\n", c.tag, c.v) } // nginx_http_response_body_bytes_by_source (histogram, labeled by source_tag) fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes_by_source HTTP response body size distribution by nginx source tag.") fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes_by_source histogram") for _, s := range sourceBodySnaps { writeBodyHistogram(bw, "nginx_http_response_body_bytes_by_source", "source_tag", s.label, s.e) } // UDP ingest counters — lets operators distinguish parse failures // (received - success) from channel-full drops (success - consumed). 
fmt.Fprintln(bw, "# HELP logtail_udp_packets_received_total Datagrams read from the UDP socket.") fmt.Fprintln(bw, "# TYPE logtail_udp_packets_received_total counter") fmt.Fprintf(bw, "logtail_udp_packets_received_total %d\n", udpPackets) fmt.Fprintln(bw, "# HELP logtail_udp_loglines_success_total UDP loglines that parsed successfully.") fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_success_total counter") fmt.Fprintf(bw, "logtail_udp_loglines_success_total %d\n", udpSuccess) fmt.Fprintln(bw, "# HELP logtail_udp_loglines_consumed_total UDP loglines forwarded to the store (not dropped).") fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_consumed_total counter") fmt.Fprintf(bw, "logtail_udp_loglines_consumed_total %d\n", udpConsumed) bw.Flush() } func writeBodyHistogram(bw *bufio.Writer, metric, labelName, labelValue string, e promBodyEntry) { for i, bound := range promBodyBounds { fmt.Fprintf(bw, "%s_bucket{%s=%q,le=%q} %d\n", metric, labelName, labelValue, fmt.Sprintf("%d", bound), e.buckets[i]) } fmt.Fprintf(bw, "%s_bucket{%s=%q,le=\"+Inf\"} %d\n", metric, labelName, labelValue, e.buckets[promNumBodyBounds]) fmt.Fprintf(bw, "%s_count{%s=%q} %d\n", metric, labelName, labelValue, e.buckets[promNumBodyBounds]) fmt.Fprintf(bw, "%s_sum{%s=%q} %d\n", metric, labelName, labelValue, e.sum) } // formatFloat renders a float64 bucket bound without trailing zeros but always // with at least one decimal place, matching Prometheus convention (e.g. "0.5", "10"). func formatFloat(f float64) string { s := fmt.Sprintf("%g", f) if !strings.Contains(s, ".") && !strings.Contains(s, "e") { s += ".0" // ensure it looks like a float, not an integer } return s }