Files
nginx-logtail/cmd/aggregator/subscriber.go

98 lines
2.2 KiB
Go

package main
import (
"context"
"log"
"time"
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// CollectorSub maintains a persistent StreamSnapshots connection to one
// collector and feeds every snapshot it receives into a Merger.
// Run reconnects with exponential backoff on any error and, once its failure
// count reaches 3, marks the collector degraded and zeroes its contribution
// in the merger until the collector delivers snapshots again.
type CollectorSub struct {
	addr   string  // collector address, dialed with grpc.NewClient
	merger *Merger // receives snapshots via Apply; Zero is called on degradation
}
// NewCollectorSub returns a subscriber for the collector at addr whose
// snapshots are fed into merger. Nothing is dialed until Run is called.
func NewCollectorSub(addr string, merger *Merger) *CollectorSub {
	sub := new(CollectorSub)
	sub.addr = addr
	sub.merger = merger
	return sub
}
// Run keeps the subscription to the collector alive and blocks until ctx is
// cancelled.
//
// Each iteration opens one StreamSnapshots session via stream. A session
// that delivered at least one snapshot proves the collector healthy, so the
// failure streak and the reconnect backoff are reset (and, if the collector
// was degraded, a recovery is logged). Every session that ends in an error
// counts as a failure; after degradeThreshold failures in a row with no
// snapshot in between, the collector's contribution is zeroed in the merger
// once. Between attempts Run waits for the current backoff, which doubles
// up to maxBackoff.
func (cs *CollectorSub) Run(ctx context.Context) {
	const (
		initialBackoff   = 100 * time.Millisecond
		maxBackoff       = 30 * time.Second
		degradeThreshold = 3
	)
	backoff := initialBackoff
	fails := 0
	degraded := false
	for {
		if ctx.Err() != nil {
			return
		}
		gotOne, err := cs.stream(ctx)
		if ctx.Err() != nil {
			return
		}
		if gotOne {
			// A session that delivered data breaks the failure streak. Without
			// this reset, unrelated disconnects hours apart would accumulate
			// towards degradeThreshold (spuriously zeroing a healthy collector)
			// and the backoff would stay pinned at maxBackoff.
			if degraded {
				// Recovered: contribution is already flowing in again via Apply.
				degraded = false
				log.Printf("subscriber: collector %s recovered", cs.addr)
			}
			fails = 0
			backoff = initialBackoff
		}
		if err != nil {
			fails++
			log.Printf("subscriber: collector %s error (fail %d): %v", cs.addr, fails, err)
			if fails >= degradeThreshold && !degraded {
				degraded = true
				cs.merger.Zero(cs.addr)
				log.Printf("subscriber: collector %s degraded — contribution zeroed", cs.addr)
			}
		}
		// A stoppable timer releases its resources immediately on shutdown
		// instead of lingering for up to maxBackoff.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
		backoff = min(backoff*2, maxBackoff)
	}
}
// stream runs one StreamSnapshots session against the collector. It creates
// a client for cs.addr without transport security, opens the RPC, and hands
// each received snapshot to the merger. It returns once Recv fails (which
// also happens when ctx is cancelled). The results report whether at least
// one snapshot was applied and the error that ended the session. The client
// connection is closed before returning.
func (cs *CollectorSub) stream(ctx context.Context) (bool, error) {
	conn, err := grpc.NewClient(cs.addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return false, err
	}
	defer conn.Close()

	rpc, err := pb.NewLogtailServiceClient(conn).StreamSnapshots(ctx, &pb.SnapshotRequest{})
	if err != nil {
		return false, err
	}
	log.Printf("subscriber: connected to collector %s", cs.addr)

	received := false
	for {
		snap, recvErr := rpc.Recv()
		if recvErr != nil {
			return received, recvErr
		}
		cs.merger.Apply(snap)
		received = true
	}
}