Compare commits


11 Commits

Author        SHA1        Message                                                                Date
Pim van Pelt  0ecca06069  Typo fix                                                               2026-03-27 01:21:46 +01:00
Pim van Pelt  862d043376  Refactor docs                                                          2026-03-27 01:20:19 +01:00
Pim van Pelt  50fc94b87d  Add screenshot                                                         2026-03-27 01:11:27 +01:00
Pim van Pelt  6c3a28c9ce  Pin fine/coarse mergeDump to the 1min/5min boundary, fixes sparkline   2026-03-25 18:14:20 +01:00
Pim van Pelt  d0fb34160f  Clean up logging, update website filter hint                           2026-03-25 07:43:27 +01:00
Pim van Pelt  456452afc4  Allow !~= for website/uri                                              2026-03-25 07:32:39 +01:00
Pim van Pelt  eddb04ced4  Add aggregator backfill, pulling fine+coarse buckets from collectors   2026-03-25 07:06:03 +01:00
Pim van Pelt  d2dcd88c4b  Add Docker setup, add environment vars for each flag                   2026-03-25 06:41:13 +01:00
Pim van Pelt  129246d85b  Always render all sources from default target                          2026-03-25 05:35:40 +01:00
Pim van Pelt  b3103834d0  Add Grafana dashboard                                                  2026-03-24 04:50:24 +01:00
Pim van Pelt  91eb56a64c  Add prometheus exporter on :9100                                       2026-03-24 03:49:22 +01:00
33 changed files with 3676 additions and 1883 deletions

Dockerfile (new file, 14 lines)

@@ -0,0 +1,14 @@
FROM golang:1.24-alpine AS builder
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/collector ./cmd/collector && \
    CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/aggregator ./cmd/aggregator && \
    CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/frontend ./cmd/frontend && \
    CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/cli ./cmd/cli
FROM scratch
COPY --from=builder /out/ /usr/local/bin/

README.md (427 lines)

@@ -1,4 +1,4 @@
-PREAMBLE
+## PREAMBLE
 Although this computer program has a permissive license (AP2.0), if you came here looking to ask
 questions, you're better off just moving on :) This program is shared AS-IS and really without any
@@ -8,417 +8,24 @@ Code.
 You have been warned :)
-SPECIFICATION
-This project contains four programs:
-1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of
-`{website, client_prefix, http_request_uri, http_response}` counts across all files. It answers
-TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via server-streaming.
-Runs on each nginx machine in the cluster. No UI — gRPC interface only.
-2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data
-into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what
-is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows
-trending information useful for DDoS detection. Runs on a central machine.
-3) An **HTTP frontend** companion to the aggregator that renders a drilldown dashboard. Operators
-can restrict by `http_response=429`, then by `website=www.example.com`, and so on. Works with
-either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline
-SVG sparklines and meta-refresh.
-4) A **CLI** for shell-based debugging. Sends `topn`, `trend`, and `stream` queries to any
-collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable
-tables or newline-delimited JSON.
-Programs are written in Go. No CGO, no external runtime dependencies.
----
-DESIGN
+![nginx-logtail frontend](docs/frontend.png)
+## What is this?
+This project consists of four components:
+1. A log collector that tails NGINX (or Apache) logs of a certain format, and aggregates
+   information per website, client address, status, and so on. It buckets these into windows
+   of 1min, 5min, 15min, 60min, 6hrs and 24hrs. It exposes this on a gRPC endpoint.
+1. An aggregator that can scrape any number of collectors into a merged regional (or global)
+   view. The aggregator exposes the same gRPC endpoint as the collectors.
+1. A Frontend that allows querying this data structure very quickly.
+1. A CLI that allows querying this data as well, returning JSON for further processing.
+It's written in Go, and is meant to deploy collectors on any number of webservers, with central
+aggregation and frontend logic. It's released under the [APACHE](LICENSE) license. It can be run
+either as `systemd` units, or in Docker, or any combination of the two.
+See the [User Guide](docs/USERGUIDE.md) or [DETAILS](docs/DETAILS.md) for more information.
+The [docs/](docs/) directory contains extensive planning information which shows how Claude
+Code single-shot implemented the whole system in March 2026.
## Directory Layout
```
nginx-logtail/
├── proto/
│ ├── logtail.proto # shared protobuf definitions
│ └── logtailpb/
│ ├── logtail.pb.go # generated: messages, enums
│ └── logtail_grpc.pb.go # generated: service stubs
├── internal/
│ └── store/
│ └── store.go # shared types: Tuple6, Entry, Snapshot, ring helpers
└── cmd/
├── collector/
│ ├── main.go
│ ├── tailer.go # MultiTailer: tail N files via one shared fsnotify watcher
│ ├── parser.go # tab-separated logtail log_format parser (~50 ns/line)
│ ├── store.go # bounded top-K in-memory store + tiered ring buffers
│ └── server.go # gRPC server: TopN, Trend, StreamSnapshots
├── aggregator/
│ ├── main.go
│ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff
│ ├── merger.go # delta-merge: O(snapshot_size) per update
│ ├── cache.go # tick-based ring buffer cache served to clients
│ ├── registry.go # TargetRegistry: addr→name map updated from snapshot sources
│ └── server.go # gRPC server (same surface as collector)
├── frontend/
│ ├── main.go
│ ├── handler.go # URL param parsing, concurrent TopN+Trend, template exec
│ ├── filter.go # ParseFilterExpr / FilterExprString mini filter language
│ ├── client.go # gRPC dial helper
│ ├── sparkline.go # TrendPoints → inline SVG polyline
│ ├── format.go # fmtCount (space thousands separator)
│ └── templates/
│ ├── base.html # outer HTML shell, inline CSS, meta-refresh
│ └── index.html # window tabs, group-by tabs, breadcrumb, table, footer
└── cli/
├── main.go # subcommand dispatch and usage
├── flags.go # shared flags, parseTargets, buildFilter, parseWindow
├── client.go # gRPC dial helper
├── format.go # printTable, fmtCount, fmtTime, targetHeader
├── cmd_topn.go # topn: concurrent fan-out, table + JSON output
├── cmd_trend.go # trend: concurrent fan-out, table + JSON output
├── cmd_stream.go # stream: multiplexed streams, auto-reconnect
└── cmd_targets.go # targets: list collectors known to the endpoint
```
## Data Model
The core unit is a **count keyed by six dimensions**:
| Field | Description | Example |
|-------------------|------------------------------------------------------|-------------------|
| `website` | nginx `$host` | `www.example.com` |
| `client_prefix` | client IP truncated to /24 IPv4 or /48 IPv6 | `1.2.3.0/24` |
| `http_request_uri`| `$request_uri` path only — query string stripped | `/api/v1/search` |
| `http_response` | HTTP status code | `429` |
| `is_tor` | whether the client IP is a TOR exit node | `1` |
| `asn` | client AS number (MaxMind GeoIP2, 32-bit int) | `8298` |
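The six-dimension key might be modeled as a comparable Go struct usable directly as a map key; the field names and types below are inferred from the table, not taken from the actual `store.go`:

```go
package main

import "fmt"

// Tuple6 is an illustrative sketch of the six-dimension count key.
// All fields are comparable, so the struct can key a Go map directly.
type Tuple6 struct {
	Website      string
	ClientPrefix string
	RequestURI   string
	Status       uint16
	IsTor        bool
	ASN          uint32
}

func main() {
	counts := map[Tuple6]int64{}
	k := Tuple6{"www.example.com", "1.2.3.0/24", "/api/v1/search", 429, false, 8298}
	counts[k]++
	counts[k]++ // identical tuples collapse into one counter
	fmt.Println(counts[k])
}
```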
## Time Windows & Tiered Ring Buffers
Two ring buffers at different resolutions cover all query windows up to 24 hours:
| Tier | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger |
|--------|-------------|---------|--------------|--------|---------------------|
| Fine | 1 min | 60 | 50 000 | 1 h | every minute |
| Coarse | 5 min | 288 | 5 000 | 24 h | every 5 fine ticks |
Supported query windows and which tier they read from:
| Window | Tier | Buckets summed |
|--------|--------|----------------|
| 1 min | fine | last 1 |
| 5 min | fine | last 5 |
| 15 min | fine | last 15 |
| 60 min | fine | all 60 |
| 6 h | coarse | last 72 |
| 24 h | coarse | all 288 |
Every minute: snapshot live map → top-50K → append to fine ring, reset live map.
Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.
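The window→tier mapping above can be sketched as follows; `bucketsFor` and the string window names are hypothetical — the real store presumably switches on the `Window` proto enum:

```go
package main

import "fmt"

const (
	fineRingSize   = 60  // 60 × 1-min buckets = 1 h
	coarseRingSize = 288 // 288 × 5-min buckets = 24 h
)

// bucketsFor returns which tier a query window reads from and how many
// trailing buckets it sums, per the table above.
func bucketsFor(window string) (tier string, buckets int) {
	switch window {
	case "1m":
		return "fine", 1
	case "5m":
		return "fine", 5
	case "15m":
		return "fine", 15
	case "60m":
		return "fine", fineRingSize
	case "6h":
		return "coarse", 72 // 72 × 5 min = 6 h
	case "24h":
		return "coarse", coarseRingSize
	}
	return "", 0
}

func main() {
	tier, n := bucketsFor("6h")
	fmt.Println(tier, n)
}
```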
## Memory Budget (Collector, target ≤ 1 GB)
Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 1 B is_tor + 4 B asn + 8 B count + ~80 B Go map
overhead ≈ **~191 bytes per entry**.
| Structure | Entries | Size |
|-------------------------|-------------|-------------|
| Live map (capped) | 100 000 | ~19 MB |
| Fine ring (60 × 1-min) | 60 × 50 000 | ~558 MB |
| Coarse ring (288 × 5-min)| 288 × 5 000| ~268 MB |
| **Total** | | **~845 MB** |
The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are
accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded
regardless of attack cardinality.
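The hard-cap behavior can be sketched as below (illustrative only: the key is simplified to a string and the capacity lowered — the real live map is keyed by `Tuple6` and capped at 100 K):

```go
package main

import "fmt"

// liveMap is a sketch of the hard-capped live map described above.
type liveMap struct {
	m   map[string]int64
	cap int
}

func newLiveMap(capacity int) *liveMap {
	return &liveMap{m: make(map[string]int64, capacity), cap: capacity}
}

// add increments an existing key, or inserts a new one only while below the
// cap; new keys are dropped once full, so memory stays bounded under attack.
func (l *liveMap) add(key string, n int64) bool {
	if _, ok := l.m[key]; !ok && len(l.m) >= l.cap {
		return false // dropped: map full and key is new
	}
	l.m[key] += n
	return true
}

func main() {
	l := newLiveMap(2)
	l.add("a", 1)
	l.add("b", 1)
	fmt.Println(l.add("c", 1)) // new key while full: dropped
	fmt.Println(l.add("a", 1)) // existing key: still accepted
}
```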
## Future Work — ClickHouse Export (post-MVP)
> **Do not implement until the end-to-end MVP is running.**
The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d
historical views. Schema sketch:
```sql
CREATE TABLE logtail (
ts DateTime,
website LowCardinality(String),
client_prefix String,
request_uri LowCardinality(String),
status UInt16,
count UInt64
) ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, website, status, client_prefix, request_uri);
```
The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use
the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive
and does not change any existing interface.
## Protobuf API (`proto/logtail.proto`)
```protobuf
enum TorFilter { TOR_ANY = 0; TOR_YES = 1; TOR_NO = 2; }
enum StatusOp { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; }
message Filter {
optional string website = 1;
optional string client_prefix = 2;
optional string http_request_uri = 3;
optional int32 http_response = 4;
StatusOp status_op = 5; // comparison operator for http_response
optional string website_regex = 6; // RE2 regex against website
optional string uri_regex = 7; // RE2 regex against http_request_uri
TorFilter tor = 8; // TOR_ANY (default) / TOR_YES / TOR_NO
optional int32 asn_number = 9; // filter by client ASN
StatusOp asn_op = 10; // comparison operator for asn_number
}
enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; ASN_NUMBER = 4; }
enum Window { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }
message TopNRequest { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
message TopNEntry { string label = 1; int64 count = 2; }
message TopNResponse { repeated TopNEntry entries = 1; string source = 2; }
// Trend: one total count per minute (or 5-min) bucket, for sparklines
message TrendRequest { Filter filter = 1; Window window = 4; }
message TrendPoint { int64 timestamp_unix = 1; int64 count = 2; }
message TrendResponse { repeated TrendPoint points = 1; string source = 2; }
// Streaming: collector pushes a fine snapshot after every minute rotation
message SnapshotRequest {}
message Snapshot {
string source = 1;
int64 timestamp = 2;
repeated TopNEntry entries = 3; // full top-50K for this bucket
}
// Target discovery: list the collectors behind the queried endpoint
message ListTargetsRequest {}
message TargetInfo {
string name = 1; // display name (--source value from the collector)
string addr = 2; // gRPC address; empty string means "this endpoint itself"
}
message ListTargetsResponse { repeated TargetInfo targets = 1; }
service LogtailService {
rpc TopN(TopNRequest) returns (TopNResponse);
rpc Trend(TrendRequest) returns (TrendResponse);
rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse);
}
// Both collector and aggregator implement LogtailService.
// The aggregator's StreamSnapshots re-streams the merged view.
// ListTargets: aggregator returns all configured collectors; collector returns itself.
```
## Program 1 — Collector
### tailer.go
- **`MultiTailer`**: one shared `fsnotify.Watcher` for all files regardless of count — avoids
the inotify instance limit when tailing hundreds of files.
- On `WRITE` event: read all new lines from that file's `bufio.Reader`.
- On `RENAME`/`REMOVE` (logrotate): drain old fd to EOF, close, start retry-open goroutine with
exponential backoff. Sends the new `*os.File` back via a channel to keep map access single-threaded.
- Emits `LogRecord` structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak).
- Accepts paths via `--logs` (comma-separated or glob) and `--logs-file` (one path/glob per line).
### parser.go
- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting:
```nginx
log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn';
```
| # | Field | Used for |
|---|-------------------|------------------|
| 0 | `$host` | website |
| 1 | `$remote_addr` | client_prefix |
| 2 | `$msec` | (discarded) |
| 3 | `$request_method` | (discarded) |
| 4 | `$request_uri` | http_request_uri |
| 5 | `$status` | http_response |
| 6 | `$body_bytes_sent`| (discarded) |
| 7 | `$request_time` | (discarded) |
| 8 | `$is_tor` | is_tor |
| 9 | `$asn` | asn |
- `strings.SplitN(line, "\t", 10)` — ~50 ns/line. No regex.
- `$request_uri`: query string discarded at first `?`.
- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags.
- `$is_tor`: `1` if the client IP is a TOR exit node, `0` otherwise. Field is optional — lines
with exactly 8 fields (old format) are accepted and default to `is_tor=false`.
- `$asn`: client AS number as a decimal integer (from MaxMind GeoIP2). Field is optional —
lines without it default to `asn=0`.
- Lines with fewer than 8 fields are silently skipped.
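A minimal sketch of this parse path, using the stdlib `net/netip` for prefix truncation; `parseLine` and `record` are hypothetical names, and the real `parser.go` differs in detail:

```go
package main

import (
	"fmt"
	"net/netip"
	"strings"
)

type record struct {
	Website, Prefix, URI, Status string
}

// parseLine splits one tab-separated logtail line per the field table above.
func parseLine(line string) (record, bool) {
	f := strings.SplitN(line, "\t", 10)
	if len(f) < 8 { // fewer than 8 fields: silently skipped
		return record{}, false
	}
	uri := f[4]
	if i := strings.IndexByte(uri, '?'); i >= 0 {
		uri = uri[:i] // query string discarded at first '?'
	}
	return record{Website: f[0], Prefix: truncPrefix(f[1]), URI: uri, Status: f[5]}, true
}

// truncPrefix truncates the client address to /24 (IPv4) or /48 (IPv6).
func truncPrefix(s string) string {
	addr, err := netip.ParseAddr(s)
	if err != nil {
		return s
	}
	bits := 24
	if addr.Is6() {
		bits = 48
	}
	p, _ := addr.Prefix(bits)
	return p.String()
}

func main() {
	r, _ := parseLine("www.example.com\t1.2.3.4\t1711300000.123\tGET\t/api/v1/search?q=x\t429\t512\t0.012\t0\t8298")
	fmt.Println(r.Website, r.Prefix, r.URI, r.Status)
}
```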
### store.go
- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on
the hot path. At 10 K lines/s the goroutine uses <1% CPU.
- Live map: `map[Tuple6]int64`, hard-capped at 100 K entries (new keys dropped when full).
- **Minute ticker**: heap-selects top-50K entries, writes snapshot to fine ring, resets live map.
- Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring.
- **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N.
- **Trend query**: per-bucket filtered sum, returns one `TrendPoint` per bucket.
- **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming.
### server.go
- gRPC server on configurable port (default `:9090`).
- `TopN` and `Trend`: unary, answered from the ring buffer under RLock.
- `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker.
## Program 2 — Aggregator
### subscriber.go
- One goroutine per collector. Dials, calls `StreamSnapshots`, forwards each `Snapshot` to the
merger.
- Reconnects with exponential backoff (100 ms → doubles → cap 30 s).
- After 3 consecutive failures: calls `merger.Zero(addr)` to remove that collector's contribution
from the merged view (prevents stale counts accumulating during outages).
- Resets failure count on first successful `Recv`; logs recovery.
### merger.go
- **Delta strategy**: on each new snapshot from collector X, subtract X's previous entries from
`merged`, add the new entries, store new map. O(snapshot_size) per update — not
O(N_collectors × snapshot_size).
- `Zero(addr)`: subtracts the collector's last-known contribution and deletes its entry — called
when a collector is marked degraded.
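The delta strategy can be sketched as below (labels stand in for full `Tuple6` keys; `merger`, `apply`, and `zero` are illustrative names, not the real API):

```go
package main

import "fmt"

// merger keeps the global merged counts plus each collector's last snapshot,
// so an update only touches that collector's own entries.
type merger struct {
	merged map[string]int64            // global label → count
	last   map[string]map[string]int64 // collector addr → its last snapshot
}

func newMerger() *merger {
	return &merger{merged: map[string]int64{}, last: map[string]map[string]int64{}}
}

// apply subtracts collector addr's previous contribution, adds the new one,
// and remembers the new snapshot: O(snapshot_size) per update.
func (m *merger) apply(addr string, snap map[string]int64) {
	for label, c := range m.last[addr] {
		m.merged[label] -= c
		if m.merged[label] == 0 {
			delete(m.merged, label)
		}
	}
	for label, c := range snap {
		m.merged[label] += c
	}
	m.last[addr] = snap
}

// zero removes a degraded collector's last-known contribution entirely.
func (m *merger) zero(addr string) {
	m.apply(addr, nil)
	delete(m.last, addr)
}

func main() {
	m := newMerger()
	m.apply("c1", map[string]int64{"example.com": 10})
	m.apply("c2", map[string]int64{"example.com": 5})
	m.apply("c1", map[string]int64{"example.com": 7}) // delta: -10 +7
	fmt.Println(m.merged["example.com"])
	m.zero("c2")
	fmt.Println(m.merged["example.com"])
}
```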
### cache.go
- **Tick-based rotation** (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned
to the same 1-minute cadence as collectors regardless of how many collectors are connected.
- Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick.
- `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store.
### registry.go
- **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the
configured collector addresses; display names are updated from the `source` field of the first
snapshot received from each collector.
- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses.
### server.go
- Implements `LogtailService` backed by the cache (not live fan-out).
- `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or
monitoring system.
- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with
their display names and gRPC addresses.
## Program 3 — Frontend
### handler.go
- All filter state in the **URL query string**: `w` (window), `by` (group_by), `f_website`,
`f_prefix`, `f_uri`, `f_status`, `f_website_re`, `f_uri_re`, `f_is_tor`, `f_asn`, `n`, `target`. No
server-side session — URLs are shareable and bookmarkable; multiple operators see independent views.
- **Filter expression box**: a `q=` parameter carries a mini filter language
(`status>=400 AND website~=gouda.* AND uri~=^/api/`). On submission the handler parses it
via `ParseFilterExpr` and redirects to the canonical URL with individual `f_*` params; `q=`
never appears in the final URL. Parse errors re-render the current page with an inline message.
- **Status expressions**: `f_status` accepts `200`, `!=200`, `>=400`, `<500`, etc. — parsed by
`store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf.
- **ASN expressions**: `f_asn` accepts the same expression syntax (`12345`, `!=65000`, `>=1000`,
`<64512`, etc.) — also parsed by `store.ParseStatusExpr`, stored as `(asn_number, AsnOp)` in the
filter protobuf.
- **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request
into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing.
- `TopN`, `Trend`, and `ListTargets` RPCs issued **concurrently** (all with a 5 s deadline); page
renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets` failure
hides the source picker — both are non-fatal.
- **Source picker**: `ListTargets` result drives a `source:` tab row. Clicking a collector tab
sets `target=` to that collector's address, querying it directly. The "all" tab resets to the
  default aggregator. The picker is hidden when `ListTargets` returns at most one target (direct
  collector mode).
- **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through
`website → prefix → uri → status → asn → website` (cycles).
- **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting.
- **`target=` override**: per-request gRPC endpoint override for comparing sources.
- Error pages render at HTTP 502 with the window/group-by tabs still functional.
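The shared six-operator expression grammar used for `f_status` and `f_asn` might look roughly like this — a sketch only, with `parseExpr` as a hypothetical name; the real `store.ParseStatusExpr` presumably returns the `StatusOp` proto enum rather than strings:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseExpr splits an expression like ">=400" into an operator name and a
// value; a bare number defaults to equality.
func parseExpr(s string) (op string, val int, err error) {
	op = "EQ"
	// Two-character operators must be checked before their one-character prefixes.
	for _, p := range []struct{ prefix, op string }{
		{"!=", "NE"}, {">=", "GE"}, {"<=", "LE"}, {">", "GT"}, {"<", "LT"},
	} {
		if strings.HasPrefix(s, p.prefix) {
			op, s = p.op, s[len(p.prefix):]
			break
		}
	}
	val, err = strconv.Atoi(strings.TrimSpace(s))
	return op, val, err
}

func main() {
	op, v, _ := parseExpr(">=400")
	fmt.Println(op, v)
}
```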
### sparkline.go
- `renderSparkline([]*pb.TrendPoint) template.HTML` — fixed `viewBox="0 0 300 60"` SVG,
Y-scaled to max count, rendered as `<polyline>`. Returns `""` for fewer than 2 points or
all-zero data.
### templates/
- `base.html`: outer shell, inline CSS (~40 lines), conditional `<meta http-equiv="refresh">`.
- `index.html`: window tabs, group-by tabs, filter breadcrumb with `×` remove links, sparkline,
TopN table with `<meter>` bars (% relative to rank-1), footer with source and refresh info.
- No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx.
## Program 4 — CLI
### Subcommands
```
logtail-cli topn [flags] ranked label → count table (exits after one response)
logtail-cli trend [flags] per-bucket time series (exits after one response)
logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C, auto-reconnects)
logtail-cli targets [flags] list targets known to the queried endpoint
```
### Flags
**Shared** (all subcommands):
| Flag | Default | Description |
|---------------|------------------|----------------------------------------------------------|
| `--target` | `localhost:9090` | Comma-separated `host:port` list; fan-out to all |
| `--json` | false | Emit newline-delimited JSON instead of a table |
| `--website` | — | Filter: website |
| `--prefix` | — | Filter: client prefix |
| `--uri` | — | Filter: request URI |
| `--status` | — | Filter: HTTP status expression (`200`, `!=200`, `>=400`, `<500`, …) |
| `--website-re`| — | Filter: RE2 regex against website |
| `--uri-re` | — | Filter: RE2 regex against request URI |
| `--is-tor` | — | Filter: TOR traffic (`1` or `!=0` = TOR only; `0` or `!=1` = non-TOR only) |
| `--asn` | — | Filter: ASN expression (`12345`, `!=65000`, `>=1000`, `<64512`, …) |
**`topn` only**: `--n 10`, `--window 5m`, `--group-by website`
**`trend` only**: `--window 5m`
### Multi-target fan-out
`--target` accepts a comma-separated list. All targets are queried concurrently; results are
printed in order with a per-target header. Single-target output omits the header for clean
pipe-to-`jq` use.
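The concurrent fan-out can be sketched as below, with a stand-in `query` function in place of the real gRPC TopN call:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOut queries every target concurrently and collects results keyed by target.
func fanOut(targets []string, query func(target string) string) map[string]string {
	var mu sync.Mutex
	results := make(map[string]string, len(targets))
	var wg sync.WaitGroup
	for _, t := range targets {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			r := query(t)
			mu.Lock()
			results[t] = r
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	return results
}

func main() {
	targets := []string{"c1:9090", "c2:9090"}
	res := fanOut(targets, func(t string) string { return "ok from " + t })
	// Print in stable order with a per-target header; the header is omitted
	// when only one target is given, per the behavior described above.
	sort.Strings(targets)
	for _, t := range targets {
		if len(targets) > 1 {
			fmt.Printf("=== %s ===\n", t)
		}
		fmt.Println(res[t])
	}
}
```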
### Output
Default: human-readable table with space-separated thousands (`18 432`).
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded).
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
with a non-zero code on gRPC error.
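The space-as-thousands-separator formatting could be implemented roughly as follows (an assumption — the real `fmtCount` in `format.go` may differ):

```go
package main

import (
	"fmt"
	"strconv"
)

// fmtCount inserts a space every three digits from the right, e.g. 18432 → "18 432".
func fmtCount(n int64) string {
	s := strconv.FormatInt(n, 10)
	neg := false
	if s[0] == '-' {
		neg, s = true, s[1:]
	}
	for i := len(s) - 3; i > 0; i -= 3 {
		s = s[:i] + " " + s[i:]
	}
	if neg {
		s = "-" + s
	}
	return s
}

func main() {
	fmt.Println(fmtCount(18432))
}
```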
## Key Design Decisions
| Decision | Rationale |
|----------|-----------|
| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path |
| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion |
| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass |
| Push-based streaming (collector → aggregator) | Aggregator cache always fresh; query latency is cache-read only |
| Delta merge in aggregator | O(snapshot_size) per update, not O(N_collectors × size) |
| Tick-based cache rotation in aggregator | Ring stays on the same 1-min cadence regardless of collector count |
| Degraded collector zeroing | Stale counts from failed collectors don't accumulate in the merged view |
| Same `LogtailService` for collector and aggregator | CLI and frontend work with either; no special-casing |
| `internal/store` shared package | ring-buffer, `Tuple6` encoding, and filter logic shared between collector and aggregator |
| Filter state in URL, not session cookie | Multiple concurrent operators; shareable/bookmarkable URLs |
| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack |
| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) |
| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later |
| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl |
| CLI default: human-readable table | Operator-friendly by default; `--json` opt-in for scripting |
| CLI multi-target fan-out | Compare a collector vs. aggregator, or two collectors, in one command |
| CLI uses stdlib `flag`, no framework | Four subcommands don't justify a dependency |
| Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at query boundary, encoded as `(int32, StatusOp)` in proto |
| ASN filter reuses `StatusOp` and `ParseStatusExpr` | Same 6-operator grammar as status; no duplicate enum or parser needed |
| Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency |
| Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable |
| `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory |

cmd/aggregator/backfill.go (new file, 165 lines)

@@ -0,0 +1,165 @@
package main

import (
	"context"
	"io"
	"log"
	"sort"
	"time"

	st "git.ipng.ch/ipng/nginx-logtail/internal/store"
	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

// Backfill calls DumpSnapshots on all collectors concurrently, merges their
// data per timestamp, and loads the result into the cache. It blocks until all
// collectors have responded or the context is cancelled.
func Backfill(ctx context.Context, collectorAddrs []string, cache *Cache) {
	type result struct {
		fine   []st.Snapshot
		coarse []st.Snapshot
	}
	ch := make(chan result, len(collectorAddrs))
	for _, addr := range collectorAddrs {
		addr := addr
		go func() {
			start := time.Now()
			fine, coarse, err := dumpCollector(ctx, addr)
			if err != nil {
				if status.Code(err) == codes.Unimplemented {
					log.Printf("backfill: %s: collector does not support DumpSnapshots (old binary), skipping", addr)
				} else {
					log.Printf("backfill: %s: failed after %s: %v", addr, time.Since(start).Round(time.Millisecond), err)
				}
				ch <- result{}
				return
			}
			var fineEntries, coarseEntries int
			for _, s := range fine {
				fineEntries += len(s.Entries)
			}
			for _, s := range coarse {
				coarseEntries += len(s.Entries)
			}
			log.Printf("backfill: %s: %d fine buckets (%d entries) + %d coarse buckets (%d entries) in %s",
				addr, len(fine), fineEntries, len(coarse), coarseEntries, time.Since(start).Round(time.Millisecond))
			ch <- result{fine, coarse}
		}()
	}

	// Collect per-timestamp maps: unix-minute → label → total count.
	fineByTS := make(map[int64]map[string]int64)
	coarseByTS := make(map[int64]map[string]int64)
	for range collectorAddrs {
		r := <-ch
		mergeDump(r.fine, fineByTS, time.Minute)
		mergeDump(r.coarse, coarseByTS, 5*time.Minute)
	}

	mergeStart := time.Now()
	fine := buildSnapshots(fineByTS, st.FineTopK, st.FineRingSize)
	coarse := buildSnapshots(coarseByTS, st.CoarseTopK, st.CoarseRingSize)
	log.Printf("backfill: merge+topk took %s", time.Since(mergeStart).Round(time.Microsecond))
	if len(fine)+len(coarse) == 0 {
		log.Printf("backfill: no data received from any collector")
		return
	}

	loadStart := time.Now()
	cache.LoadHistorical(fine, coarse)
	log.Printf("backfill: loaded %d fine + %d coarse buckets in %s",
		len(fine), len(coarse), time.Since(loadStart).Round(time.Microsecond))
}

// dumpCollector calls DumpSnapshots on one collector and returns the fine and
// coarse ring snapshots as separate slices.
func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot, err error) {
	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, nil, err
	}
	defer conn.Close()
	client := pb.NewLogtailServiceClient(conn)
	stream, err := client.DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{})
	if err != nil {
		return nil, nil, err
	}
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return fine, coarse, nil
		}
		if err != nil {
			return fine, coarse, err
		}
		snap := st.Snapshot{
			Timestamp: time.Unix(msg.Timestamp, 0),
			Entries:   pbEntriesToStore(msg.Entries),
		}
		if msg.IsCoarse {
			coarse = append(coarse, snap)
		} else {
			fine = append(fine, snap)
		}
	}
}

// mergeDump adds all snapshots from one collector's dump into the per-timestamp
// accumulator map. Multiple collectors' entries for the same timestamp are summed.
// granularity should match the ring bucket size (time.Minute for fine, 5*time.Minute for coarse).
func mergeDump(snaps []st.Snapshot, byTS map[int64]map[string]int64, granularity time.Duration) {
	for _, snap := range snaps {
		ts := snap.Timestamp.Truncate(granularity).Unix()
		m := byTS[ts]
		if m == nil {
			m = make(map[string]int64, len(snap.Entries))
			byTS[ts] = m
		}
		for _, e := range snap.Entries {
			m[e.Label] += e.Count
		}
	}
}

// buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each
// bucket, and returns a slice capped to ringSize oldest-first snapshots.
func buildSnapshots(byTS map[int64]map[string]int64, topK, ringSize int) []st.Snapshot {
	if len(byTS) == 0 {
		return nil
	}
	timestamps := make([]int64, 0, len(byTS))
	for ts := range byTS {
		timestamps = append(timestamps, ts)
	}
	sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
	// Keep only the most recent ringSize buckets.
	if len(timestamps) > ringSize {
		timestamps = timestamps[len(timestamps)-ringSize:]
	}
	snaps := make([]st.Snapshot, len(timestamps))
	for i, ts := range timestamps {
		snaps[i] = st.Snapshot{
			Timestamp: time.Unix(ts, 0),
			Entries:   st.TopKFromMap(byTS[ts], topK),
		}
	}
	return snaps
}

func pbEntriesToStore(entries []*pb.TopNEntry) []st.Entry {
	out := make([]st.Entry, len(entries))
	for i, e := range entries {
		out[i] = st.Entry{Label: e.Label, Count: e.Count}
	}
	return out
}

View File

@@ -90,6 +90,26 @@ func (c *Cache) mergeFineBuckets(now time.Time) st.Snapshot {
 	return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(merged, st.CoarseTopK)}
 }
+
+// LoadHistorical pre-populates the ring buffers from backfill data before live
+// streaming begins. fine and coarse must be sorted oldest-first; each slice
+// must not exceed the respective ring size. Called once at startup, before Run.
+func (c *Cache) LoadHistorical(fine, coarse []st.Snapshot) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for i, snap := range fine {
+		c.fineRing[i] = snap
+	}
+	c.fineFilled = len(fine)
+	c.fineHead = len(fine) % st.FineRingSize
+	for i, snap := range coarse {
+		c.coarseRing[i] = snap
+	}
+	c.coarseFilled = len(coarse)
+	c.coarseHead = len(coarse) % st.CoarseRingSize
+}
+
 // QueryTopN answers a TopN request from the ring buffers.
 func (c *Cache) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry {
 	cf := st.CompileFilter(filter)

View File

@@ -15,13 +15,13 @@ import (
 )
 
 func main() {
-	listen := flag.String("listen", ":9091", "gRPC listen address")
-	collectors := flag.String("collectors", "", "comma-separated collector host:port addresses")
-	source := flag.String("source", hostname(), "name for this aggregator in responses")
+	listen := flag.String("listen", envOr("AGGREGATOR_LISTEN", ":9091"), "gRPC listen address (env: AGGREGATOR_LISTEN)")
+	collectors := flag.String("collectors", envOr("AGGREGATOR_COLLECTORS", ""), "comma-separated collector host:port addresses (env: AGGREGATOR_COLLECTORS)")
+	source := flag.String("source", envOr("AGGREGATOR_SOURCE", hostname()), "name for this aggregator in responses (env: AGGREGATOR_SOURCE, default: hostname)")
 	flag.Parse()
 	if *collectors == "" {
-		log.Fatal("aggregator: --collectors is required")
+		log.Fatal("aggregator: --collectors / AGGREGATOR_COLLECTORS is required")
 	}
 	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
@@ -38,13 +38,6 @@ func main() {
 	merger := NewMerger()
 	cache := NewCache(merger, *source)
 	registry := NewTargetRegistry(collectorAddrs)
-	go cache.Run(ctx)
-	for _, addr := range collectorAddrs {
-		sub := NewCollectorSub(addr, merger, registry)
-		go sub.Run(ctx)
-		log.Printf("aggregator: subscribing to collector %s", addr)
-	}
 	lis, err := net.Listen("tcp", *listen)
 	if err != nil {
@@ -60,6 +53,17 @@ func main() {
 		}
 	}()
+	go cache.Run(ctx)
+	for _, addr := range collectorAddrs {
+		sub := NewCollectorSub(addr, merger, registry)
+		go sub.Run(ctx)
+		log.Printf("aggregator: subscribing to collector %s", addr)
+	}
+	log.Printf("aggregator: backfilling from %d collector(s)", len(collectorAddrs))
+	go Backfill(ctx, collectorAddrs, cache)
 	<-ctx.Done()
 	log.Printf("aggregator: shutting down")
 	grpcServer.GracefulStop()
@@ -72,3 +76,10 @@ func hostname() string {
 	}
 	return h
 }
+
+func envOr(key, def string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return def
+}


@@ -12,16 +12,18 @@ import (
 // sharedFlags holds the flags common to every subcommand.
 type sharedFlags struct {
 	targets []string
 	jsonOut bool
 	website string
 	prefix  string
 	uri     string
 	status  string // expression: "200", "!=200", ">=400", etc.
 	websiteRe string // RE2 regex against website
 	uriRe     string // RE2 regex against request URI
-	isTor string // "", "1" / "!=0" (TOR only), "0" / "!=1" (non-TOR only)
-	asn   string // expression: "12345", "!=65000", ">=1000", etc.
+	websiteReNeg string // RE2 regex exclusion against website
+	uriReNeg     string // RE2 regex exclusion against request URI
+	isTor        string // "", "1" / "!=0" (TOR only), "0" / "!=1" (non-TOR only)
+	asn          string // expression: "12345", "!=65000", ">=1000", etc.
 }
 // bindShared registers the shared flags on fs and returns a pointer to the
@@ -36,6 +38,8 @@ func bindShared(fs *flag.FlagSet) (*sharedFlags, *string) {
 	fs.StringVar(&sf.status, "status", "", "filter: HTTP status expression (200, !=200, >=400, <500, …)")
 	fs.StringVar(&sf.websiteRe, "website-re", "", "filter: RE2 regex against website")
 	fs.StringVar(&sf.uriRe, "uri-re", "", "filter: RE2 regex against request URI")
+	fs.StringVar(&sf.websiteReNeg, "website-re-neg", "", "filter: RE2 regex exclusion against website")
+	fs.StringVar(&sf.uriReNeg, "uri-re-neg", "", "filter: RE2 regex exclusion against request URI")
 	fs.StringVar(&sf.isTor, "is-tor", "", "filter: TOR traffic (1 or !=0 = TOR only; 0 or !=1 = non-TOR only)")
 	fs.StringVar(&sf.asn, "asn", "", "filter: ASN expression (12345, !=65000, >=1000, <64512, …)")
 	return sf, target
@@ -60,7 +64,7 @@ func parseTargets(s string) []string {
 }
 func buildFilter(sf *sharedFlags) *pb.Filter {
-	if sf.website == "" && sf.prefix == "" && sf.uri == "" && sf.status == "" && sf.websiteRe == "" && sf.uriRe == "" && sf.isTor == "" && sf.asn == "" {
+	if sf.website == "" && sf.prefix == "" && sf.uri == "" && sf.status == "" && sf.websiteRe == "" && sf.uriRe == "" && sf.websiteReNeg == "" && sf.uriReNeg == "" && sf.isTor == "" && sf.asn == "" {
 		return nil
 	}
 	f := &pb.Filter{}
@@ -88,6 +92,12 @@ func buildFilter(sf *sharedFlags) *pb.Filter {
 	if sf.uriRe != "" {
 		f.UriRegex = &sf.uriRe
 	}
+	if sf.websiteReNeg != "" {
+		f.WebsiteRegexExclude = &sf.websiteReNeg
+	}
+	if sf.uriReNeg != "" {
+		f.UriRegexExclude = &sf.uriReNeg
+	}
 	switch sf.isTor {
 	case "1", "!=0":
 		f.Tor = pb.TorFilter_TOR_YES


@@ -6,9 +6,11 @@ import (
 	"flag"
 	"log"
 	"net"
+	"net/http"
 	"os"
 	"os/signal"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"syscall"
 	"time"
@@ -18,13 +20,14 @@ import (
 )
 func main() {
-	listen := flag.String("listen", ":9090", "gRPC listen address")
-	logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
-	logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
-	source := flag.String("source", hostname(), "name for this collector (default: hostname)")
-	v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing")
-	v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing")
-	scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files")
+	listen := flag.String("listen", envOr("COLLECTOR_LISTEN", ":9090"), "gRPC listen address (env: COLLECTOR_LISTEN)")
+	promListen := flag.String("prom-listen", envOr("COLLECTOR_PROM_LISTEN", ":9100"), "Prometheus metrics listen address, empty to disable (env: COLLECTOR_PROM_LISTEN)")
+	logPaths := flag.String("logs", envOr("COLLECTOR_LOGS", ""), "comma-separated log file paths/globs to tail (env: COLLECTOR_LOGS)")
+	logsFile := flag.String("logs-file", envOr("COLLECTOR_LOGS_FILE", ""), "file containing one log path/glob per line (env: COLLECTOR_LOGS_FILE)")
+	source := flag.String("source", envOr("COLLECTOR_SOURCE", hostname()), "name for this collector (env: COLLECTOR_SOURCE, default: hostname)")
+	v4prefix := flag.Int("v4prefix", envOrInt("COLLECTOR_V4PREFIX", 24), "IPv4 prefix length for client bucketing (env: COLLECTOR_V4PREFIX)")
+	v6prefix := flag.Int("v6prefix", envOrInt("COLLECTOR_V6PREFIX", 48), "IPv6 prefix length for client bucketing (env: COLLECTOR_V6PREFIX)")
+	scanInterval := flag.Duration("scan-interval", envOrDuration("COLLECTOR_SCAN_INTERVAL", 10*time.Second), "how often to rescan glob patterns for new/removed files (env: COLLECTOR_SCAN_INTERVAL)")
 	flag.Parse()
 	patterns := collectPatterns(*logPaths, *logsFile)
@@ -40,6 +43,18 @@ func main() {
 	ch := make(chan LogRecord, 200_000)
 	store := NewStore(*source)
+	if *promListen != "" {
+		ps := NewPromStore()
+		store.prom = ps
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", ps)
+		go func() {
+			log.Printf("collector: Prometheus metrics on %s/metrics", *promListen)
+			if err := http.ListenAndServe(*promListen, mux); err != nil {
+				log.Fatalf("collector: Prometheus server: %v", err)
+			}
+		}()
+	}
 	go store.Run(ch)
 	tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch)
@@ -140,3 +155,30 @@ func hostname() string {
 	}
 	return h
 }
+
+func envOr(key, def string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return def
+}
+
+func envOrInt(key string, def int) int {
+	if v := os.Getenv(key); v != "" {
+		if n, err := strconv.Atoi(v); err == nil {
+			return n
+		}
+		log.Printf("collector: invalid int for %s=%q, using default %d", key, v, def)
+	}
+	return def
+}
+
+func envOrDuration(key string, def time.Duration) time.Duration {
+	if v := os.Getenv(key); v != "" {
+		if d, err := time.ParseDuration(v); err == nil {
+			return d
+		}
+		log.Printf("collector: invalid duration for %s=%q, using default %s", key, v, def)
+	}
+	return def
+}


@@ -9,12 +9,15 @@ import (
 // LogRecord holds the dimensions extracted from a single nginx log line.
 type LogRecord struct {
 	Website      string
 	ClientPrefix string
 	URI          string
 	Status       string
 	IsTor        bool
 	ASN          int32
+	Method        string
+	BodyBytesSent int64
+	RequestTime   float64
 }
 // ParseLine parses a tab-separated logtail log line:
@@ -51,13 +54,26 @@ func ParseLine(line string, v4bits, v6bits int) (LogRecord, bool) {
 		}
 	}
+	var bodyBytes int64
+	if n, err := strconv.ParseInt(fields[6], 10, 64); err == nil {
+		bodyBytes = n
+	}
+	var reqTime float64
+	if f, err := strconv.ParseFloat(fields[7], 64); err == nil {
+		reqTime = f
+	}
+
 	return LogRecord{
 		Website:       fields[0],
 		ClientPrefix:  prefix,
 		URI:           uri,
 		Status:        fields[5],
 		IsTor:         isTor,
 		ASN:           asn,
+		Method:        fields[3],
+		BodyBytesSent: bodyBytes,
+		RequestTime:   reqTime,
 	}, true
 }
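The two `strconv` blocks above deliberately swallow parse errors: a malformed `body_bytes_sent` or `request_time` field degrades to a zero value instead of discarding the whole record. A standalone sketch of that tolerant-parse choice (field values are illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseNumeric mirrors the tolerant parsing in ParseLine: a field that
// fails to parse contributes a zero value rather than an error.
func parseNumeric(bytesField, timeField string) (int64, float64) {
	var bodyBytes int64
	if n, err := strconv.ParseInt(bytesField, 10, 64); err == nil {
		bodyBytes = n
	}
	var reqTime float64
	if f, err := strconv.ParseFloat(timeField, 64); err == nil {
		reqTime = f
	}
	return bodyBytes, reqTime
}

func main() {
	fmt.Println(parseNumeric("1452", "0.043")) // well-formed fields
	fmt.Println(parseNumeric("-", "abc"))      // malformed fields → zeros
}
```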

View File

@@ -18,10 +18,13 @@ func TestParseLine(t *testing.T) {
 			line:   good,
 			wantOK: true,
 			want: LogRecord{
 				Website:      "www.example.com",
 				ClientPrefix: "1.2.3.0/24",
 				URI:          "/api/v1/search",
 				Status:       "200",
+				Method:        "GET",
+				BodyBytesSent: 1452,
+				RequestTime:   0.043,
 			},
 		},
 		{
@@ -33,6 +36,8 @@ func TestParseLine(t *testing.T) {
 				ClientPrefix: "10.0.0.0/24",
 				URI:          "/submit",
 				Status:       "201",
+				Method:      "POST",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -44,6 +49,8 @@ func TestParseLine(t *testing.T) {
 				ClientPrefix: "2001:db8:cafe::/48", // /48 = 3 full 16-bit groups intact
 				URI:          "/",
 				Status:       "200",
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -70,6 +77,8 @@ func TestParseLine(t *testing.T) {
 				ClientPrefix: "5.6.7.0/24",
 				URI:          "/rate-limited",
 				Status:       "429",
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -82,6 +91,8 @@ func TestParseLine(t *testing.T) {
 				URI:    "/",
 				Status: "200",
 				IsTor:  true,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -94,6 +105,8 @@ func TestParseLine(t *testing.T) {
 				URI:    "/",
 				Status: "200",
 				IsTor:  false,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -106,6 +119,8 @@ func TestParseLine(t *testing.T) {
 				URI:    "/",
 				Status: "200",
 				IsTor:  false,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -119,6 +134,8 @@ func TestParseLine(t *testing.T) {
 				Status: "200",
 				IsTor:  false,
 				ASN:    12345,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -132,6 +149,8 @@ func TestParseLine(t *testing.T) {
 				Status: "200",
 				IsTor:  true,
 				ASN:    65535,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -145,6 +164,8 @@ func TestParseLine(t *testing.T) {
 				Status: "200",
 				IsTor:  true,
 				ASN:    0,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 		{
@@ -158,6 +179,8 @@ func TestParseLine(t *testing.T) {
 				Status: "200",
 				IsTor:  false,
 				ASN:    0,
+				Method:      "GET",
+				RequestTime: 0.001,
 			},
 		},
 	}

209 cmd/collector/prom.go Normal file

@@ -0,0 +1,209 @@
package main
import (
"bufio"
"fmt"
"net/http"
"sort"
"strings"
"sync"
)
// Body-size histogram bucket upper bounds in bytes.
const promNumBodyBounds = 7
var promBodyBounds = [promNumBodyBounds]int64{256, 1024, 4096, 16384, 65536, 262144, 1048576}
// Request-time histogram bucket upper bounds in seconds (standard Prometheus defaults).
const promNumTimeBounds = 11
var promTimeBounds = [promNumTimeBounds]float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
const promCounterCap = 100_000 // safety cap on {host,method,status} counter entries
// promCounterKey is the label set for per-request counters.
type promCounterKey struct {
Host string
Method string
Status string
}
// promBodyEntry holds the body_bytes_sent histogram for one host.
type promBodyEntry struct {
buckets [promNumBodyBounds + 1]int64 // indices 0..N-1: le=bound[i]; index N: le=+Inf
sum int64
}
// promTimeEntry holds the request_time histogram for one host.
type promTimeEntry struct {
buckets [promNumTimeBounds + 1]int64
sum float64
}
// PromStore accumulates Prometheus metrics ingested from log records.
//
// Ingest must be called from exactly one goroutine (the store's Run goroutine).
// ServeHTTP may be called from any number of goroutines concurrently.
type PromStore struct {
mu sync.Mutex
counters map[promCounterKey]int64
body map[string]*promBodyEntry // keyed by host
reqTime map[string]*promTimeEntry // keyed by host
}
// NewPromStore returns an empty PromStore ready for use.
func NewPromStore() *PromStore {
return &PromStore{
counters: make(map[promCounterKey]int64, 1024),
body: make(map[string]*promBodyEntry, 64),
reqTime: make(map[string]*promTimeEntry, 64),
}
}
// Ingest records one log record into the Prometheus metrics.
// Must be called from a single goroutine.
func (p *PromStore) Ingest(r LogRecord) {
p.mu.Lock()
// --- per-{host,method,status} request counter ---
ck := promCounterKey{Host: r.Website, Method: r.Method, Status: r.Status}
if _, ok := p.counters[ck]; ok {
p.counters[ck]++
} else if len(p.counters) < promCounterCap {
p.counters[ck] = 1
}
// --- body_bytes_sent histogram (keyed by host only) ---
be, ok := p.body[r.Website]
if !ok {
be = &promBodyEntry{}
p.body[r.Website] = be
}
for i, bound := range promBodyBounds {
if r.BodyBytesSent <= bound {
be.buckets[i]++
}
}
be.buckets[promNumBodyBounds]++ // +Inf
be.sum += r.BodyBytesSent
// --- request_time histogram (keyed by host only) ---
te, ok := p.reqTime[r.Website]
if !ok {
te = &promTimeEntry{}
p.reqTime[r.Website] = te
}
for i, bound := range promTimeBounds {
if r.RequestTime <= bound {
te.buckets[i]++
}
}
te.buckets[promNumTimeBounds]++ // +Inf
te.sum += r.RequestTime
p.mu.Unlock()
}
// ServeHTTP renders all metrics in the Prometheus text exposition format (0.0.4).
func (p *PromStore) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
// Snapshot everything under the lock, then render without holding it.
p.mu.Lock()
type counterSnap struct {
k promCounterKey
v int64
}
counters := make([]counterSnap, 0, len(p.counters))
for k, v := range p.counters {
counters = append(counters, counterSnap{k, v})
}
type bodySnap struct {
host string
e promBodyEntry
}
bodySnaps := make([]bodySnap, 0, len(p.body))
for h, e := range p.body {
bodySnaps = append(bodySnaps, bodySnap{h, *e})
}
type timeSnap struct {
host string
e promTimeEntry
}
timeSnaps := make([]timeSnap, 0, len(p.reqTime))
for h, e := range p.reqTime {
timeSnaps = append(timeSnaps, timeSnap{h, *e})
}
p.mu.Unlock()
// Sort for stable, human-readable output.
sort.Slice(counters, func(i, j int) bool {
a, b := counters[i].k, counters[j].k
if a.Host != b.Host {
return a.Host < b.Host
}
if a.Method != b.Method {
return a.Method < b.Method
}
return a.Status < b.Status
})
sort.Slice(bodySnaps, func(i, j int) bool { return bodySnaps[i].host < bodySnaps[j].host })
sort.Slice(timeSnaps, func(i, j int) bool { return timeSnaps[i].host < timeSnaps[j].host })
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
bw := bufio.NewWriterSize(w, 256*1024)
// nginx_http_requests_total
fmt.Fprintln(bw, "# HELP nginx_http_requests_total Total number of HTTP requests processed.")
fmt.Fprintln(bw, "# TYPE nginx_http_requests_total counter")
for _, c := range counters {
fmt.Fprintf(bw, "nginx_http_requests_total{host=%q,method=%q,status=%q} %d\n",
c.k.Host, c.k.Method, c.k.Status, c.v)
}
// nginx_http_response_body_bytes (histogram, labeled by host)
fmt.Fprintln(bw, "# HELP nginx_http_response_body_bytes HTTP response body size distribution in bytes.")
fmt.Fprintln(bw, "# TYPE nginx_http_response_body_bytes histogram")
for _, s := range bodySnaps {
for i, bound := range promBodyBounds {
fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=%q} %d\n",
s.host, fmt.Sprintf("%d", bound), s.e.buckets[i])
}
fmt.Fprintf(bw, "nginx_http_response_body_bytes_bucket{host=%q,le=\"+Inf\"} %d\n",
s.host, s.e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "nginx_http_response_body_bytes_count{host=%q} %d\n",
s.host, s.e.buckets[promNumBodyBounds])
fmt.Fprintf(bw, "nginx_http_response_body_bytes_sum{host=%q} %d\n",
s.host, s.e.sum)
}
// nginx_http_request_duration_seconds (histogram, labeled by host)
fmt.Fprintln(bw, "# HELP nginx_http_request_duration_seconds HTTP request processing time in seconds.")
fmt.Fprintln(bw, "# TYPE nginx_http_request_duration_seconds histogram")
for _, s := range timeSnaps {
for i, bound := range promTimeBounds {
fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=%q} %d\n",
s.host, formatFloat(bound), s.e.buckets[i])
}
fmt.Fprintf(bw, "nginx_http_request_duration_seconds_bucket{host=%q,le=\"+Inf\"} %d\n",
s.host, s.e.buckets[promNumTimeBounds])
fmt.Fprintf(bw, "nginx_http_request_duration_seconds_count{host=%q} %d\n",
s.host, s.e.buckets[promNumTimeBounds])
fmt.Fprintf(bw, "nginx_http_request_duration_seconds_sum{host=%q} %g\n",
s.host, s.e.sum)
}
bw.Flush()
}
// formatFloat renders a float64 bucket bound without trailing zeros but always
// with at least one decimal place, matching Prometheus convention (e.g. "0.5", "10.0").
func formatFloat(f float64) string {
s := fmt.Sprintf("%g", f)
if !strings.Contains(s, ".") && !strings.Contains(s, "e") {
s += ".0" // ensure it looks like a float, not an integer
}
return s
}
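`Ingest` fills both histograms cumulatively, matching Prometheus `le` semantics: every bucket whose upper bound admits the observation is incremented, plus the implicit `+Inf` bucket at the end. A standalone sketch of that fill logic (bounds copied from `promTimeBounds`):

```go
package main

import "fmt"

var bounds = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}

// observe increments every cumulative bucket whose upper bound admits v,
// then the trailing +Inf bucket, mirroring PromStore.Ingest.
func observe(buckets []int64, v float64) {
	for i, b := range bounds {
		if v <= b {
			buckets[i]++
		}
	}
	buckets[len(bounds)]++ // +Inf
}

func main() {
	buckets := make([]int64, len(bounds)+1)
	observe(buckets, 0.075) // > 0.05, ≤ 0.1
	// le=0.05 bucket stays 0; le=0.1 and everything above (incl. +Inf) is 1.
	fmt.Println(buckets[3], buckets[4], buckets[len(bounds)]) // 0 1 1
}
```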

130 cmd/collector/prom_test.go Normal file

@@ -0,0 +1,130 @@
package main
import (
"net/http/httptest"
"strings"
"testing"
)
func TestPromStoreIngestBodyBuckets(t *testing.T) {
ps := NewPromStore()
// 512 bytes: > 256, ≤ 1024 → bucket[0] stays 0, buckets[1..N] get 1
ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", BodyBytesSent: 512})
ps.mu.Lock()
be := ps.body["example.com"]
ps.mu.Unlock()
if be == nil {
t.Fatal("expected body entry, got nil")
}
if be.buckets[0] != 0 { // le=256: 512 > 256
t.Errorf("le=256 bucket = %d, want 0", be.buckets[0])
}
if be.buckets[1] != 1 { // le=1024: 512 ≤ 1024
t.Errorf("le=1024 bucket = %d, want 1", be.buckets[1])
}
for i := 2; i <= promNumBodyBounds; i++ {
if be.buckets[i] != 1 {
t.Errorf("bucket[%d] = %d, want 1", i, be.buckets[i])
}
}
if be.sum != 512 {
t.Errorf("sum = %d, want 512", be.sum)
}
}
func TestPromStoreIngestTimeBuckets(t *testing.T) {
ps := NewPromStore()
// 0.075s: > 0.05, ≤ 0.1
ps.Ingest(LogRecord{Website: "example.com", Method: "GET", Status: "200", RequestTime: 0.075})
ps.mu.Lock()
te := ps.reqTime["example.com"]
ps.mu.Unlock()
if te == nil {
t.Fatal("expected time entry, got nil")
}
// le=0.05 (index 3): 0.075 > 0.05 → 0
if te.buckets[3] != 0 {
t.Errorf("le=0.05 bucket = %d, want 0", te.buckets[3])
}
// le=0.1 (index 4): 0.075 ≤ 0.1 → 1
if te.buckets[4] != 1 {
t.Errorf("le=0.1 bucket = %d, want 1", te.buckets[4])
}
// +Inf (last): always 1
if te.buckets[promNumTimeBounds] != 1 {
t.Errorf("+Inf bucket = %d, want 1", te.buckets[promNumTimeBounds])
}
}
func TestPromStoreCounter(t *testing.T) {
ps := NewPromStore()
ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"})
ps.Ingest(LogRecord{Website: "a.com", Method: "GET", Status: "200"})
ps.Ingest(LogRecord{Website: "a.com", Method: "POST", Status: "201"})
ps.mu.Lock()
c1 := ps.counters[promCounterKey{"a.com", "GET", "200"}]
c2 := ps.counters[promCounterKey{"a.com", "POST", "201"}]
ps.mu.Unlock()
if c1 != 2 {
t.Errorf("GET/200 count = %d, want 2", c1)
}
if c2 != 1 {
t.Errorf("POST/201 count = %d, want 1", c2)
}
}
func TestPromStoreServeHTTP(t *testing.T) {
ps := NewPromStore()
ps.Ingest(LogRecord{
Website: "example.com", Method: "GET", Status: "200",
BodyBytesSent: 100, RequestTime: 0.042,
})
req := httptest.NewRequest("GET", "/metrics", nil)
rec := httptest.NewRecorder()
ps.ServeHTTP(rec, req)
body := rec.Body.String()
checks := []string{
"# TYPE nginx_http_requests_total counter",
`nginx_http_requests_total{host="example.com",method="GET",status="200"} 1`,
"# TYPE nginx_http_response_body_bytes histogram",
`nginx_http_response_body_bytes_bucket{host="example.com",le="256"} 1`, // 100 ≤ 256
`nginx_http_response_body_bytes_count{host="example.com"} 1`,
`nginx_http_response_body_bytes_sum{host="example.com"} 100`,
"# TYPE nginx_http_request_duration_seconds histogram",
`nginx_http_request_duration_seconds_bucket{host="example.com",le="0.05"} 1`, // 0.042 ≤ 0.05
`nginx_http_request_duration_seconds_count{host="example.com"} 1`,
}
for _, want := range checks {
if !strings.Contains(body, want) {
t.Errorf("missing %q in output:\n%s", want, body)
}
}
}
func TestPromStoreCounterCap(t *testing.T) {
ps := NewPromStore()
	// Fill past the cap with genuinely distinct {host,method,status} combos
	// (base-26 host names, unique per iteration; no extra imports needed).
	for i := 0; i < promCounterCap+10; i++ {
		n, host := i, ""
		for j := 0; j < 4; j++ {
			host = string(rune('a'+n%26)) + host
			n /= 26
		}
		host += ".com"
		status := "200"
		if i%3 == 0 {
			status = "404"
		}
		ps.Ingest(LogRecord{Website: host, Method: "GET", Status: status})
	}
ps.mu.Lock()
n := len(ps.counters)
ps.mu.Unlock()
if n > promCounterCap {
t.Errorf("counter map size %d exceeds cap %d", n, promCounterCap)
}
}


@@ -5,6 +5,7 @@ import (
 	"log"
 	"time"
+	st "git.ipng.ch/ipng/nginx-logtail/internal/store"
 	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -70,6 +71,33 @@ func peerAddr(ctx context.Context) string {
 	return "unknown"
 }
+func (srv *Server) DumpSnapshots(_ *pb.DumpSnapshotsRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
+	fine, coarse := srv.store.DumpRings()
+	for _, snap := range fine {
+		if err := stream.Send(storeSnapshotToProto(snap, srv.source, false)); err != nil {
+			return err
+		}
+	}
+	for _, snap := range coarse {
+		if err := stream.Send(storeSnapshotToProto(snap, srv.source, true)); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func storeSnapshotToProto(snap st.Snapshot, source string, isCoarse bool) *pb.Snapshot {
+	msg := &pb.Snapshot{
+		Source:    source,
+		Timestamp: snap.Timestamp.Unix(),
+		IsCoarse:  isCoarse,
+	}
+	for _, e := range snap.Entries {
+		msg.Entries = append(msg.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count})
+	}
+	return msg
+}
+
 func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
 	ch := srv.store.Subscribe()
 	defer srv.store.Unsubscribe(ch)


@@ -13,6 +13,7 @@ const liveMapCap = 100_000 // hard cap on live map entries
 // Store holds the live map and both ring buffers.
 type Store struct {
 	source string
+	prom   *PromStore // optional; if non-nil, receives every ingested record
 	// live map — written only by the Run goroutine; no locking needed on writes
 	live map[st.Tuple6]int64
@@ -41,9 +42,12 @@ func NewStore(source string) *Store {
 	}
 }
-// ingest records one log record into the live map.
+// ingest records one log record into the live map and the Prometheus store (if set).
 // Must only be called from the Run goroutine.
 func (s *Store) ingest(r LogRecord) {
+	if s.prom != nil {
+		s.prom.Ingest(r)
+	}
 	key := st.Tuple6{Website: r.Website, Prefix: r.ClientPrefix, URI: r.URI, Status: r.Status, IsTor: r.IsTor, ASN: r.ASN}
 	if _, exists := s.live[key]; !exists {
 		if s.liveLen >= liveMapCap {
@@ -150,6 +154,32 @@ func (s *Store) coarseView() st.RingView {
 	return st.RingView{Ring: ring, Head: s.coarseHead, Size: st.CoarseRingSize}
 }
+// DumpRings returns copies of all non-empty fine and coarse ring snapshots in
+// chronological order. The lock is held only for the duration of the copy.
+func (s *Store) DumpRings() (fine, coarse []st.Snapshot) {
+	s.mu.RLock()
+	fineRing := s.fineRing
+	fineHead := s.fineHead
+	fineFilled := s.fineFilled
+	coarseRing := s.coarseRing
+	coarseHead := s.coarseHead
+	coarseFilled := s.coarseFilled
+	s.mu.RUnlock()
+
+	fine = make([]st.Snapshot, 0, fineFilled)
+	for i := 0; i < fineFilled; i++ {
+		idx := (fineHead - fineFilled + i + st.FineRingSize) % st.FineRingSize
+		fine = append(fine, fineRing[idx])
+	}
+	coarse = make([]st.Snapshot, 0, coarseFilled)
+	for i := 0; i < coarseFilled; i++ {
+		idx := (coarseHead - coarseFilled + i + st.CoarseRingSize) % st.CoarseRingSize
+		coarse = append(coarse, coarseRing[idx])
+	}
+	return fine, coarse
+}
+
 func (s *Store) Subscribe() chan st.Snapshot {
 	ch := make(chan st.Snapshot, 4)
 	s.subMu.Lock()
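`DumpRings` walks each ring oldest-first with the index formula `(head - filled + i + size) % size`, where `head` is the next write slot. A standalone sketch of that traversal (sizes and fill counts are illustrative):

```go
package main

import "fmt"

// oldestFirst returns the physical slot indices of a ring buffer's filled
// entries in logical (oldest-first) order, given the ring size, the head
// (next write index), and the number of filled slots.
func oldestFirst(size, head, filled int) []int {
	out := make([]int, 0, filled)
	for i := 0; i < filled; i++ {
		out = append(out, (head-filled+i+size)%size)
	}
	return out
}

func main() {
	// Fully wrapped ring of 5, head at 1: oldest entry sits at slot 1.
	fmt.Println(oldestFirst(5, 1, 5)) // [1 2 3 4 0]
	// Partially filled ring: only the two slots before head are valid.
	fmt.Println(oldestFirst(5, 3, 2)) // [1 2]
}
```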


@@ -19,9 +19,11 @@ var andRe = regexp.MustCompile(`(?i)\s+and\s+`)
 //
 //	status=200 status!=200 status>=400 status>400 status<=500 status<500
 //	website=example.com — exact match
-//	website~=gouda.* — RE2 regex
+//	website~=gouda.* — RE2 regex match
+//	website!~=gouda.* — RE2 regex exclusion
 //	uri=/api/v1/ — exact match
-//	uri~=^/api/.* — RE2 regex
+//	uri~=^/api/.* — RE2 regex match
+//	uri!~=^/ct/.* — RE2 regex exclusion
 //	prefix=1.2.3.0/24 — exact match
 //
 // Values may be enclosed in double or single quotes.
@@ -57,6 +59,8 @@ func applyTerm(term string, fs *filterState) error {
 	var op, value string
 	switch {
+	case strings.HasPrefix(rest, "!~="):
+		op, value = "!~=", rest[3:]
 	case strings.HasPrefix(rest, "~="):
 		op, value = "~=", rest[2:]
 	case strings.HasPrefix(rest, "!="):
@@ -96,8 +100,10 @@ func applyTerm(term string, fs *filterState) error {
 			fs.Website = value
 		case "~=":
 			fs.WebsiteRe = value
+		case "!~=":
+			fs.WebsiteReNeg = value
 		default:
-			return fmt.Errorf("website only supports = and ~=, not %q", op)
+			return fmt.Errorf("website only supports =, ~=, and !~=, not %q", op)
 		}
 	case "uri":
 		switch op {
@@ -105,8 +111,10 @@ func applyTerm(term string, fs *filterState) error {
 			fs.URI = value
 		case "~=":
 			fs.URIRe = value
+		case "!~=":
+			fs.URIReNeg = value
 		default:
-			return fmt.Errorf("uri only supports = and ~=, not %q", op)
+			return fmt.Errorf("uri only supports =, ~=, and !~=, not %q", op)
 		}
 	case "prefix":
 		if op != "=" {
@@ -164,6 +172,9 @@ func FilterExprString(f filterState) string {
 	if f.WebsiteRe != "" {
 		parts = append(parts, "website~="+quoteMaybe(f.WebsiteRe))
 	}
+	if f.WebsiteReNeg != "" {
+		parts = append(parts, "website!~="+quoteMaybe(f.WebsiteReNeg))
+	}
 	if f.Prefix != "" {
 		parts = append(parts, "prefix="+quoteMaybe(f.Prefix))
 	}
@@ -173,6 +184,9 @@ func FilterExprString(f filterState) string {
 	if f.URIRe != "" {
 		parts = append(parts, "uri~="+quoteMaybe(f.URIRe))
 	}
+	if f.URIReNeg != "" {
+		parts = append(parts, "uri!~="+quoteMaybe(f.URIReNeg))
+	}
 	if f.Status != "" {
 		parts = append(parts, statusTermStr(f.Status))
 	}
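One subtlety in the operator switch above: multi-character operators must be matched longest-first, so `>=400` parses as (`>=`, `400`) rather than (`>`, `=400`), and the new `!~=` is tried before the two-character operators. A standalone sketch of that ordering rule (the operator list is assumed from the filter syntax documented above):

```go
package main

import (
	"fmt"
	"strings"
)

// ops are tried longest-first so ">=400" splits as (">=", "400"),
// not (">", "=400"), and "!~=x" as ("!~=", "x").
var ops = []string{"!~=", "~=", "!=", ">=", "<=", ">", "<", "="}

// splitOp splits an expression like ">=400" into operator and value.
func splitOp(rest string) (op, value string, ok bool) {
	for _, o := range ops {
		if strings.HasPrefix(rest, o) {
			return o, rest[len(o):], true
		}
	}
	return "", "", false
}

func main() {
	fmt.Println(splitOp(">=400"))    // >= 400 true
	fmt.Println(splitOp("!~=^/ct/")) // !~= ^/ct/ true
}
```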


@@ -13,6 +13,7 @@ import (
 	st "git.ipng.ch/ipng/nginx-logtail/internal/store"
 	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
+	"google.golang.org/grpc"
 )
 // Handler is the HTTP handler for the frontend.
@@ -47,14 +48,16 @@ type TableRow struct {
 // filterState holds the filter fields parsed from URL params.
 type filterState struct {
 	Website   string
 	Prefix    string
 	URI       string
 	Status    string // expression: "200", "!=200", ">=400", etc.
 	WebsiteRe string // RE2 regex against website
 	URIRe     string // RE2 regex against request URI
-	IsTor string // "", "1" (TOR only), "0" (non-TOR only)
-	ASN   string // expression: "12345", "!=65000", ">=1000", etc.
+	WebsiteReNeg string // RE2 regex exclusion against website
+	URIReNeg     string // RE2 regex exclusion against request URI
+	IsTor        string // "", "1" (TOR only), "0" (non-TOR only)
+	ASN          string // expression: "12345", "!=65000", ">=1000", etc.
 }
 // QueryParams holds all parsed URL parameters for one page request.
@@ -155,20 +158,22 @@ func (h *Handler) parseParams(r *http.Request) QueryParams {
 		GroupByS: grpS,
 		N:        n,
 		Filter: filterState{
 			Website:   q.Get("f_website"),
 			Prefix:    q.Get("f_prefix"),
 			URI:       q.Get("f_uri"),
 			Status:    q.Get("f_status"),
 			WebsiteRe: q.Get("f_website_re"),
 			URIRe:     q.Get("f_uri_re"),
-			IsTor: q.Get("f_is_tor"),
-			ASN:   q.Get("f_asn"),
+			WebsiteReNeg: q.Get("f_website_re_neg"),
+			URIReNeg:     q.Get("f_uri_re_neg"),
+			IsTor:        q.Get("f_is_tor"),
+			ASN:          q.Get("f_asn"),
 		},
 	}
 }
 func buildFilter(f filterState) *pb.Filter {
-	if f.Website == "" && f.Prefix == "" && f.URI == "" && f.Status == "" && f.WebsiteRe == "" && f.URIRe == "" && f.IsTor == "" && f.ASN == "" {
+	if f.Website == "" && f.Prefix == "" && f.URI == "" && f.Status == "" && f.WebsiteRe == "" && f.URIRe == "" && f.WebsiteReNeg == "" && f.URIReNeg == "" && f.IsTor == "" && f.ASN == "" {
 		return nil
 	}
 	out := &pb.Filter{}
@@ -193,6 +198,12 @@ func buildFilter(f filterState) *pb.Filter {
 	if f.URIRe != "" {
 		out.UriRegex = &f.URIRe
 	}
+	if f.WebsiteReNeg != "" {
+		out.WebsiteRegexExclude = &f.WebsiteReNeg
+	}
+	if f.URIReNeg != "" {
+		out.UriRegexExclude = &f.URIReNeg
+	}
 	switch f.IsTor {
 	case "1":
 		out.Tor = pb.TorFilter_TOR_YES
@@ -233,6 +244,12 @@ func (p QueryParams) toValues() url.Values {
 	if p.Filter.URIRe != "" {
 		v.Set("f_uri_re", p.Filter.URIRe)
 	}
+	if p.Filter.WebsiteReNeg != "" {
+		v.Set("f_website_re_neg", p.Filter.WebsiteReNeg)
+	}
+	if p.Filter.URIReNeg != "" {
+		v.Set("f_uri_re_neg", p.Filter.URIReNeg)
+	}
 	if p.Filter.IsTor != "" {
 		v.Set("f_is_tor", p.Filter.IsTor)
 	}
@@ -260,7 +277,8 @@ func (p QueryParams) buildURL(overrides map[string]string) string {
 func (p QueryParams) clearFilterURL() string {
 	return p.buildURL(map[string]string{
 		"f_website": "", "f_prefix": "", "f_uri": "", "f_status": "",
-		"f_website_re": "", "f_uri_re": "", "f_is_tor": "", "f_asn": "",
+		"f_website_re": "", "f_uri_re": "", "f_website_re_neg": "", "f_uri_re_neg": "",
+		"f_is_tor": "", "f_asn": "",
 	})
 }
@@ -343,6 +361,18 @@ func buildCrumbs(p QueryParams) []Crumb {
 			RemoveURL: p.buildURL(map[string]string{"f_uri_re": ""}),
 		})
 	}
+	if p.Filter.WebsiteReNeg != "" {
+		crumbs = append(crumbs, Crumb{
+			Text:      "website!~=" + p.Filter.WebsiteReNeg,
+			RemoveURL: p.buildURL(map[string]string{"f_website_re_neg": ""}),
+		})
+	}
+	if p.Filter.URIReNeg != "" {
+		crumbs = append(crumbs, Crumb{
+			Text:      "uri!~=" + p.Filter.URIReNeg,
+			RemoveURL: p.buildURL(map[string]string{"f_uri_re_neg": ""}),
+		})
+	}
 	switch p.Filter.IsTor {
 	case "1":
 		crumbs = append(crumbs, Crumb{
@@ -526,7 +556,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		trendCh <- trendResult{resp, err}
 	}()
 	go func() {
-		resp, err := client.ListTargets(ctx, &pb.ListTargetsRequest{})
+		// Always query the default target for ListTargets so we get the full
+		// list of available sources even when viewing a specific collector.
+		ltClient := client
+		var ltConn *grpc.ClientConn
+		if params.Target != h.defaultTarget {
+			c, cl, err := dial(h.defaultTarget)
+			if err == nil {
+				ltConn = c
+				ltClient = cl
+			}
+		}
+		resp, err := ltClient.ListTargets(ctx, &pb.ListTargetsRequest{})
+		if ltConn != nil {
+			ltConn.Close()
+		}
 		if err != nil {
 			ltCh <- nil
 		} else {
@@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"strconv"
"syscall" "syscall"
) )
@@ -16,10 +17,10 @@ import (
var templatesFS embed.FS var templatesFS embed.FS
func main() { func main() {
listen := flag.String("listen", ":8080", "HTTP listen address") listen := flag.String("listen", envOr("FRONTEND_LISTEN", ":8080"), "HTTP listen address (env: FRONTEND_LISTEN)")
target := flag.String("target", "localhost:9091", "default gRPC endpoint (aggregator or collector)") target := flag.String("target", envOr("FRONTEND_TARGET", "localhost:9091"), "default gRPC endpoint, aggregator or collector (env: FRONTEND_TARGET)")
n := flag.Int("n", 25, "default number of table rows") n := flag.Int("n", envOrInt("FRONTEND_N", 25), "default number of table rows (env: FRONTEND_N)")
refresh := flag.Int("refresh", 30, "meta-refresh interval in seconds (0 = disabled)") refresh := flag.Int("refresh", envOrInt("FRONTEND_REFRESH", 30), "meta-refresh interval in seconds, 0 to disable (env: FRONTEND_REFRESH)")
flag.Parse() flag.Parse()
funcMap := template.FuncMap{"fmtCount": fmtCount} funcMap := template.FuncMap{"fmtCount": fmtCount}
@@ -51,3 +52,20 @@ func main() {
log.Printf("frontend: shutting down") log.Printf("frontend: shutting down")
srv.Shutdown(context.Background()) srv.Shutdown(context.Background())
} }
func envOr(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
func envOrInt(key string, def int) int {
if v := os.Getenv(key); v != "" {
if n, err := strconv.Atoi(v); err == nil {
return n
}
log.Printf("frontend: invalid int for %s=%q, using default %d", key, v, def)
}
return def
}
@@ -32,7 +32,7 @@
<input type="hidden" name="w" value="{{.Params.WindowS}}"> <input type="hidden" name="w" value="{{.Params.WindowS}}">
<input type="hidden" name="by" value="{{.Params.GroupByS}}"> <input type="hidden" name="by" value="{{.Params.GroupByS}}">
<input type="hidden" name="n" value="{{.Params.N}}"> <input type="hidden" name="n" value="{{.Params.N}}">
<input class="filter-input" type="text" name="q" value="{{.FilterExpr}}" placeholder="status>=400 AND website~=gouda.* AND uri~=^/ct/v1/ AND is_tor=0 AND asn=8298"> <input class="filter-input" type="text" name="q" value="{{.FilterExpr}}" placeholder="status>=400 AND website!~=^gouda.* AND uri~=^/ct/v1/ AND is_tor=0 AND asn=8298">
<button type="submit">filter</button> <button type="submit">filter</button>
{{- if .FilterExpr}} <a class="clear" href="{{.ClearFilterURL}}">× clear</a>{{end}} {{- if .FilterExpr}} <a class="clear" href="{{.ClearFilterURL}}">× clear</a>{{end}}
</form> </form>
1
dashboards/README.md Normal file
@@ -0,0 +1 @@
This nginx-logtail dashboard is just to get you started. It was autogenerated by Claude.
File diff suppressed because it is too large
26
docker-compose.yml Normal file
@@ -0,0 +1,26 @@
services:
aggregator:
build: .
image: git.ipng.ch/ipng/nginx-logtail
command: ["/usr/local/bin/aggregator"]
restart: unless-stopped
environment:
AGGREGATOR_LISTEN: ":9091"
AGGREGATOR_COLLECTORS: "" # e.g. "collector1:9090,collector2:9090"
AGGREGATOR_SOURCE: "" # defaults to container hostname
ports:
- "9091:9091"
frontend:
image: git.ipng.ch/ipng/nginx-logtail
command: ["/usr/local/bin/frontend"]
restart: unless-stopped
environment:
FRONTEND_LISTEN: ":8080"
FRONTEND_TARGET: "aggregator:9091"
FRONTEND_N: "25"
FRONTEND_REFRESH: "30"
ports:
- "8080:8080"
depends_on:
- aggregator

528
docs/DETAILS.md Normal file
@@ -0,0 +1,528 @@
PREAMBLE
Although this computer program has a permissive license (AP2.0), if you came here looking to ask
questions, you're better off just moving on :) This program is shared AS-IS and really without any
intent for anybody but IPng Networks to use it. Also, in case the structure of the repo and the
style of this README wasn't already clear, this program is 100% written and maintained by Claude
Code.
You have been warned :)
SPECIFICATION
This project contains four programs:
1) A **collector** that tails any number of nginx log files and maintains an in-memory structure of
`{website, client_prefix, http_request_uri, http_response, is_tor, asn}` counts across all files.
It answers TopN and Trend queries via gRPC and pushes minute snapshots to the aggregator via
server-streaming. It also exposes a Prometheus `/metrics` endpoint (default `:9100`) with per-host
request counters and response-body/request-time histograms.
Runs on each nginx machine in the cluster. No UI — gRPC and HTTP interfaces only.
2) An **aggregator** that subscribes to the snapshot stream from all collectors, merges their data
into a unified in-memory cache, and exposes the same gRPC interface. Answers questions like "what
is the busiest website globally", "which client prefix is causing the most HTTP 503s", and shows
trending information useful for DDoS detection. Runs on a central machine.
3) An **HTTP frontend** companion to the aggregator that renders a drilldown dashboard. Operators
can restrict by `http_response=429`, then by `website=www.example.com`, and so on. Works with
either a collector or aggregator as its backend. Zero JavaScript — server-rendered HTML with inline
SVG sparklines and meta-refresh.
4) A **CLI** for shell-based debugging. Sends `topn`, `trend`, and `stream` queries to any
collector or aggregator, fans out to multiple targets in parallel, and outputs human-readable
tables or newline-delimited JSON.
Programs are written in Go. No CGO, no external runtime dependencies.
---
![nginx-logtail frontend](docs/frontend.png)
---
DEPLOYMENT
## Docker
All four binaries are published in a single image: `git.ipng.ch/ipng/nginx-logtail`.
The image is built with a two-stage Dockerfile: a `golang:1.24-alpine` builder produces
statically-linked, stripped binaries (`CGO_ENABLED=0`, `-trimpath -ldflags="-s -w"`); the final
stage is `scratch` — no OS, no shell, no runtime dependencies. Each binary is invoked explicitly
via the container `command`.
### Build and push
```
docker compose build --push
```
### Running aggregator + frontend
The `docker-compose.yml` in the repo root runs the aggregator and frontend together. At minimum,
set `AGGREGATOR_COLLECTORS` to the comma-separated `host:port` list of your collector(s):
```sh
AGGREGATOR_COLLECTORS=nginx1:9090,nginx2:9090 docker compose up -d
```
The frontend reaches the aggregator at `aggregator:9091` via Docker's internal DNS. The frontend
UI is available on port `8080`.
### Environment variables
All flags have environment variable equivalents. CLI flags take precedence over env vars.
**collector** (runs on each nginx host, not in Docker):
| Env var | Flag | Default |
|--------------------------|-------------------|-------------|
| `COLLECTOR_LISTEN` | `-listen` | `:9090` |
| `COLLECTOR_PROM_LISTEN` | `-prom-listen` | `:9100` |
| `COLLECTOR_LOGS` | `-logs` | — |
| `COLLECTOR_LOGS_FILE` | `-logs-file` | — |
| `COLLECTOR_SOURCE` | `-source` | hostname |
| `COLLECTOR_V4PREFIX` | `-v4prefix` | `24` |
| `COLLECTOR_V6PREFIX` | `-v6prefix` | `48` |
| `COLLECTOR_SCAN_INTERVAL`| `-scan-interval` | `10s` |
**aggregator**:
| Env var | Flag | Default |
|--------------------------|---------------|-------------|
| `AGGREGATOR_LISTEN` | `-listen` | `:9091` |
| `AGGREGATOR_COLLECTORS` | `-collectors` | — (required)|
| `AGGREGATOR_SOURCE` | `-source` | hostname |
**frontend**:
| Env var | Flag | Default |
|------------------|------------|-------------------|
| `FRONTEND_LISTEN`| `-listen` | `:8080` |
| `FRONTEND_TARGET`| `-target` | `localhost:9091` |
| `FRONTEND_N` | `-n` | `25` |
| `FRONTEND_REFRESH`| `-refresh`| `30` |
---
DESIGN
## Directory Layout
```
nginx-logtail/
├── proto/
│ ├── logtail.proto # shared protobuf definitions
│ └── logtailpb/
│ ├── logtail.pb.go # generated: messages, enums
│ └── logtail_grpc.pb.go # generated: service stubs
├── internal/
│ └── store/
│ └── store.go # shared types: Tuple6, Entry, Snapshot, ring helpers
└── cmd/
├── collector/
│ ├── main.go
│ ├── tailer.go # MultiTailer: tail N files via one shared fsnotify watcher
│ ├── parser.go # tab-separated logtail log_format parser (~50 ns/line)
│ ├── store.go # bounded top-K in-memory store + tiered ring buffers
│ └── server.go # gRPC server: TopN, Trend, StreamSnapshots
├── aggregator/
│ ├── main.go
│ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff
│ ├── merger.go # delta-merge: O(snapshot_size) per update
│ ├── cache.go # tick-based ring buffer cache served to clients
│ ├── registry.go # TargetRegistry: addr→name map updated from snapshot sources
│ └── server.go # gRPC server (same surface as collector)
├── frontend/
│ ├── main.go
│ ├── handler.go # URL param parsing, concurrent TopN+Trend, template exec
│ ├── filter.go # ParseFilterExpr / FilterExprString mini filter language
│ ├── client.go # gRPC dial helper
│ ├── sparkline.go # TrendPoints → inline SVG polyline
│ ├── format.go # fmtCount (space thousands separator)
│ └── templates/
│ ├── base.html # outer HTML shell, inline CSS, meta-refresh
│ └── index.html # window tabs, group-by tabs, breadcrumb, table, footer
└── cli/
├── main.go # subcommand dispatch and usage
├── flags.go # shared flags, parseTargets, buildFilter, parseWindow
├── client.go # gRPC dial helper
├── format.go # printTable, fmtCount, fmtTime, targetHeader
├── cmd_topn.go # topn: concurrent fan-out, table + JSON output
├── cmd_trend.go # trend: concurrent fan-out, table + JSON output
├── cmd_stream.go # stream: multiplexed streams, auto-reconnect
└── cmd_targets.go # targets: list collectors known to the endpoint
```
## Data Model
The core unit is a **count keyed by six dimensions**:
| Field | Description | Example |
|-------------------|------------------------------------------------------|-------------------|
| `website` | nginx `$host` | `www.example.com` |
| `client_prefix` | client IP truncated to /24 IPv4 or /48 IPv6 | `1.2.3.0/24` |
| `http_request_uri`| `$request_uri` path only — query string stripped | `/api/v1/search` |
| `http_response` | HTTP status code | `429` |
| `is_tor` | whether the client IP is a TOR exit node | `1` |
| `asn` | client AS number (MaxMind GeoIP2, 32-bit int) | `8298` |
## Time Windows & Tiered Ring Buffers
Two ring buffers at different resolutions cover all query windows up to 24 hours:
| Tier | Bucket size | Buckets | Top-K/bucket | Covers | Roll-up trigger |
|--------|-------------|---------|--------------|--------|---------------------|
| Fine | 1 min | 60 | 50 000 | 1 h | every minute |
| Coarse | 5 min | 288 | 5 000 | 24 h | every 5 fine ticks |
Supported query windows and which tier they read from:
| Window | Tier | Buckets summed |
|--------|--------|----------------|
| 1 min | fine | last 1 |
| 5 min | fine | last 5 |
| 15 min | fine | last 15 |
| 60 min | fine | all 60 |
| 6 h | coarse | last 72 |
| 24 h | coarse | all 288 |
Every minute: snapshot live map → top-50K → append to fine ring, reset live map.
Every 5 minutes: merge last 5 fine snapshots → top-5K → append to coarse ring.
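The per-rotation top-K selection can be sketched with a size-K min-heap — one pass over the live map, O(n log K) time, O(K) memory. Names here (`entry`, `topK`) are illustrative, not the repo's actual `store.go` API, and a string key stands in for the six-dimension tuple:

```go
package main

import (
	"container/heap"
	"fmt"
)

type entry struct {
	key   string
	count int64
}

// minHeap keeps the smallest of the retained counts at the root,
// so it can be evicted cheaply when a larger count arrives.
type minHeap []entry

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].count < h[j].count }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *minHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topK heap-selects the k highest-count entries, returned in
// descending count order.
func topK(m map[string]int64, k int) []entry {
	h := &minHeap{}
	for key, c := range m {
		switch {
		case h.Len() < k:
			heap.Push(h, entry{key, c})
		case c > (*h)[0].count:
			(*h)[0] = entry{key, c} // evict current minimum
			heap.Fix(h, 0)
		}
	}
	out := make([]entry, h.Len())
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = heap.Pop(h).(entry)
	}
	return out
}

func main() {
	res := topK(map[string]int64{"a": 5, "b": 1, "c": 9, "d": 7}, 2)
	fmt.Println(res[0].key, res[0].count, res[1].key, res[1].count) // c 9 d 7
}
```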
## Memory Budget (Collector, target ≤ 1 GB)
Entry size: ~30 B website + ~15 B prefix + ~50 B URI + 3 B status + 1 B is_tor + 4 B asn + 8 B count + ~80 B Go map
overhead ≈ **~191 bytes per entry**.
| Structure | Entries | Size |
|-------------------------|-------------|-------------|
| Live map (capped) | 100 000 | ~19 MB |
| Fine ring (60 × 1-min) | 60 × 50 000 | ~558 MB |
| Coarse ring (288 × 5-min)| 288 × 5 000| ~268 MB |
| **Total** | | **~845 MB** |
The live map is **hard-capped at 100 K entries**. Once full, only updates to existing keys are
accepted; new keys are dropped until the next rotation resets the map. This keeps memory bounded
regardless of attack cardinality.
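A minimal sketch of the cap policy (with a string key standing in for the real six-dimension tuple, and a tiny cap for illustration):

```go
package main

import "fmt"

const maxLive = 3 // illustrative; the collector's cap is 100 000

// boundedAdd increments existing keys unconditionally but drops new
// keys once the live map is full, so memory stays bounded no matter
// how many distinct tuples an attack generates. The map is reset at
// the next minute rotation, which re-admits new keys.
func boundedAdd(live map[string]int64, key string) bool {
	if _, ok := live[key]; ok {
		live[key]++
		return true
	}
	if len(live) >= maxLive {
		return false // dropped until rotation
	}
	live[key] = 1
	return true
}

func main() {
	live := map[string]int64{}
	for _, k := range []string{"a", "b", "c", "d", "a"} {
		fmt.Println(k, boundedAdd(live, k))
	}
	// a true / b true / c true / d false (map full) / a true (existing key)
}
```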
## Future Work — ClickHouse Export (post-MVP)
> **Do not implement until the end-to-end MVP is running.**
The aggregator will optionally write 1-minute pre-aggregated rows to ClickHouse for 7d/30d
historical views. Schema sketch:
```sql
CREATE TABLE logtail (
ts DateTime,
website LowCardinality(String),
client_prefix String,
request_uri LowCardinality(String),
status UInt16,
count UInt64
) ENGINE = SummingMergeTree(count)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, website, status, client_prefix, request_uri);
```
The frontend routes `window=7d|30d` queries to ClickHouse; all shorter windows continue to use
the in-memory cache. Kafka is not needed — the aggregator writes directly. This is purely additive
and does not change any existing interface.
## Protobuf API (`proto/logtail.proto`)
```protobuf
enum TorFilter { TOR_ANY = 0; TOR_YES = 1; TOR_NO = 2; }
enum StatusOp { EQ = 0; NE = 1; GT = 2; GE = 3; LT = 4; LE = 5; }
message Filter {
optional string website = 1;
optional string client_prefix = 2;
optional string http_request_uri = 3;
optional int32 http_response = 4;
StatusOp status_op = 5; // comparison operator for http_response
optional string website_regex = 6; // RE2 regex against website
optional string uri_regex = 7; // RE2 regex against http_request_uri
TorFilter tor = 8; // TOR_ANY (default) / TOR_YES / TOR_NO
optional int32 asn_number = 9; // filter by client ASN
StatusOp asn_op = 10; // comparison operator for asn_number
}
enum GroupBy { WEBSITE = 0; CLIENT_PREFIX = 1; REQUEST_URI = 2; HTTP_RESPONSE = 3; ASN_NUMBER = 4; }
enum Window { W1M = 0; W5M = 1; W15M = 2; W60M = 3; W6H = 4; W24H = 5; }
message TopNRequest { Filter filter = 1; GroupBy group_by = 2; int32 n = 3; Window window = 4; }
message TopNEntry { string label = 1; int64 count = 2; }
message TopNResponse { repeated TopNEntry entries = 1; string source = 2; }
// Trend: one total count per minute (or 5-min) bucket, for sparklines
message TrendRequest { Filter filter = 1; Window window = 4; }
message TrendPoint { int64 timestamp_unix = 1; int64 count = 2; }
message TrendResponse { repeated TrendPoint points = 1; string source = 2; }
// Streaming: collector pushes a fine snapshot after every minute rotation
message SnapshotRequest {}
message Snapshot {
string source = 1;
int64 timestamp = 2;
repeated TopNEntry entries = 3; // full top-50K for this bucket
bool is_coarse = 4; // true for 5-min coarse buckets (DumpSnapshots only)
}
// Target discovery: list the collectors behind the queried endpoint
message ListTargetsRequest {}
message TargetInfo {
string name = 1; // display name (--source value from the collector)
string addr = 2; // gRPC address; empty string means "this endpoint itself"
}
message ListTargetsResponse { repeated TargetInfo targets = 1; }
// Backfill: dump full ring buffer contents for aggregator restart recovery
message DumpSnapshotsRequest {}
// Response reuses Snapshot; is_coarse distinguishes fine (1-min) from coarse (5-min) buckets.
// Stream closes after all historical data is sent (unlike StreamSnapshots which stays open).
service LogtailService {
rpc TopN(TopNRequest) returns (TopNResponse);
rpc Trend(TrendRequest) returns (TrendResponse);
rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot);
rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse);
rpc DumpSnapshots(DumpSnapshotsRequest) returns (stream Snapshot);
}
// Both collector and aggregator implement LogtailService.
// The aggregator's StreamSnapshots re-streams the merged view.
// ListTargets: aggregator returns all configured collectors; collector returns itself.
// DumpSnapshots: collector only; aggregator calls this on startup to backfill its ring.
```
## Program 1 — Collector
### tailer.go
- **`MultiTailer`**: one shared `fsnotify.Watcher` for all files regardless of count — avoids
the inotify instance limit when tailing hundreds of files.
- On `WRITE` event: read all new lines from that file's `bufio.Reader`.
- On `RENAME`/`REMOVE` (logrotate): drain old fd to EOF, close, start retry-open goroutine with
exponential backoff. Sends the new `*os.File` back via a channel to keep map access single-threaded.
- Emits `LogRecord` structs on a shared buffered channel (capacity 200 K — absorbs ~20 s of peak).
- Accepts paths via `--logs` (comma-separated or glob) and `--logs-file` (one path/glob per line).
### parser.go
- Parses the fixed **logtail** nginx log format — tab-separated, fixed field order, no quoting:
```nginx
log_format logtail '$host\t$remote_addr\t$msec\t$request_method\t$request_uri\t$status\t$body_bytes_sent\t$request_time\t$is_tor\t$asn';
```
| # | Field | Used for |
|---|-------------------|------------------|
| 0 | `$host` | website |
| 1 | `$remote_addr` | client_prefix |
| 2 | `$msec` | (discarded) |
| 3 | `$request_method` | (discarded) |
| 4 | `$request_uri` | http_request_uri |
| 5 | `$status` | http_response |
| 6 | `$body_bytes_sent`| (discarded) |
| 7 | `$request_time` | (discarded) |
| 8 | `$is_tor` | is_tor |
| 9 | `$asn` | asn |
- `strings.SplitN(line, "\t", 10)` — ~50 ns/line. No regex.
- `$request_uri`: query string discarded at first `?`.
- `$remote_addr`: truncated to /24 (IPv4) or /48 (IPv6); prefix lengths configurable via flags.
- `$is_tor`: `1` if the client IP is a TOR exit node, `0` otherwise. Field is optional — lines
with exactly 8 fields (old format) are accepted and default to `is_tor=false`.
- `$asn`: client AS number as a decimal integer (from MaxMind GeoIP2). Field is optional —
lines without it default to `asn=0`.
- Lines with fewer than 8 fields are silently skipped.
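The parsing rules above can be sketched as follows; field and struct names are illustrative, not the repo's actual `parser.go` API, and the prefix lengths are hard-coded to the documented defaults:

```go
package main

import (
	"fmt"
	"net/netip"
	"strconv"
	"strings"
)

// LogRecord holds the fields kept after parsing one logtail line.
type LogRecord struct {
	Website string
	Prefix  string
	URI     string
	Status  int
	IsTor   bool
	ASN     int
}

// parseLine splits a tab-separated logtail line (no regex), strips
// the query string at the first '?', and truncates the client
// address to /24 (IPv4) or /48 (IPv6). Lines with fewer than 8
// fields are rejected; is_tor and asn default to zero when absent.
func parseLine(line string) (LogRecord, bool) {
	f := strings.SplitN(line, "\t", 10)
	if len(f) < 8 {
		return LogRecord{}, false
	}
	uri := f[4]
	if i := strings.IndexByte(uri, '?'); i >= 0 {
		uri = uri[:i]
	}
	status, err := strconv.Atoi(f[5])
	if err != nil {
		return LogRecord{}, false
	}
	ip, err := netip.ParseAddr(f[1])
	if err != nil {
		return LogRecord{}, false
	}
	bits := 48
	if ip.Is4() {
		bits = 24
	}
	p, _ := ip.Prefix(bits) // masks host bits: 1.2.3.4 -> 1.2.3.0/24
	rec := LogRecord{Website: f[0], Prefix: p.String(), URI: uri, Status: status}
	if len(f) > 8 {
		rec.IsTor = f[8] == "1"
	}
	if len(f) > 9 {
		rec.ASN, _ = strconv.Atoi(f[9])
	}
	return rec, true
}

func main() {
	line := "www.example.com\t1.2.3.4\t1711300000.123\tGET\t/api/v1/search?q=x\t429\t512\t0.004\t0\t8298"
	rec, _ := parseLine(line)
	fmt.Println(rec.Website, rec.Prefix, rec.URI, rec.Status, rec.ASN)
	// www.example.com 1.2.3.0/24 /api/v1/search 429 8298
}
```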
### store.go
- **Single aggregator goroutine** reads from the channel and updates the live map — no locking on
the hot path. At 10 K lines/s the goroutine uses <1% CPU.
- Live map: `map[Tuple6]int64`, hard-capped at 100 K entries (new keys dropped when full).
- **Minute ticker**: heap-selects top-50K entries, writes snapshot to fine ring, resets live map.
- Every 5 fine ticks: merge last 5 fine snapshots → top-5K → write to coarse ring.
- **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N.
- **Trend query**: per-bucket filtered sum, returns one `TrendPoint` per bucket.
- **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming.
- **`DumpRings()`**: acquires `RLock`, copies both ring arrays and their head/filled pointers
(just slice headers — microseconds), releases lock, then returns chronologically-ordered fine
and coarse snapshot slices. The lock is never held during serialisation or network I/O.
### server.go
- gRPC server on configurable port (default `:9090`).
- `TopN` and `Trend`: unary, answered from the ring buffer under RLock.
- `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker.
- `DumpSnapshots`: calls `DumpRings()`, streams all fine buckets (`is_coarse=false`) then all
coarse buckets (`is_coarse=true`), then closes the stream. No lock held during streaming.
## Program 2 — Aggregator
### subscriber.go
- One goroutine per collector. Dials, calls `StreamSnapshots`, forwards each `Snapshot` to the
merger.
- Reconnects with exponential backoff (100 ms → doubles → cap 30 s).
- After 3 consecutive failures: calls `merger.Zero(addr)` to remove that collector's contribution
from the merged view (prevents stale counts accumulating during outages).
- Resets failure count on first successful `Recv`; logs recovery.
### merger.go
- **Delta strategy**: on each new snapshot from collector X, subtract X's previous entries from
`merged`, add the new entries, store new map. O(snapshot_size) per update — not
O(N_collectors × snapshot_size).
- `Zero(addr)`: subtracts the collector's last-known contribution and deletes its entry — called
when a collector is marked degraded.
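The delta strategy reduces to a few lines; this sketch uses string labels and a plain `prev` map in place of the merger's real state:

```go
package main

import "fmt"

// deltaMerge swaps collector addr's contribution in the merged view:
// subtract its previous snapshot, add the new one, remember it.
// Cost is O(len(prev) + len(next)), independent of collector count.
// Zeroing a degraded collector is the same call with a nil snapshot.
func deltaMerge(merged map[string]int64, prev map[string]map[string]int64,
	addr string, next map[string]int64) {
	for k, v := range prev[addr] {
		if merged[k] -= v; merged[k] == 0 {
			delete(merged, k)
		}
	}
	for k, v := range next {
		merged[k] += v
	}
	prev[addr] = next
}

func main() {
	merged := map[string]int64{}
	prev := map[string]map[string]int64{}
	deltaMerge(merged, prev, "c1", map[string]int64{"example.com": 10})
	deltaMerge(merged, prev, "c2", map[string]int64{"example.com": 5})
	deltaMerge(merged, prev, "c1", map[string]int64{"example.com": 7}) // replaces 10
	fmt.Println(merged["example.com"]) // 12
}
```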
### cache.go
- **Tick-based rotation** (1-min ticker, not snapshot-triggered): keeps the aggregator ring aligned
to the same 1-minute cadence as collectors regardless of how many collectors are connected.
- Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick.
- `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store.
- **`LoadHistorical(fine, coarse []Snapshot)`**: writes pre-merged backfill snapshots directly into
the ring arrays under `mu.Lock()`, sets head and filled counters, then returns. Safe to call
concurrently with queries. The live ticker continues from the updated head after this returns.
### backfill.go
- **`Backfill(ctx, collectorAddrs, cache)`**: called once at aggregator startup (in a goroutine,
after the gRPC server is already listening so the frontend is never blocked).
- Dials all collectors concurrently and calls `DumpSnapshots` on each.
- Accumulates entries per timestamp in `map[unix-second]map[label]count`; multiple collectors'
contributions for the same bucket are summed — the same delta-merge semantics as the live path.
- Sorts timestamps chronologically, runs `TopKFromMap` per bucket, caps to ring size.
- Calls `cache.LoadHistorical` once with the merged results.
- **Graceful degradation**: if a collector returns `Unimplemented` (old binary without
`DumpSnapshots`), logs an informational message and skips it — live streaming still starts
normally. Any other error is logged with timing and also skipped. Partial backfill (some
collectors succeed, some fail) is supported.
- Logs per-collector stats: bucket counts, total entry counts, and wall-clock duration.
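The per-timestamp accumulation step can be sketched like this (a trimmed `snap` struct stands in for the proto `Snapshot` message):

```go
package main

import "fmt"

// snap is a trimmed stand-in for the proto Snapshot message.
type snap struct {
	ts      int64 // bucket timestamp, unix seconds
	entries map[string]int64
}

// accumulate sums every collector's dump into per-timestamp buckets;
// two collectors reporting the same minute are added together — the
// same semantics the live delta-merge produces.
func accumulate(snaps []snap) map[int64]map[string]int64 {
	acc := map[int64]map[string]int64{}
	for _, s := range snaps {
		b := acc[s.ts]
		if b == nil {
			b = map[string]int64{}
			acc[s.ts] = b
		}
		for label, c := range s.entries {
			b[label] += c
		}
	}
	return acc
}

func main() {
	out := accumulate([]snap{
		{ts: 60, entries: map[string]int64{"example.com": 3}},
		{ts: 60, entries: map[string]int64{"example.com": 4}}, // second collector, same bucket
	})
	fmt.Println(out[60]["example.com"]) // 7
}
```

The real backfill then runs `TopKFromMap` per bucket and hands the result to `cache.LoadHistorical` in one call.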
### registry.go
- **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the
configured collector addresses; display names are updated from the `source` field of the first
snapshot received from each collector.
- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses.
### server.go
- Implements `LogtailService` backed by the cache (not live fan-out).
- `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or
monitoring system.
- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with
their display names and gRPC addresses.
## Program 3 — Frontend
### handler.go
- All filter state in the **URL query string**: `w` (window), `by` (group_by), `f_website`,
`f_prefix`, `f_uri`, `f_status`, `f_website_re`, `f_uri_re`, `f_is_tor`, `f_asn`, `n`, `target`. No
server-side session — URLs are shareable and bookmarkable; multiple operators see independent views.
- **Filter expression box**: a `q=` parameter carries a mini filter language
(`status>=400 AND website~=gouda.* AND uri~=^/api/`). On submission the handler parses it
via `ParseFilterExpr` and redirects to the canonical URL with individual `f_*` params; `q=`
never appears in the final URL. Parse errors re-render the current page with an inline message.
- **Status expressions**: `f_status` accepts `200`, `!=200`, `>=400`, `<500`, etc. — parsed by
`store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf.
- **ASN expressions**: `f_asn` accepts the same expression syntax (`12345`, `!=65000`, `>=1000`,
`<64512`, etc.) — also parsed by `store.ParseStatusExpr`, stored as `(asn_number, AsnOp)` in the
filter protobuf.
- **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request
into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing.
- `TopN`, `Trend`, and `ListTargets` RPCs issued **concurrently** (all with a 5 s deadline); page
renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets` failure
hides the source picker — both are non-fatal.
- **Source picker**: `ListTargets` result drives a `source:` tab row. Clicking a collector tab
sets `target=` to that collector's address, querying it directly. The "all" tab resets to the
default aggregator. The picker is hidden when `ListTargets` returns no collectors (direct collector
mode).
- **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through
`website → prefix → uri → status → asn → website` (cycles).
- **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting.
- **`target=` override**: per-request gRPC endpoint override for comparing sources.
- Error pages render at HTTP 502 with the window/group-by tabs still functional.
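The expression grammar for status and ASN filters is small enough to sketch; this is an assumed reconstruction of the `store.ParseStatusExpr` behaviour, not the actual implementation:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Op mirrors the proto StatusOp operators.
type Op int

const (
	EQ Op = iota
	NE
	GT
	GE
	LT
	LE
)

// parseStatusExpr parses expressions like "200", "!=200", ">=400",
// "<500": a bare number means EQ, otherwise an operator prefix
// selects the comparison. Two-character operators are tried first.
func parseStatusExpr(s string) (int, Op, error) {
	ops := []struct {
		prefix string
		op     Op
	}{{"!=", NE}, {">=", GE}, {"<=", LE}, {">", GT}, {"<", LT}, {"=", EQ}}
	op := EQ
	for _, o := range ops {
		if strings.HasPrefix(s, o.prefix) {
			op = o.op
			s = s[len(o.prefix):]
			break
		}
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, 0, fmt.Errorf("bad status expression: %w", err)
	}
	return n, op, nil
}

func main() {
	n, op, _ := parseStatusExpr(">=400")
	fmt.Println(n, op) // 400 3 (GE)
}
```

The same parser serves `f_asn`, which is why no second grammar or enum exists.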
### sparkline.go
- `renderSparkline([]*pb.TrendPoint) template.HTML` — fixed `viewBox="0 0 300 60"` SVG,
Y-scaled to max count, rendered as `<polyline>`. Returns `""` for fewer than 2 points or
all-zero data.
### templates/
- `base.html`: outer shell, inline CSS (~40 lines), conditional `<meta http-equiv="refresh">`.
- `index.html`: window tabs, group-by tabs, filter breadcrumb with `×` remove links, sparkline,
TopN table with `<meter>` bars (% relative to rank-1), footer with source and refresh info.
- No external CSS, no web fonts, no JavaScript. Renders in w3m/lynx.
## Program 4 — CLI
### Subcommands
```
logtail-cli topn [flags] ranked label → count table (exits after one response)
logtail-cli trend [flags] per-bucket time series (exits after one response)
logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C, auto-reconnects)
logtail-cli targets [flags] list targets known to the queried endpoint
```
### Flags
**Shared** (all subcommands):
| Flag | Default | Description |
|---------------|------------------|----------------------------------------------------------|
| `--target` | `localhost:9090` | Comma-separated `host:port` list; fan-out to all |
| `--json` | false | Emit newline-delimited JSON instead of a table |
| `--website` | — | Filter: website |
| `--prefix` | — | Filter: client prefix |
| `--uri` | — | Filter: request URI |
| `--status` | — | Filter: HTTP status expression (`200`, `!=200`, `>=400`, `<500`, …) |
| `--website-re`| — | Filter: RE2 regex against website |
| `--uri-re` | — | Filter: RE2 regex against request URI |
| `--is-tor` | — | Filter: TOR traffic (`1` or `!=0` = TOR only; `0` or `!=1` = non-TOR only) |
| `--asn` | — | Filter: ASN expression (`12345`, `!=65000`, `>=1000`, `<64512`, …) |
**`topn` only**: `--n 10`, `--window 5m`, `--group-by website`
**`trend` only**: `--window 5m`
### Multi-target fan-out
`--target` accepts a comma-separated list. All targets are queried concurrently; results are
printed in order with a per-target header. Single-target output omits the header for clean
pipe-to-`jq` use.
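The fan-out pattern — query concurrently, print in input order — can be sketched as below; `query` is a stand-in for the real per-target gRPC call:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// fanOut runs one query per target concurrently but returns results
// in the original target order, so multi-target output is stable
// run to run.
func fanOut(targets []string, query func(string) string) []string {
	out := make([]string, len(targets))
	var wg sync.WaitGroup
	for i, t := range targets {
		wg.Add(1)
		go func(i int, t string) {
			defer wg.Done()
			out[i] = query(t) // each goroutine writes its own slot
		}(i, t)
	}
	wg.Wait()
	return out
}

func main() {
	targets := strings.Split("nginx1:9090,nginx2:9090", ",")
	res := fanOut(targets, func(t string) string { return "ok from " + t })
	fmt.Println(res[0]) // ok from nginx1:9090
}
```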
### Output
Default: human-readable table with space-separated thousands (`18 432`).
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded).
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
with a non-zero code on gRPC error.
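The space-grouped thousands format can be sketched in a few lines (assuming non-negative counts, which is all the counters produce):

```go
package main

import (
	"fmt"
	"strconv"
)

// fmtCount inserts a space every three digits from the right,
// turning 18432 into "18 432" for the table output.
func fmtCount(n int64) string {
	s := strconv.FormatInt(n, 10)
	var b []byte
	for i := 0; i < len(s); i++ {
		if i > 0 && (len(s)-i)%3 == 0 {
			b = append(b, ' ')
		}
		b = append(b, s[i])
	}
	return string(b)
}

func main() {
	fmt.Println(fmtCount(18432)) // 18 432
}
```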
## Key Design Decisions
| Decision | Rationale |
|----------|-----------|
| Single aggregator goroutine in collector | Eliminates all map lock contention on the 10 K/s hot path |
| Hard cap live map at 100 K entries | Bounds memory regardless of DDoS cardinality explosion |
| Ring buffer of sorted snapshots (not raw maps) | TopN queries avoid re-sorting; merge is a single heap pass |
| Push-based streaming (collector → aggregator) | Aggregator cache always fresh; query latency is cache-read only |
| Delta merge in aggregator | O(snapshot_size) per update, not O(N_collectors × size) |
| Tick-based cache rotation in aggregator | Ring stays on the same 1-min cadence regardless of collector count |
| Degraded collector zeroing | Stale counts from failed collectors don't accumulate in the merged view |
| Same `LogtailService` for collector and aggregator | CLI and frontend work with either; no special-casing |
| `internal/store` shared package | ring-buffer, `Tuple6` encoding, and filter logic shared between collector and aggregator |
| Filter state in URL, not session cookie | Multiple concurrent operators; shareable/bookmarkable URLs |
| Query strings stripped at ingest | Major cardinality reduction; prevents URI explosion under attack |
| No persistent storage | Simplicity; acceptable for ops dashboards (restart = lose history) |
| Trusted internal network, no TLS | Reduces operational complexity; add a TLS proxy if needed later |
| Server-side SVG sparklines, meta-refresh | Zero JS dependencies; works in terminal browsers and curl |
| CLI default: human-readable table | Operator-friendly by default; `--json` opt-in for scripting |
| CLI multi-target fan-out | Compare a collector vs. aggregator, or two collectors, in one command |
| CLI uses stdlib `flag`, no framework | Four subcommands don't justify a dependency |
| Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at query boundary, encoded as `(int32, StatusOp)` in proto |
| ASN filter reuses `StatusOp` and `ParseStatusExpr` | Same 6-operator grammar as status; no duplicate enum or parser needed |
| Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency |
| Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable |
| `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory |
| Backfill via `DumpSnapshots` on restart | Aggregator recovers full 24h ring from collectors on restart; gRPC server starts first so frontend is never blocked during backfill |
| `DumpRings()` copies under lock, streams without lock | Lock held for microseconds (slice-header copy only); network I/O happens outside the lock so minute rotation is never delayed |
| Backfill merges per-timestamp across collectors | Correct cross-collector sums per bucket, same semantics as live delta-merge; collectors that don't support `DumpSnapshots` are skipped gracefully |
@@ -76,6 +76,7 @@ windows, and exposes a gRPC interface for the aggregator (and directly for the C
| Flag | Default | Description | | Flag | Default | Description |
|-------------------|--------------|-----------------------------------------------------------| |-------------------|--------------|-----------------------------------------------------------|
| `--listen` | `:9090` | gRPC listen address | | `--listen` | `:9090` | gRPC listen address |
| `--prom-listen` | `:9100` | Prometheus metrics address; empty string to disable |
| `--logs` | — | Comma-separated log file paths or glob patterns | | `--logs` | — | Comma-separated log file paths or glob patterns |
| `--logs-file` | — | File containing one log path/glob per line | | `--logs-file` | — | File containing one log path/glob per line |
| `--source` | hostname | Name for this collector in query responses | | `--source` | hostname | Name for this collector in query responses |
@@ -123,6 +124,73 @@ The collector handles logrotate automatically. On `RENAME`/`REMOVE` events it dr
descriptor to EOF (so no lines are lost), then retries opening the original path with backoff until descriptor to EOF (so no lines are lost), then retries opening the original path with backoff until
the new file appears. No restart or SIGHUP required. the new file appears. No restart or SIGHUP required.
### Prometheus metrics
The collector exposes a Prometheus-compatible `/metrics` endpoint on `--prom-listen` (default
`:9100`). Set `--prom-listen ""` to disable it entirely.
Three metrics are exported:
**`nginx_http_requests_total`** — counter, labeled `{host, method, status}`:
```
nginx_http_requests_total{host="example.com",method="GET",status="200"} 18432
nginx_http_requests_total{host="example.com",method="POST",status="201"} 304
nginx_http_requests_total{host="api.example.com",method="GET",status="429"} 57
```
**`nginx_http_response_body_bytes`** — histogram, labeled `{host}`. Observes the
`$body_bytes_sent` value for every request. Bucket upper bounds (bytes):
`256, 1024, 4096, 16384, 65536, 262144, 1048576, +Inf`.
**`nginx_http_request_duration_seconds`** — histogram, labeled `{host}`. Observes the
`$request_time` value for every request. Bucket upper bounds (seconds):
`0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, +Inf`.
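On the wire these follow the standard Prometheus text exposition format: one `_bucket` series per upper bound (cumulative, with a `le` label), plus `_sum` and `_count`. An illustrative scrape (values made up):

```
nginx_http_request_duration_seconds_bucket{host="example.com",le="0.005"} 9120
nginx_http_request_duration_seconds_bucket{host="example.com",le="0.01"} 14833
nginx_http_request_duration_seconds_bucket{host="example.com",le="+Inf"} 18432
nginx_http_request_duration_seconds_sum{host="example.com"} 512.7
nginx_http_request_duration_seconds_count{host="example.com"} 18432
```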
Body and request-time histograms use only the `host` label (not method/status) to keep
cardinality bounded — the label sets stay proportional to the number of virtual hosts, not
the number of unique method × status combinations.
The counter map is capped at 100 000 distinct `{host, method, status}` tuples. Entries beyond
the cap are silently dropped for the current scrape interval, so memory is bounded regardless
of traffic patterns.
**Prometheus scrape config:**
```yaml
scrape_configs:
- job_name: nginx_logtail
static_configs:
- targets:
- nginx1:9100
- nginx2:9100
- nginx3:9100
```
Or with service discovery — the collector has no special requirements beyond a reachable
TCP port.
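For example, with Prometheus file-based service discovery (the path below is illustrative):

```yaml
scrape_configs:
  - job_name: nginx_logtail
    file_sd_configs:
      - files:
          - /etc/prometheus/targets/nginx-logtail.json
```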
**Example queries:**
```promql
# Request rate per host over last 5 minutes
rate(nginx_http_requests_total[5m])
# 5xx error rate fraction per host
sum by (host) (rate(nginx_http_requests_total{status=~"5.."}[5m]))
/
sum by (host) (rate(nginx_http_requests_total[5m]))
# 95th percentile response time per host
histogram_quantile(0.95,
sum by (host, le) (rate(nginx_http_request_duration_seconds_bucket[5m]))
)
# Median response body size per host
histogram_quantile(0.50,
sum by (host, le) (rate(nginx_http_response_body_bytes_bucket[5m]))
)
```
### Memory usage
The collector is designed to stay well under 1 GB: The collector is designed to stay well under 1 GB:

docs/frontend.png — new binary file (300 KiB)

@@ -133,9 +133,11 @@ func indexOf(s string, b byte) int {
// CompiledFilter wraps a pb.Filter with pre-compiled regular expressions.
// Use CompileFilter to construct one before a query loop.
type CompiledFilter struct {
	Proto         *pb.Filter
	WebsiteRe     *regexp.Regexp // nil if no website_regex or compilation failed
	URIRe         *regexp.Regexp // nil if no uri_regex or compilation failed
	WebsiteReExcl *regexp.Regexp // nil if no website_regex_exclude or compilation failed
	URIReExcl     *regexp.Regexp // nil if no uri_regex_exclude or compilation failed
}
// CompileFilter compiles the regex fields in f once. Invalid regexes are
@@ -161,6 +163,22 @@ func CompileFilter(f *pb.Filter) *CompiledFilter {
			cf.URIRe = re
		}
	}
if f.WebsiteRegexExclude != nil {
re, err := regexp.Compile(f.GetWebsiteRegexExclude())
if err != nil {
log.Printf("store: invalid website_regex_exclude %q: %v", f.GetWebsiteRegexExclude(), err)
} else {
cf.WebsiteReExcl = re
}
}
if f.UriRegexExclude != nil {
re, err := regexp.Compile(f.GetUriRegexExclude())
if err != nil {
log.Printf("store: invalid uri_regex_exclude %q: %v", f.GetUriRegexExclude(), err)
} else {
cf.URIReExcl = re
}
}
	return cf
}
@@ -193,6 +211,18 @@ func MatchesFilter(t Tuple6, f *CompiledFilter) bool {
	if p.UriRegex != nil && f.URIRe == nil {
		return false
	}
if f.WebsiteReExcl != nil && f.WebsiteReExcl.MatchString(t.Website) {
return false
}
if p.WebsiteRegexExclude != nil && f.WebsiteReExcl == nil {
return false
}
if f.URIReExcl != nil && f.URIReExcl.MatchString(t.URI) {
return false
}
if p.UriRegexExclude != nil && f.URIReExcl == nil {
return false
}
	if p.HttpResponse != nil && !matchesStatusOp(t.Status, p.GetHttpResponse(), p.StatusOp) {
		return false
	}

File diff suppressed because it is too large
@@ -31,8 +31,10 @@ message Filter {
  optional string http_request_uri = 3;
  optional int32 http_response = 4;
  StatusOp status_op = 5;                     // operator for http_response; ignored when unset
  optional string website_regex = 6;          // RE2 regex matched against website
  optional string uri_regex = 7;              // RE2 regex matched against http_request_uri
  optional string website_regex_exclude = 11; // RE2 regex; entries matching this are excluded
  optional string uri_regex_exclude = 12;     // RE2 regex; entries matching this are excluded
  TorFilter tor = 8;                          // restrict to TOR / non-TOR clients
  optional int32 asn_number = 9;              // filter by client ASN
  StatusOp asn_op = 10;                       // operator for asn_number; ignored when unset
@@ -99,8 +101,15 @@ message Snapshot {
  string source = 1;
  int64 timestamp = 2;
  repeated TopNEntry entries = 3; // top-50K for this 1-minute bucket, sorted desc
  bool is_coarse = 4;             // true for coarse-ring (5-min) buckets in DumpSnapshots
}
// DumpSnapshots — returns all ring buffer contents for backfill on aggregator restart.
// Streams fine-ring buckets (is_coarse=false) followed by coarse-ring buckets
// (is_coarse=true), then closes. The lock is held only for the initial copy.
message DumpSnapshotsRequest {}
// ListTargets — returns the targets this node knows about.
// The aggregator returns all configured collectors; a collector returns itself.
@@ -120,4 +129,5 @@ service LogtailService {
  rpc Trend (TrendRequest) returns (TrendResponse);
  rpc StreamSnapshots (SnapshotRequest) returns (stream Snapshot);
  rpc ListTargets (ListTargetsRequest) returns (ListTargetsResponse);
  rpc DumpSnapshots (DumpSnapshotsRequest) returns (stream Snapshot);
}


@@ -1,239 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc v3.21.12
// source: proto/logtail.proto
package logtailpb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
LogtailService_TopN_FullMethodName = "/logtail.LogtailService/TopN"
LogtailService_Trend_FullMethodName = "/logtail.LogtailService/Trend"
LogtailService_StreamSnapshots_FullMethodName = "/logtail.LogtailService/StreamSnapshots"
LogtailService_ListTargets_FullMethodName = "/logtail.LogtailService/ListTargets"
)
// LogtailServiceClient is the client API for LogtailService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type LogtailServiceClient interface {
TopN(ctx context.Context, in *TopNRequest, opts ...grpc.CallOption) (*TopNResponse, error)
Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error)
StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error)
ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error)
}
type logtailServiceClient struct {
cc grpc.ClientConnInterface
}
func NewLogtailServiceClient(cc grpc.ClientConnInterface) LogtailServiceClient {
return &logtailServiceClient{cc}
}
func (c *logtailServiceClient) TopN(ctx context.Context, in *TopNRequest, opts ...grpc.CallOption) (*TopNResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(TopNResponse)
err := c.cc.Invoke(ctx, LogtailService_TopN_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *logtailServiceClient) Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(TrendResponse)
err := c.cc.Invoke(ctx, LogtailService_Trend_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *logtailServiceClient) StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &LogtailService_ServiceDesc.Streams[0], LogtailService_StreamSnapshots_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[SnapshotRequest, Snapshot]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type LogtailService_StreamSnapshotsClient = grpc.ServerStreamingClient[Snapshot]
func (c *logtailServiceClient) ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListTargetsResponse)
err := c.cc.Invoke(ctx, LogtailService_ListTargets_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// LogtailServiceServer is the server API for LogtailService service.
// All implementations must embed UnimplementedLogtailServiceServer
// for forward compatibility.
type LogtailServiceServer interface {
TopN(context.Context, *TopNRequest) (*TopNResponse, error)
Trend(context.Context, *TrendRequest) (*TrendResponse, error)
StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error
ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error)
mustEmbedUnimplementedLogtailServiceServer()
}
// UnimplementedLogtailServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedLogtailServiceServer struct{}
func (UnimplementedLogtailServiceServer) TopN(context.Context, *TopNRequest) (*TopNResponse, error) {
return nil, status.Error(codes.Unimplemented, "method TopN not implemented")
}
func (UnimplementedLogtailServiceServer) Trend(context.Context, *TrendRequest) (*TrendResponse, error) {
return nil, status.Error(codes.Unimplemented, "method Trend not implemented")
}
func (UnimplementedLogtailServiceServer) StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error {
return status.Error(codes.Unimplemented, "method StreamSnapshots not implemented")
}
func (UnimplementedLogtailServiceServer) ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ListTargets not implemented")
}
func (UnimplementedLogtailServiceServer) mustEmbedUnimplementedLogtailServiceServer() {}
func (UnimplementedLogtailServiceServer) testEmbeddedByValue() {}
// UnsafeLogtailServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to LogtailServiceServer will
// result in compilation errors.
type UnsafeLogtailServiceServer interface {
mustEmbedUnimplementedLogtailServiceServer()
}
func RegisterLogtailServiceServer(s grpc.ServiceRegistrar, srv LogtailServiceServer) {
// If the following call panics, it indicates UnimplementedLogtailServiceServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&LogtailService_ServiceDesc, srv)
}
func _LogtailService_TopN_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TopNRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LogtailServiceServer).TopN(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: LogtailService_TopN_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LogtailServiceServer).TopN(ctx, req.(*TopNRequest))
}
return interceptor(ctx, in, info, handler)
}
func _LogtailService_Trend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TrendRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LogtailServiceServer).Trend(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: LogtailService_Trend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LogtailServiceServer).Trend(ctx, req.(*TrendRequest))
}
return interceptor(ctx, in, info, handler)
}
func _LogtailService_StreamSnapshots_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SnapshotRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(LogtailServiceServer).StreamSnapshots(m, &grpc.GenericServerStream[SnapshotRequest, Snapshot]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type LogtailService_StreamSnapshotsServer = grpc.ServerStreamingServer[Snapshot]
func _LogtailService_ListTargets_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListTargetsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(LogtailServiceServer).ListTargets(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: LogtailService_ListTargets_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(LogtailServiceServer).ListTargets(ctx, req.(*ListTargetsRequest))
}
return interceptor(ctx, in, info, handler)
}
// LogtailService_ServiceDesc is the grpc.ServiceDesc for LogtailService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var LogtailService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "logtail.LogtailService",
HandlerType: (*LogtailServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "TopN",
Handler: _LogtailService_TopN_Handler,
},
{
MethodName: "Trend",
Handler: _LogtailService_Trend_Handler,
},
{
MethodName: "ListTargets",
Handler: _LogtailService_ListTargets_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "StreamSnapshots",
Handler: _LogtailService_StreamSnapshots_Handler,
ServerStreams: true,
},
},
Metadata: "proto/logtail.proto",
}


@@ -248,19 +248,21 @@ func (Window) EnumDescriptor() ([]byte, []int) {
// Filter restricts results to entries matching all specified fields.
// Unset fields match everything. Exact-match and regex fields are ANDed.
type Filter struct {
	state               protoimpl.MessageState `protogen:"open.v1"`
	Website             *string   `protobuf:"bytes,1,opt,name=website,proto3,oneof" json:"website,omitempty"`
	ClientPrefix        *string   `protobuf:"bytes,2,opt,name=client_prefix,json=clientPrefix,proto3,oneof" json:"client_prefix,omitempty"`
	HttpRequestUri      *string   `protobuf:"bytes,3,opt,name=http_request_uri,json=httpRequestUri,proto3,oneof" json:"http_request_uri,omitempty"`
	HttpResponse        *int32    `protobuf:"varint,4,opt,name=http_response,json=httpResponse,proto3,oneof" json:"http_response,omitempty"`
	StatusOp            StatusOp  `protobuf:"varint,5,opt,name=status_op,json=statusOp,proto3,enum=logtail.StatusOp" json:"status_op,omitempty"` // operator for http_response; ignored when unset
	WebsiteRegex        *string   `protobuf:"bytes,6,opt,name=website_regex,json=websiteRegex,proto3,oneof" json:"website_regex,omitempty"` // RE2 regex matched against website
	UriRegex            *string   `protobuf:"bytes,7,opt,name=uri_regex,json=uriRegex,proto3,oneof" json:"uri_regex,omitempty"` // RE2 regex matched against http_request_uri
	WebsiteRegexExclude *string   `protobuf:"bytes,11,opt,name=website_regex_exclude,json=websiteRegexExclude,proto3,oneof" json:"website_regex_exclude,omitempty"` // RE2 regex; entries matching this are excluded
	UriRegexExclude     *string   `protobuf:"bytes,12,opt,name=uri_regex_exclude,json=uriRegexExclude,proto3,oneof" json:"uri_regex_exclude,omitempty"` // RE2 regex; entries matching this are excluded
	Tor                 TorFilter `protobuf:"varint,8,opt,name=tor,proto3,enum=logtail.TorFilter" json:"tor,omitempty"` // restrict to TOR / non-TOR clients
	AsnNumber           *int32    `protobuf:"varint,9,opt,name=asn_number,json=asnNumber,proto3,oneof" json:"asn_number,omitempty"` // filter by client ASN
	AsnOp               StatusOp  `protobuf:"varint,10,opt,name=asn_op,json=asnOp,proto3,enum=logtail.StatusOp" json:"asn_op,omitempty"` // operator for asn_number; ignored when unset
	unknownFields       protoimpl.UnknownFields
	sizeCache           protoimpl.SizeCache
}
func (x *Filter) Reset() {
@@ -342,6 +344,20 @@ func (x *Filter) GetUriRegex() string {
	return ""
}
func (x *Filter) GetWebsiteRegexExclude() string {
if x != nil && x.WebsiteRegexExclude != nil {
return *x.WebsiteRegexExclude
}
return ""
}
func (x *Filter) GetUriRegexExclude() string {
if x != nil && x.UriRegexExclude != nil {
return *x.UriRegexExclude
}
return ""
}
func (x *Filter) GetTor() TorFilter {
	if x != nil {
		return x.Tor
@@ -731,7 +747,8 @@ type Snapshot struct {
	state         protoimpl.MessageState `protogen:"open.v1"`
	Source        string       `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
	Timestamp     int64        `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Entries       []*TopNEntry `protobuf:"bytes,3,rep,name=entries,proto3" json:"entries,omitempty"` // top-50K for this 1-minute bucket, sorted desc
	IsCoarse      bool         `protobuf:"varint,4,opt,name=is_coarse,json=isCoarse,proto3" json:"is_coarse,omitempty"` // true for coarse-ring (5-min) buckets in DumpSnapshots
	unknownFields protoimpl.UnknownFields
	sizeCache     protoimpl.SizeCache
}
@@ -787,6 +804,49 @@ func (x *Snapshot) GetEntries() []*TopNEntry {
	return nil
}
func (x *Snapshot) GetIsCoarse() bool {
if x != nil {
return x.IsCoarse
}
return false
}
type DumpSnapshotsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DumpSnapshotsRequest) Reset() {
*x = DumpSnapshotsRequest{}
mi := &file_proto_logtail_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DumpSnapshotsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DumpSnapshotsRequest) ProtoMessage() {}
func (x *DumpSnapshotsRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_logtail_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DumpSnapshotsRequest.ProtoReflect.Descriptor instead.
func (*DumpSnapshotsRequest) Descriptor() ([]byte, []int) {
return file_proto_logtail_proto_rawDescGZIP(), []int{9}
}
type ListTargetsRequest struct {
	state         protoimpl.MessageState `protogen:"open.v1"`
	unknownFields protoimpl.UnknownFields
@@ -795,7 +855,7 @@ type ListTargetsRequest struct {
func (x *ListTargetsRequest) Reset() {
	*x = ListTargetsRequest{}
	mi := &file_proto_logtail_proto_msgTypes[10]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}
@@ -807,7 +867,7 @@ func (x *ListTargetsRequest) String() string {
func (*ListTargetsRequest) ProtoMessage() {}

func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message {
	mi := &file_proto_logtail_proto_msgTypes[10]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
@@ -820,7 +880,7 @@ func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListTargetsRequest.ProtoReflect.Descriptor instead.
func (*ListTargetsRequest) Descriptor() ([]byte, []int) {
	return file_proto_logtail_proto_rawDescGZIP(), []int{10}
}

type TargetInfo struct {
@@ -833,7 +893,7 @@ type TargetInfo struct {
func (x *TargetInfo) Reset() {
	*x = TargetInfo{}
	mi := &file_proto_logtail_proto_msgTypes[11]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}
@@ -845,7 +905,7 @@ func (x *TargetInfo) String() string {
func (*TargetInfo) ProtoMessage() {}

func (x *TargetInfo) ProtoReflect() protoreflect.Message {
	mi := &file_proto_logtail_proto_msgTypes[11]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
@@ -858,7 +918,7 @@ func (x *TargetInfo) ProtoReflect() protoreflect.Message {
// Deprecated: Use TargetInfo.ProtoReflect.Descriptor instead.
func (*TargetInfo) Descriptor() ([]byte, []int) {
	return file_proto_logtail_proto_rawDescGZIP(), []int{11}
}

func (x *TargetInfo) GetName() string {
@@ -884,7 +944,7 @@ type ListTargetsResponse struct {
func (x *ListTargetsResponse) Reset() {
	*x = ListTargetsResponse{}
	mi := &file_proto_logtail_proto_msgTypes[12]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}
@@ -896,7 +956,7 @@ func (x *ListTargetsResponse) String() string {
func (*ListTargetsResponse) ProtoMessage() {}

func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message {
	mi := &file_proto_logtail_proto_msgTypes[12]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
@@ -909,7 +969,7 @@ func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use ListTargetsResponse.ProtoReflect.Descriptor instead.
func (*ListTargetsResponse) Descriptor() ([]byte, []int) {
	return file_proto_logtail_proto_rawDescGZIP(), []int{12}
}

func (x *ListTargetsResponse) GetTargets() []*TargetInfo {
@@ -923,7 +983,7 @@ var File_proto_logtail_proto protoreflect.FileDescriptor
const file_proto_logtail_proto_rawDesc = "" +
	"\n" +
	"\x13proto/logtail.proto\x12\alogtail\"\xa8\x05\n" +
	"\x06Filter\x12\x1d\n" +
	"\awebsite\x18\x01 \x01(\tH\x00R\awebsite\x88\x01\x01\x12(\n" +
	"\rclient_prefix\x18\x02 \x01(\tH\x01R\fclientPrefix\x88\x01\x01\x12-\n" +
@@ -931,10 +991,12 @@ const file_proto_logtail_proto_rawDesc = "" +
	"\rhttp_response\x18\x04 \x01(\x05H\x03R\fhttpResponse\x88\x01\x01\x12.\n" +
	"\tstatus_op\x18\x05 \x01(\x0e2\x11.logtail.StatusOpR\bstatusOp\x12(\n" +
	"\rwebsite_regex\x18\x06 \x01(\tH\x04R\fwebsiteRegex\x88\x01\x01\x12 \n" +
	"\turi_regex\x18\a \x01(\tH\x05R\buriRegex\x88\x01\x01\x127\n" +
	"\x15website_regex_exclude\x18\v \x01(\tH\x06R\x13websiteRegexExclude\x88\x01\x01\x12/\n" +
	"\x11uri_regex_exclude\x18\f \x01(\tH\aR\x0furiRegexExclude\x88\x01\x01\x12$\n" +
	"\x03tor\x18\b \x01(\x0e2\x12.logtail.TorFilterR\x03tor\x12\"\n" +
	"\n" +
	"asn_number\x18\t \x01(\x05H\bR\tasnNumber\x88\x01\x01\x12(\n" +
	"\x06asn_op\x18\n" +
	" \x01(\x0e2\x11.logtail.StatusOpR\x05asnOpB\n" +
	"\n" +
@@ -944,7 +1006,9 @@ const file_proto_logtail_proto_rawDesc = "" +
	"\x0e_http_responseB\x10\n" +
	"\x0e_website_regexB\f\n" +
	"\n" +
	"_uri_regexB\x18\n" +
	"\x16_website_regex_excludeB\x14\n" +
	"\x12_uri_regex_excludeB\r\n" +
	"\v_asn_number\"\x9a\x01\n" +
	"\vTopNRequest\x12'\n" +
	"\x06filter\x18\x01 \x01(\v2\x0f.logtail.FilterR\x06filter\x12+\n" +
@@ -967,11 +1031,13 @@ const file_proto_logtail_proto_rawDesc = "" +
	"\rTrendResponse\x12+\n" +
	"\x06points\x18\x01 \x03(\v2\x13.logtail.TrendPointR\x06points\x12\x16\n" +
	"\x06source\x18\x02 \x01(\tR\x06source\"\x11\n" +
	"\x0fSnapshotRequest\"\x8b\x01\n" +
	"\bSnapshot\x12\x16\n" +
	"\x06source\x18\x01 \x01(\tR\x06source\x12\x1c\n" +
	"\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12,\n" +
	"\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries\x12\x1b\n" +
	"\tis_coarse\x18\x04 \x01(\bR\bisCoarse\"\x16\n" +
	"\x14DumpSnapshotsRequest\"\x14\n" +
	"\x12ListTargetsRequest\"4\n" +
	"\n" +
	"TargetInfo\x12\x12\n" +
@@ -1004,12 +1070,13 @@ const file_proto_logtail_proto_rawDesc = "" +
	"\x04W15M\x10\x02\x12\b\n" +
	"\x04W60M\x10\x03\x12\a\n" +
	"\x03W6H\x10\x04\x12\b\n" +
	"\x04W24H\x10\x052\xce\x02\n" +
	"\x0eLogtailService\x123\n" +
	"\x04TopN\x12\x14.logtail.TopNRequest\x1a\x15.logtail.TopNResponse\x126\n" +
	"\x05Trend\x12\x15.logtail.TrendRequest\x1a\x16.logtail.TrendResponse\x12@\n" +
	"\x0fStreamSnapshots\x12\x18.logtail.SnapshotRequest\x1a\x11.logtail.Snapshot0\x01\x12H\n" +
	"\vListTargets\x12\x1b.logtail.ListTargetsRequest\x1a\x1c.logtail.ListTargetsResponse\x12C\n" +
	"\rDumpSnapshots\x12\x1d.logtail.DumpSnapshotsRequest\x1a\x11.logtail.Snapshot0\x01B0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3"

var (
	file_proto_logtail_proto_rawDescOnce sync.Once
@@ -1024,24 +1091,25 @@ func file_proto_logtail_proto_rawDescGZIP() []byte {
 }
 
 var file_proto_logtail_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
-var file_proto_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
+var file_proto_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
 var file_proto_logtail_proto_goTypes = []any{
 	(TorFilter)(0),                // 0: logtail.TorFilter
 	(StatusOp)(0),                 // 1: logtail.StatusOp
 	(GroupBy)(0),                  // 2: logtail.GroupBy
 	(Window)(0),                   // 3: logtail.Window
 	(*Filter)(nil),                // 4: logtail.Filter
 	(*TopNRequest)(nil),           // 5: logtail.TopNRequest
 	(*TopNEntry)(nil),             // 6: logtail.TopNEntry
 	(*TopNResponse)(nil),          // 7: logtail.TopNResponse
 	(*TrendRequest)(nil),          // 8: logtail.TrendRequest
 	(*TrendPoint)(nil),            // 9: logtail.TrendPoint
 	(*TrendResponse)(nil),         // 10: logtail.TrendResponse
 	(*SnapshotRequest)(nil),       // 11: logtail.SnapshotRequest
 	(*Snapshot)(nil),              // 12: logtail.Snapshot
-	(*ListTargetsRequest)(nil),    // 13: logtail.ListTargetsRequest
-	(*TargetInfo)(nil),            // 14: logtail.TargetInfo
-	(*ListTargetsResponse)(nil),   // 15: logtail.ListTargetsResponse
+	(*DumpSnapshotsRequest)(nil),  // 13: logtail.DumpSnapshotsRequest
+	(*ListTargetsRequest)(nil),    // 14: logtail.ListTargetsRequest
+	(*TargetInfo)(nil),            // 15: logtail.TargetInfo
+	(*ListTargetsResponse)(nil),   // 16: logtail.ListTargetsResponse
 }
 var file_proto_logtail_proto_depIdxs = []int32{
 	1, // 0: logtail.Filter.status_op:type_name -> logtail.StatusOp
@@ -1055,17 +1123,19 @@ var file_proto_logtail_proto_depIdxs = []int32{
 	3,  // 8: logtail.TrendRequest.window:type_name -> logtail.Window
 	9,  // 9: logtail.TrendResponse.points:type_name -> logtail.TrendPoint
 	6,  // 10: logtail.Snapshot.entries:type_name -> logtail.TopNEntry
-	14, // 11: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo
+	15, // 11: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo
 	5,  // 12: logtail.LogtailService.TopN:input_type -> logtail.TopNRequest
 	8,  // 13: logtail.LogtailService.Trend:input_type -> logtail.TrendRequest
 	11, // 14: logtail.LogtailService.StreamSnapshots:input_type -> logtail.SnapshotRequest
-	13, // 15: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest
-	7,  // 16: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse
-	10, // 17: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse
-	12, // 18: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot
-	15, // 19: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse
-	16, // [16:20] is the sub-list for method output_type
-	12, // [12:16] is the sub-list for method input_type
+	14, // 15: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest
+	13, // 16: logtail.LogtailService.DumpSnapshots:input_type -> logtail.DumpSnapshotsRequest
+	7,  // 17: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse
+	10, // 18: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse
+	12, // 19: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot
+	16, // 20: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse
+	12, // 21: logtail.LogtailService.DumpSnapshots:output_type -> logtail.Snapshot
+	17, // [17:22] is the sub-list for method output_type
+	12, // [12:17] is the sub-list for method input_type
 	12, // [12:12] is the sub-list for extension type_name
 	12, // [12:12] is the sub-list for extension extendee
 	0,  // [0:12] is the sub-list for field type_name
@@ -1083,7 +1153,7 @@ func file_proto_logtail_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_logtail_proto_rawDesc), len(file_proto_logtail_proto_rawDesc)),
 			NumEnums:      4,
-			NumMessages:   12,
+			NumMessages:   13,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
@@ -23,6 +23,7 @@ const (
 	LogtailService_Trend_FullMethodName           = "/logtail.LogtailService/Trend"
 	LogtailService_StreamSnapshots_FullMethodName = "/logtail.LogtailService/StreamSnapshots"
 	LogtailService_ListTargets_FullMethodName     = "/logtail.LogtailService/ListTargets"
+	LogtailService_DumpSnapshots_FullMethodName   = "/logtail.LogtailService/DumpSnapshots"
 )
 
 // LogtailServiceClient is the client API for LogtailService service.
@@ -33,6 +34,7 @@ type LogtailServiceClient interface {
 	Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error)
 	StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error)
 	ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error)
+	DumpSnapshots(ctx context.Context, in *DumpSnapshotsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error)
 }
 
 type logtailServiceClient struct {
@@ -92,6 +94,25 @@ func (c *logtailServiceClient) ListTargets(ctx context.Context, in *ListTargetsR
 	return out, nil
 }
 
+func (c *logtailServiceClient) DumpSnapshots(ctx context.Context, in *DumpSnapshotsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) {
+	cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+	stream, err := c.cc.NewStream(ctx, &LogtailService_ServiceDesc.Streams[1], LogtailService_DumpSnapshots_FullMethodName, cOpts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &grpc.GenericClientStream[DumpSnapshotsRequest, Snapshot]{ClientStream: stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
+type LogtailService_DumpSnapshotsClient = grpc.ServerStreamingClient[Snapshot]
+
 // LogtailServiceServer is the server API for LogtailService service.
 // All implementations must embed UnimplementedLogtailServiceServer
 // for forward compatibility.
@@ -100,6 +121,7 @@ type LogtailServiceServer interface {
 	Trend(context.Context, *TrendRequest) (*TrendResponse, error)
 	StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error
 	ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error)
+	DumpSnapshots(*DumpSnapshotsRequest, grpc.ServerStreamingServer[Snapshot]) error
 	mustEmbedUnimplementedLogtailServiceServer()
 }
 
@@ -122,6 +144,9 @@ func (UnimplementedLogtailServiceServer) StreamSnapshots(*SnapshotRequest, grpc.
 func (UnimplementedLogtailServiceServer) ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) {
 	return nil, status.Error(codes.Unimplemented, "method ListTargets not implemented")
 }
+func (UnimplementedLogtailServiceServer) DumpSnapshots(*DumpSnapshotsRequest, grpc.ServerStreamingServer[Snapshot]) error {
+	return status.Error(codes.Unimplemented, "method DumpSnapshots not implemented")
+}
 func (UnimplementedLogtailServiceServer) mustEmbedUnimplementedLogtailServiceServer() {}
 func (UnimplementedLogtailServiceServer) testEmbeddedByValue() {}
 
@@ -208,6 +233,17 @@ func _LogtailService_ListTargets_Handler(srv interface{}, ctx context.Context, d
 	return interceptor(ctx, in, info, handler)
 }
 
+func _LogtailService_DumpSnapshots_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(DumpSnapshotsRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(LogtailServiceServer).DumpSnapshots(m, &grpc.GenericServerStream[DumpSnapshotsRequest, Snapshot]{ServerStream: stream})
+}
+
+// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
+type LogtailService_DumpSnapshotsServer = grpc.ServerStreamingServer[Snapshot]
+
 // LogtailService_ServiceDesc is the grpc.ServiceDesc for LogtailService service.
 // It's only intended for direct use with grpc.RegisterService,
 // and not to be introspected or modified (even as a copy)
@@ -234,6 +270,11 @@ var LogtailService_ServiceDesc = grpc.ServiceDesc{
 			Handler:       _LogtailService_StreamSnapshots_Handler,
 			ServerStreams: true,
 		},
+		{
+			StreamName:    "DumpSnapshots",
+			Handler:       _LogtailService_DumpSnapshots_Handler,
+			ServerStreams: true,
+		},
 	},
 	Metadata: "proto/logtail.proto",
 }
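
This diff adds an `is_coarse` flag to `Snapshot` and a `DumpSnapshots` server-streaming RPC, which the aggregator backfill (commit eddb04ced4) uses to pull fine and coarse buckets from collectors; commit 6c3a28c9ce then pins the fine/coarse merge to the 1-minute/5-minute boundary. As a hedged illustration only — the `pinBucket` helper below is hypothetical and not the repository's actual `mergeDump` code — pinning a snapshot timestamp to its bucket boundary amounts to integer truncation:

```go
package main

import "fmt"

// pinBucket truncates a Unix timestamp to the start of its bucket:
// 60-second buckets for fine snapshots, 300-second buckets when the
// snapshot's is_coarse flag is set. Hypothetical sketch of the idea,
// not the repository's implementation.
func pinBucket(ts int64, isCoarse bool) int64 {
	width := int64(60)
	if isCoarse {
		width = 300
	}
	return ts - ts%width
}

func main() {
	fmt.Println(pinBucket(1711300000, false)) // 1711299960 (1-minute boundary)
	fmt.Println(pinBucket(1711300000, true))  // 1711299900 (5-minute boundary)
}
```

Pinning both resolutions to fixed boundaries means every collector's snapshots for the same minute (or 5-minute window) land in the same bucket, which is what keeps the frontend's sparkline from jittering when backfilled and live data are merged.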