package main import ( "context" "encoding/json" "flag" "fmt" "os" "sync" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" ) type trendResult struct { target string resp *pb.TrendResponse err error } func runTrend(args []string) { fs := flag.NewFlagSet("trend", flag.ExitOnError) sf, targetFlag := bindShared(fs) window := fs.String("window", "5m", "time window: 1m 5m 15m 60m 6h 24h") fs.Parse(args) sf.resolve(*targetFlag) win := parseWindow(*window) filter := buildFilter(sf) results := fanOutTrend(sf.targets, filter, win) for _, r := range results { if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" { fmt.Println(hdr) } if r.err != nil { fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err) continue } if sf.jsonOut { printTrendJSON(r) } else { printTrendTable(r) } if len(sf.targets) > 1 { fmt.Println() } } } func fanOutTrend(targets []string, filter *pb.Filter, window pb.Window) []trendResult { results := make([]trendResult, len(targets)) var wg sync.WaitGroup for i, t := range targets { wg.Add(1) go func(i int, addr string) { defer wg.Done() results[i].target = addr conn, client, err := dial(addr) if err != nil { results[i].err = err results[i].resp = &pb.TrendResponse{} return } defer conn.Close() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() resp, err := client.Trend(ctx, &pb.TrendRequest{ Filter: filter, Window: window, }) results[i].resp = resp results[i].err = err }(i, t) } wg.Wait() return results } func printTrendTable(r trendResult) { if len(r.resp.Points) == 0 { fmt.Println("(no data)") return } rows := [][]string{{"TIME (UTC)", "COUNT"}} for _, p := range r.resp.Points { rows = append(rows, []string{fmtTime(p.TimestampUnix), fmtCount(p.Count)}) } printTable(os.Stdout, rows) } func printTrendJSON(r trendResult) { type point struct { Ts int64 `json:"ts"` Count int64 `json:"count"` } type out struct { Source string `json:"source"` Target string `json:"target"` Points []point `json:"points"` } o := out{ Source: r.resp.Source, Target: r.target, Points: make([]point, len(r.resp.Points)), } for i, p := range r.resp.Points { o.Points[i] = point{Ts: p.TimestampUnix, Count: p.Count} } b, _ := json.Marshal(o) fmt.Println(string(b)) }