package main import ( "context" "encoding/json" "flag" "fmt" "io" "log" "os" "os/signal" "syscall" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" ) type streamEvent struct { target string snap *pb.Snapshot err error // non-nil means the stream for this target died } func runStream(args []string) { fs := flag.NewFlagSet("stream", flag.ExitOnError) sf, targetFlag := bindShared(fs) fs.Parse(args) sf.resolve(*targetFlag) filter := buildFilter(sf) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() events := make(chan streamEvent, 64) for _, t := range sf.targets { go streamTarget(ctx, t, filter, events) } nTargets := len(sf.targets) for { select { case <-ctx.Done(): return case ev := <-events: if ev.err != nil { if ev.err != io.EOF && ctx.Err() == nil { fmt.Fprintf(os.Stderr, "stream %s: %v\n", ev.target, ev.err) } continue } if sf.jsonOut { printStreamJSON(ev) } else { printStreamLine(ev, nTargets) } } } } // streamTarget connects to addr and forwards received snapshots to events. // On error it reconnects with a 5 s backoff until ctx is cancelled. func streamTarget(ctx context.Context, addr string, filter *pb.Filter, events chan<- streamEvent) { for { if ctx.Err() != nil { return } err := streamOnce(ctx, addr, filter, events) if ctx.Err() != nil { return } if err != nil { log.Printf("stream %s: %v — reconnecting in 5s", addr, err) select { case <-ctx.Done(): return case <-time.After(5 * time.Second): } } } } func streamOnce(ctx context.Context, addr string, filter *pb.Filter, events chan<- streamEvent) error { conn, client, err := dial(addr) if err != nil { return err } defer conn.Close() stream, err := client.StreamSnapshots(ctx, &pb.SnapshotRequest{}) if err != nil { return err } for { snap, err := stream.Recv() if err != nil { return err } select { case events <- streamEvent{target: addr, snap: snap}: case <-ctx.Done(): return nil } } } func printStreamLine(ev streamEvent, nTargets int) { ts := fmtTime(ev.snap.Timestamp) entries := len(ev.snap.Entries) topLabel := "" var topCount int64 if entries > 0 { topLabel = ev.snap.Entries[0].Label topCount = ev.snap.Entries[0].Count } if nTargets > 1 { src := ev.snap.Source if src == "" { src = ev.target } fmt.Printf("%s %-24s %5d entries top: %s=%s\n", ts, src, entries, topLabel, fmtCount(topCount)) } else { fmt.Printf("%s %5d entries top: %s=%s\n", ts, entries, topLabel, fmtCount(topCount)) } } func printStreamJSON(ev streamEvent) { topLabel := "" var topCount int64 if len(ev.snap.Entries) > 0 { topLabel = ev.snap.Entries[0].Label topCount = ev.snap.Entries[0].Count } obj := map[string]any{ "ts": ev.snap.Timestamp, "source": ev.snap.Source, "target": ev.target, "total_entries": len(ev.snap.Entries), "top_label": topLabel, "top_count": topCount, } b, _ := json.Marshal(obj) fmt.Println(string(b)) }