Files
nginx-logtail/cmd/aggregator/subscriber.go
T
pim 6647f95be4 RELEASE 1.0.1: v2 log format, source_tag-labeled metrics, lint cleanup
Wire-format and metric overhaul. Both file and UDP ingest now share one
versioned ParseLine that dispatches on the v<N>\t prefix; v1 stays
unchanged, v2 adds $bytes_sent (replacing $body_bytes_sent),
$request_length, $upstream_response_time, and $upstream_status. File
ingest gains the same versioning, and the legacy positional file format
is removed (no live deployments).

Prometheus exposition is rewritten:

  - nginx_http_bytes_sent and nginx_http_request_duration_seconds gain
    a source_tag label.
  - nginx_http_requests_by_source_total gains status_class.
  - New v2-only metrics: nginx_http_request_bytes,
    nginx_http_upstream_duration_seconds,
    nginx_http_upstream_requests_total{status_class}.
  - Dropped nginx_http_response_body_bytes_by_source (subsumed by the
    dual-labeled bytes_sent metric).

Adds 'make fixstyle' (gofmt -w) and clears all golangci-lint findings
across the repo (errcheck, S1001, ST1005, unused).

Docs in design.md FR-2/FR-8 and user-guide.md are rewritten to present
v2 as the recommended log format.
2026-05-01 15:40:53 +02:00

100 lines
2.4 KiB
Go

package main
import (
"context"
"log"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// CollectorSub is the aggregator's subscription to a single collector.
//
// Its Run method keeps a StreamSnapshots RPC open to addr and reconnects with
// exponential backoff whenever the stream fails. Once Run has counted three
// failures it marks the collector degraded and calls Merger.Zero for addr,
// zeroing that collector's contribution.
type CollectorSub struct {
// addr is the collector's gRPC target; it is also the key handed to the
// merger and the registry for this collector.
addr string
// merger receives every snapshot read from the stream.
merger *Merger
// registry is told the source name carried by each received snapshot.
registry *TargetRegistry
}
// NewCollectorSub builds a subscriber for the collector at addr. Snapshots it
// receives are applied to merger, and the source name they carry is recorded
// in registry. The returned value does nothing until Run is called.
func NewCollectorSub(addr string, merger *Merger, registry *TargetRegistry) *CollectorSub {
	sub := new(CollectorSub)
	sub.addr = addr
	sub.merger = merger
	sub.registry = registry
	return sub
}
// Run drives the reconnect loop for this collector and blocks until ctx is
// cancelled.
//
// Each iteration makes one stream attempt. An attempt that delivered at least
// one snapshot ends the current failure streak: the failure counter and the
// backoff are reset, and if the collector had been marked degraded it is
// reported as recovered. Every attempt that ends in an error increments the
// counter; when the counter reaches degradedThreshold the collector is marked
// degraded and its contribution is zeroed in the merger. Between attempts Run
// sleeps for the current backoff, which doubles each time up to maxBackoff.
func (cs *CollectorSub) Run(ctx context.Context) {
	const (
		initialBackoff    = 100 * time.Millisecond
		maxBackoff        = 30 * time.Second
		degradedThreshold = 3
	)
	backoff := initialBackoff
	fails := 0
	degraded := false
	for {
		if ctx.Err() != nil {
			return
		}
		gotOne, err := cs.stream(ctx)
		if ctx.Err() != nil {
			return
		}
		if gotOne {
			// The attempt delivered data, so earlier failures are no longer
			// "consecutive". Reset unconditionally; otherwise a healthy
			// collector whose stream drops now and then would accumulate
			// failures across successful sessions and be zeroed by mistake.
			if degraded {
				// Recovered: contribution is already flowing in again via Apply.
				degraded = false
				log.Printf("subscriber: collector %s recovered", cs.addr)
			}
			fails = 0
			backoff = initialBackoff
		}
		if err != nil {
			fails++
			log.Printf("subscriber: collector %s error (fail %d): %v", cs.addr, fails, err)
			if fails >= degradedThreshold && !degraded {
				degraded = true
				cs.merger.Zero(cs.addr)
				log.Printf("subscriber: collector %s degraded — contribution zeroed", cs.addr)
			}
		}
		// Wait before the next attempt, but wake immediately on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		backoff = min(backoff*2, maxBackoff)
	}
}
// stream performs one connection attempt: it creates a gRPC client for
// cs.addr using insecure transport credentials, opens a StreamSnapshots RPC,
// and then forwards every received snapshot to the registry (source name) and
// the merger until Recv fails.
//
// It only returns with a non-nil error. The boolean reports whether at least
// one snapshot was received before that error occurred.
func (cs *CollectorSub) stream(ctx context.Context) (bool, error) {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, dialErr := grpc.NewClient(cs.addr, creds)
	if dialErr != nil {
		return false, dialErr
	}
	// A close error is deliberately ignored: the connection is being
	// discarded and the caller reconnects from scratch.
	defer func() { _ = conn.Close() }()

	snapshots, openErr := pb.NewLogtailServiceClient(conn).StreamSnapshots(ctx, &pb.SnapshotRequest{})
	if openErr != nil {
		return false, openErr
	}
	log.Printf("subscriber: connected to collector %s", cs.addr)

	// received flips to true after the first snapshot has been handled and
	// stays true for the rest of the stream.
	for received := false; ; received = true {
		snap, recvErr := snapshots.Recv()
		if recvErr != nil {
			return received, recvErr
		}
		cs.registry.SetName(cs.addr, snap.Source)
		cs.merger.Apply(snap)
	}
}