Files

287 lines
10 KiB
Go

package main
import (
"bufio"
"fmt"
"net/http"
"sort"
"strings"
"sync"
)
// Histogram layout and cardinality limits. Each promNum* constant sizes the
// matching bounds array below and must equal its number of entries.
const (
	promNumBodyBounds = 7  // entries in promBodyBounds
	promNumTimeBounds = 11 // entries in promTimeBounds

	// promCounterCap is a safety cap on {host,method,status} counter entries.
	promCounterCap = 250_000
)

var (
	// promBodyBounds lists the body-size histogram bucket upper bounds in
	// bytes: 256 B, then 1 KiB through 1 MiB in powers of four.
	promBodyBounds = [promNumBodyBounds]int64{256, 1 << 10, 4 << 10, 16 << 10, 64 << 10, 256 << 10, 1 << 20}

	// promTimeBounds lists the request-time histogram bucket upper bounds in
	// seconds (standard Prometheus defaults).
	promTimeBounds = [promNumTimeBounds]float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
)
// promCounterKey identifies one per-request counter series. It is a plain
// comparable struct so it can be used directly as a map key.
type promCounterKey struct {
	Host, Method, Status string
}
// promBodyEntry holds one body_bytes_sent histogram. PromStore keeps one per
// host and, separately, one per source_tag.
type promBodyEntry struct {
// buckets are cumulative: observeBody increments every bucket whose bound is
// >= the observed value, so each slot already holds the "le" count.
buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=promBodyBounds[i]; index N: le=+Inf (total observations)
// sum is the running total of all observed body sizes in bytes.
sum int64
}
// promTimeEntry holds the request_time histogram for one host.
type promTimeEntry struct {
// buckets are cumulative: indices 0..N-1 count observations <= promTimeBounds[i];
// index N is the +Inf bucket, i.e. the total number of observations.
buckets [promNumTimeBounds + 1]int64
// sum is the running total of all observed request times.
sum float64
}
// PromStore accumulates Prometheus metrics ingested from log records.
//
// Ingest must be called from exactly one goroutine (the store's Run goroutine).
// ServeHTTP may be called from any number of goroutines concurrently.
//
// Two independent mutexes are used: mu guards the request metrics maps and
// udpMu guards the UDP ingest counters, so bumping a UDP counter does not
// contend with Ingest or with a scrape snapshotting the maps.
type PromStore struct {
// mu guards counters, body, reqTime, sourceCounters and sourceBody.
mu sync.Mutex
counters map[promCounterKey]int64 // request counts per {host,method,status}; new keys are capped at promCounterCap
body map[string]*promBodyEntry // keyed by host
reqTime map[string]*promTimeEntry // keyed by host
// per-source_tag rollups (parallel series, not a cross-product with host)
sourceCounters map[string]int64 // keyed by source_tag
sourceBody map[string]*promBodyEntry // keyed by source_tag
// UDP ingest counters — guarded by udpMu rather than mu.
udpMu sync.Mutex
udpPacketsReceived int64 // datagrams read off the socket
udpLoglinesSuccess int64 // successfully parsed
udpLoglinesConsumed int64 // successfully forwarded to the store channel
}
// NewPromStore allocates a PromStore whose metric maps are all initialised
// (with modest size hints), so the result can be handed straight to Ingest
// and ServeHTTP.
func NewPromStore() *PromStore {
	store := new(PromStore)

	// Per-host series.
	store.counters = make(map[promCounterKey]int64, 1024)
	store.body = make(map[string]*promBodyEntry, 64)
	store.reqTime = make(map[string]*promTimeEntry, 64)

	// Per-source_tag rollups.
	store.sourceCounters = make(map[string]int64, 32)
	store.sourceBody = make(map[string]*promBodyEntry, 32)

	return store
}
// Ingest folds a single log record into every metric family the store keeps:
// the {host,method,status} request counter, the per-host body-size and
// request-time histograms, and the per-source_tag rollups.
//
// It is meant to be called from a single goroutine; the mutex held for the
// duration of the call serialises it against ServeHTTP's snapshot.
func (p *PromStore) Ingest(r LogRecord) {
	p.mu.Lock()

	// Request counter. A series that already exists always advances; a label
	// set seen for the first time is admitted only while the table is still
	// below promCounterCap, otherwise the record is not counted here.
	key := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status}
	switch n, seen := p.counters[key]; {
	case seen:
		p.counters[key] = n + 1
	case len(p.counters) < promCounterCap:
		p.counters[key] = 1
	}

	// Body-size histogram, keyed by host only.
	observeBody(p.body, r.Website, r.BodyBytesSent)

	// Request-time histogram, keyed by host only. Buckets are cumulative, so
	// every bucket whose bound covers the observation is bumped.
	hist, found := p.reqTime[r.Website]
	if !found {
		hist = new(promTimeEntry)
		p.reqTime[r.Website] = hist
	}
	for i := 0; i < promNumTimeBounds; i++ {
		if r.RequestTime <= promTimeBounds[i] {
			hist.buckets[i]++
		}
	}
	hist.buckets[promNumTimeBounds]++ // +Inf
	hist.sum += r.RequestTime

	// Rollups keyed by source_tag.
	p.sourceCounters[r.SourceTag]++
	observeBody(p.sourceBody, r.SourceTag, r.BodyBytesSent)

	p.mu.Unlock()
}
// The IncUDP* methods each add one to a UDP ingest counter under udpMu.
// They are called from the UDP listener goroutine.

// IncUDPPacket counts one datagram read off the socket.
func (p *PromStore) IncUDPPacket() {
	p.udpMu.Lock()
	defer p.udpMu.Unlock()
	p.udpPacketsReceived++
}

// IncUDPSuccess counts one logline that parsed successfully.
func (p *PromStore) IncUDPSuccess() {
	p.udpMu.Lock()
	defer p.udpMu.Unlock()
	p.udpLoglinesSuccess++
}

// IncUDPConsumed counts one logline that was forwarded to the store channel.
func (p *PromStore) IncUDPConsumed() {
	p.udpMu.Lock()
	defer p.udpMu.Unlock()
	p.udpLoglinesConsumed++
}
// observeBody records one body-size observation in the histogram stored
// under key in m, creating that histogram on first use. The caller is
// responsible for any locking around m.
func observeBody(m map[string]*promBodyEntry, key string, bytes int64) {
	entry, known := m[key]
	if !known {
		entry = new(promBodyEntry)
		m[key] = entry
	}

	entry.sum += bytes
	entry.buckets[promNumBodyBounds]++ // +Inf

	// Buckets are cumulative: bump every bucket whose upper bound covers the
	// observation, skipping the ones that are too small.
	for i := range promBodyBounds {
		if bytes > promBodyBounds[i] {
			continue
		}
		entry.buckets[i]++
	}
}
// promLabelEscaper applies the only three escapes the Prometheus text
// exposition format defines inside a quoted label value: backslash,
// double quote and line feed.
var promLabelEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// promQuoteLabel returns v as a double-quoted Prometheus label value.
//
// Label values here come from log data, so they may hold arbitrary bytes.
// Go's %q is not a safe substitute: it emits Go-only escapes such as \t,
// \x1b or \u00a0 that the exposition format does not define. Invalid UTF-8
// is replaced with U+FFFD because label values must be valid UTF-8.
func promQuoteLabel(v string) string {
	return `"` + promLabelEscaper.Replace(strings.ToValidUTF8(v, "\uFFFD")) + `"`
}

// ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4).
//
// The store is copied under its locks first and rendered afterwards, so a
// slow client never blocks Ingest or the UDP counters. Series are sorted by
// label value to keep the output stable between scrapes.
func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	// Snapshot everything under the lock, then render without holding it.
	p.mu.Lock()
	type counterSnap struct {
		k promCounterKey
		v int64
	}
	counters := make([]counterSnap, 0, len(p.counters))
	for k, v := range p.counters {
		counters = append(counters, counterSnap{k, v})
	}
	type bodySnap struct {
		label string
		e     promBodyEntry
	}
	bodySnaps := make([]bodySnap, 0, len(p.body))
	for h, e := range p.body {
		bodySnaps = append(bodySnaps, bodySnap{h, *e}) // copy the entry, not the pointer
	}
	type timeSnap struct {
		host string
		e    promTimeEntry
	}
	timeSnaps := make([]timeSnap, 0, len(p.reqTime))
	for h, e := range p.reqTime {
		timeSnaps = append(timeSnaps, timeSnap{h, *e})
	}
	type sourceCounterSnap struct {
		tag string
		v   int64
	}
	sourceCounters := make([]sourceCounterSnap, 0, len(p.sourceCounters))
	for t, v := range p.sourceCounters {
		sourceCounters = append(sourceCounters, sourceCounterSnap{t, v})
	}
	sourceBodySnaps := make([]bodySnap, 0, len(p.sourceBody))
	for t, e := range p.sourceBody {
		sourceBodySnaps = append(sourceBodySnaps, bodySnap{t, *e})
	}
	p.mu.Unlock()

	p.udpMu.Lock()
	udpPackets := p.udpPacketsReceived
	udpSuccess := p.udpLoglinesSuccess
	udpConsumed := p.udpLoglinesConsumed
	p.udpMu.Unlock()

	// Sort for stable, human-readable output.
	sort.Slice(counters, func(i, j int) bool {
		a, b := counters[i].k, counters[j].k
		if a.Host != b.Host {
			return a.Host < b.Host
		}
		if a.Method != b.Method {
			return a.Method < b.Method
		}
		return a.Status < b.Status
	})
	sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].label < bodySnaps[j].label })
	sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host })
	sort.Slice(sourceCounters, func(i, j int) bool { return sourceCounters[i].tag < sourceCounters[j].tag })
	sort.Slice(sourceBodySnaps, func(i, j int) bool { return sourceBodySnaps[i].label < sourceBodySnaps[j].label })

	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
	bw := bufio.NewWriterSize(w, 256*1024)

	// nginx_http_requests_total
	fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.")
	fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter")
	for _, c := range counters {
		fmt.Fprintf(bw, "nginx_http_requests_total{host=%s,method=%s,status=%s} %d\n",
			promQuoteLabel(c.k.Host), promQuoteLabel(c.k.Method), promQuoteLabel(c.k.Status), c.v)
	}

	// nginx_http_response_body_bytes (histogram, labeled by host)
	fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.")
	fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram")
	for _, s := range bodySnaps {
		writeBodyHistogram(bw, "nginx_http_response_body_bytes", "host", s.label, s.e)
	}

	// nginx_http_request_duration_seconds (histogram, labeled by host)
	fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.")
	fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram")
	for _, s := range timeSnaps {
		host := promQuoteLabel(s.host)
		for i, bound := range promTimeBounds {
			fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%s,le=%q} %d\n",
				host, formatFloat(bound), s.e.buckets[i])
		}
		// The +Inf bucket doubles as the observation count.
		total := s.e.buckets[promNumTimeBounds]
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%s,le=\"+Inf\"} %d\n", host, total)
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_count{host=%s} %d\n", host, total)
		fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%s} %g\n", host, s.e.sum)
	}

	// nginx_http_requests_by_source_total (counter, labeled by source_tag)
	fmt.Fprintln(bw, "# HELP nginx_http_requests_by_source_total HTTP requests rolled up by nginx source tag.")
	fmt.Fprintln(bw, "# TYPE nginx_http_requests_by_source_total counter")
	for _, c := range sourceCounters {
		fmt.Fprintf(bw, "nginx_http_requests_by_source_total{source_tag=%s} %d\n", promQuoteLabel(c.tag), c.v)
	}

	// nginx_http_response_body_bytes_by_source (histogram, labeled by source_tag)
	fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes_by_source HTTP response body size distribution by nginx source tag.")
	fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes_by_source histogram")
	for _, s := range sourceBodySnaps {
		writeBodyHistogram(bw, "nginx_http_response_body_bytes_by_source", "source_tag", s.label, s.e)
	}

	// UDP ingest counters — lets operators distinguish parse failures
	// (received - success) from channel-full drops (success - consumed).
	fmt.Fprintln(bw, "# HELP logtail_udp_packets_received_total Datagrams read from the UDP socket.")
	fmt.Fprintln(bw, "# TYPE logtail_udp_packets_received_total counter")
	fmt.Fprintf(bw, "logtail_udp_packets_received_total %d\n", udpPackets)
	fmt.Fprintln(bw, "# HELP logtail_udp_loglines_success_total UDP loglines that parsed successfully.")
	fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_success_total counter")
	fmt.Fprintf(bw, "logtail_udp_loglines_success_total %d\n", udpSuccess)
	fmt.Fprintln(bw, "# HELP logtail_udp_loglines_consumed_total UDP loglines forwarded to the store (not dropped).")
	fmt.Fprintln(bw, "# TYPE logtail_udp_loglines_consumed_total counter")
	fmt.Fprintf(bw, "logtail_udp_loglines_consumed_total %d\n", udpConsumed)

	// Write errors are deliberately not handled: by the time they surface the
	// response has already started, so there is no way to report them.
	bw.Flush()
}

// writeBodyHistogram writes the _bucket, _count and _sum series of one
// body-size histogram. metric is the metric family name, and labelName /
// labelValue identify the series (labelValue is escaped here; labelName is
// written verbatim and must already be a valid label name).
func writeBodyHistogram(bw *bufio.Writer, metric, labelName, labelValue string, e promBodyEntry) {
	quoted := promQuoteLabel(labelValue)
	for i, bound := range promBodyBounds {
		fmt.Fprintf(bw, "%s_bucket{%s=%s,le=\"%d\"} %d\n",
			metric, labelName, quoted, bound, e.buckets[i])
	}
	// The +Inf bucket doubles as the observation count.
	total := e.buckets[promNumBodyBounds]
	fmt.Fprintf(bw, "%s_bucket{%s=%s,le=\"+Inf\"} %d\n", metric, labelName, quoted, total)
	fmt.Fprintf(bw, "%s_count{%s=%s} %d\n", metric, labelName, quoted, total)
	fmt.Fprintf(bw, "%s_sum{%s=%s} %d\n", metric, labelName, quoted, e.sum)
}
// formatFloat renders a float64 bucket bound for use as an "le" label value.
//
// The value is formatted with %g (shortest representation, no trailing
// zeros). Integral values get ".0" appended so they still read as floats
// (e.g. 0.5 -> "0.5", 10 -> "10.0"). Strings that already carry a decimal
// point or an exponent are returned unchanged, as are the non-finite
// spellings "+Inf", "-Inf" and "NaN", which must not gain a ".0" suffix.
func formatFloat(f float64) string {
	s := fmt.Sprintf("%g", f)
	// '.' and 'e' mark an existing fraction/exponent; 'I' and 'N' only occur
	// in "+Inf", "-Inf" and "NaN".
	if strings.ContainsAny(s, ".eIN") {
		return s
	}
	return s + ".0" // ensure it looks like a float, not an integer
}