package main

import (
	"context"
	"log"
	"time"

	pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// CollectorSub maintains a persistent StreamSnapshots connection to one
// collector. It reconnects with exponential backoff on any error and marks
// the collector degraded (zeroing its contribution) after 3 consecutive
// failures.
type CollectorSub struct {
	addr     string          // collector address: gRPC dial target and key passed to merger/registry
	merger   *Merger         // receives every snapshot via Apply; Zero is called on degradation
	registry *TargetRegistry // records the source name reported by the collector
}

// NewCollectorSub returns a CollectorSub for the collector at addr that
// feeds snapshots into merger and reports names to registry. It does not
// connect; call Run to start streaming.
func NewCollectorSub(addr string, merger *Merger, registry *TargetRegistry) *CollectorSub {
	return &CollectorSub{addr: addr, merger: merger, registry: registry}
}

// Run blocks until ctx is cancelled.
//
// It repeatedly opens a snapshot stream to the collector. After each stream
// ends it waits for the current backoff (starting at 100ms, doubling up to
// 30s) before reconnecting. A stream that delivered at least one snapshot
// resets both the failure streak and the backoff, so only genuinely
// consecutive failures count towards degradation. After 3 consecutive
// failures the collector is marked degraded and its contribution is zeroed
// in the merger; the next stream that delivers a snapshot clears that state.
func (cs *CollectorSub) Run(ctx context.Context) {
	const (
		initialBackoff = 100 * time.Millisecond
		maxBackoff     = 30 * time.Second
		degradeAfter   = 3 // consecutive failures before the contribution is zeroed
	)

	backoff := initialBackoff
	fails := 0
	degraded := false

	for {
		if ctx.Err() != nil {
			return
		}

		gotOne, err := cs.stream(ctx)
		if ctx.Err() != nil {
			// Shutdown in progress: the stream error is just the cancellation.
			return
		}

		if gotOne {
			// The collector was reachable and delivering data, so whatever
			// ended the stream starts a new failure streak. Without this
			// reset, isolated drops hours apart would add up to a
			// degradation and the reconnect delay would never shrink again.
			fails = 0
			backoff = initialBackoff
			if degraded {
				// Recovered: contribution is already flowing in again via Apply.
				degraded = false
				log.Printf("subscriber: collector %s recovered", cs.addr)
			}
		}

		if err != nil {
			fails++
			log.Printf("subscriber: collector %s error (fail %d): %v", cs.addr, fails, err)
			if fails >= degradeAfter && !degraded {
				degraded = true
				cs.merger.Zero(cs.addr)
				log.Printf("subscriber: collector %s degraded — contribution zeroed", cs.addr)
			}
		}

		// Sleep before reconnecting, but wake immediately on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		backoff = min(backoff*2, maxBackoff)
	}
}

// stream opens a single StreamSnapshots RPC and feeds snapshots into the
// merger until the stream errors or ctx is cancelled. Returns (gotAtLeastOne, err).
func (cs *CollectorSub) stream(ctx context.Context) (bool, error) { conn, err := grpc.NewClient(cs.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return false, err } defer conn.Close() client := pb.NewLogtailServiceClient(conn) stream, err := client.StreamSnapshots(ctx, &pb.SnapshotRequest{}) if err != nil { return false, err } log.Printf("subscriber: connected to collector %s", cs.addr) gotOne := false for { snap, err := stream.Recv() if err != nil { return gotOne, err } gotOne = true cs.registry.SetName(cs.addr, snap.Source) cs.merger.Apply(snap) } }