Compare commits
2 Commits
7f93466645
...
d3160c7dd4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3160c7dd4 | ||
|
|
1c7637fbc3 |
@@ -364,7 +364,7 @@ pipe-to-`jq` use.
|
||||
### Output
|
||||
|
||||
Default: human-readable table with space-separated thousands (`18 432`).
|
||||
`--json`: one JSON object per target (NDJSON for `stream`).
|
||||
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded).
|
||||
|
||||
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
|
||||
with a non-zero code on gRPC error.
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@@ -64,15 +65,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func peerAddr(ctx context.Context) string {
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
return p.Addr.String()
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
||||
ch := srv.cache.Subscribe()
|
||||
defer srv.cache.Unsubscribe(ch)
|
||||
|
||||
log.Printf("server: new StreamSnapshots subscriber")
|
||||
addr := peerAddr(stream.Context())
|
||||
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
log.Printf("server: StreamSnapshots subscriber disconnected")
|
||||
log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
|
||||
return nil
|
||||
case snap, ok := <-ch:
|
||||
if !ok {
|
||||
|
||||
@@ -33,6 +33,10 @@ func runTopN(args []string) {
|
||||
|
||||
results := fanOutTopN(sf.targets, filter, grp, *n, win)
|
||||
|
||||
if sf.jsonOut {
|
||||
printTopNJSONArray(results)
|
||||
return
|
||||
}
|
||||
for _, r := range results {
|
||||
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
||||
fmt.Println(hdr)
|
||||
@@ -41,11 +45,7 @@ func runTopN(args []string) {
|
||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||
continue
|
||||
}
|
||||
if sf.jsonOut {
|
||||
printTopNJSON(r)
|
||||
} else {
|
||||
printTopNTable(r)
|
||||
}
|
||||
printTopNTable(r)
|
||||
if len(sf.targets) > 1 {
|
||||
fmt.Println()
|
||||
}
|
||||
@@ -99,7 +99,7 @@ func printTopNTable(r topNResult) {
|
||||
printTable(os.Stdout, rows)
|
||||
}
|
||||
|
||||
func printTopNJSON(r topNResult) {
|
||||
func printTopNJSONArray(results []topNResult) {
|
||||
type entry struct {
|
||||
Label string `json:"label"`
|
||||
Count int64 `json:"count"`
|
||||
@@ -109,14 +109,22 @@ func printTopNJSON(r topNResult) {
|
||||
Target string `json:"target"`
|
||||
Entries []entry `json:"entries"`
|
||||
}
|
||||
o := out{
|
||||
Source: r.resp.Source,
|
||||
Target: r.target,
|
||||
Entries: make([]entry, len(r.resp.Entries)),
|
||||
rows := make([]out, 0, len(results))
|
||||
for _, r := range results {
|
||||
if r.err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||
continue
|
||||
}
|
||||
o := out{
|
||||
Source: r.resp.Source,
|
||||
Target: r.target,
|
||||
Entries: make([]entry, len(r.resp.Entries)),
|
||||
}
|
||||
for i, e := range r.resp.Entries {
|
||||
o.Entries[i] = entry{Label: e.Label, Count: e.Count}
|
||||
}
|
||||
rows = append(rows, o)
|
||||
}
|
||||
for i, e := range r.resp.Entries {
|
||||
o.Entries[i] = entry{Label: e.Label, Count: e.Count}
|
||||
}
|
||||
b, _ := json.Marshal(o)
|
||||
b, _ := json.Marshal(rows)
|
||||
fmt.Println(string(b))
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ func runTrend(args []string) {
|
||||
|
||||
results := fanOutTrend(sf.targets, filter, win)
|
||||
|
||||
if sf.jsonOut {
|
||||
printTrendJSONArray(results)
|
||||
return
|
||||
}
|
||||
for _, r := range results {
|
||||
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
||||
fmt.Println(hdr)
|
||||
@@ -38,11 +42,7 @@ func runTrend(args []string) {
|
||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||
continue
|
||||
}
|
||||
if sf.jsonOut {
|
||||
printTrendJSON(r)
|
||||
} else {
|
||||
printTrendTable(r)
|
||||
}
|
||||
printTrendTable(r)
|
||||
if len(sf.targets) > 1 {
|
||||
fmt.Println()
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func printTrendTable(r trendResult) {
|
||||
printTable(os.Stdout, rows)
|
||||
}
|
||||
|
||||
func printTrendJSON(r trendResult) {
|
||||
func printTrendJSONArray(results []trendResult) {
|
||||
type point struct {
|
||||
Ts int64 `json:"ts"`
|
||||
Count int64 `json:"count"`
|
||||
@@ -100,14 +100,22 @@ func printTrendJSON(r trendResult) {
|
||||
Target string `json:"target"`
|
||||
Points []point `json:"points"`
|
||||
}
|
||||
o := out{
|
||||
Source: r.resp.Source,
|
||||
Target: r.target,
|
||||
Points: make([]point, len(r.resp.Points)),
|
||||
rows := make([]out, 0, len(results))
|
||||
for _, r := range results {
|
||||
if r.err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||
continue
|
||||
}
|
||||
o := out{
|
||||
Source: r.resp.Source,
|
||||
Target: r.target,
|
||||
Points: make([]point, len(r.resp.Points)),
|
||||
}
|
||||
for i, p := range r.resp.Points {
|
||||
o.Points[i] = point{Ts: p.TimestampUnix, Count: p.Count}
|
||||
}
|
||||
rows = append(rows, o)
|
||||
}
|
||||
for i, p := range r.resp.Points {
|
||||
o.Points[i] = point{Ts: p.TimestampUnix, Count: p.Count}
|
||||
}
|
||||
b, _ := json.Marshal(o)
|
||||
b, _ := json.Marshal(rows)
|
||||
fmt.Println(string(b))
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@@ -62,15 +63,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
|
||||
}, nil
|
||||
}
|
||||
|
||||
func peerAddr(ctx context.Context) string {
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
return p.Addr.String()
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
||||
ch := srv.store.Subscribe()
|
||||
defer srv.store.Unsubscribe(ch)
|
||||
|
||||
log.Printf("server: new StreamSnapshots subscriber from %v", stream.Context().Value("peer"))
|
||||
addr := peerAddr(stream.Context())
|
||||
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
log.Printf("server: StreamSnapshots subscriber disconnected")
|
||||
log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
|
||||
return nil
|
||||
case snap, ok := <-ch:
|
||||
if !ok {
|
||||
|
||||
@@ -394,9 +394,9 @@ RANK COUNT LABEL
|
||||
1 18 432 example.com
|
||||
```
|
||||
|
||||
**JSON** (`--json`) — one object per target, suitable for `jq`:
|
||||
**JSON** (`--json`) — a single JSON array with one object per target, suitable for `jq`:
|
||||
```json
|
||||
{"source":"agg-prod","target":"agg:9091","entries":[{"label":"example.com","count":18432},...]}
|
||||
[{"source":"agg-prod","target":"agg:9091","entries":[{"label":"example.com","count":18432},...]}]
|
||||
```
|
||||
|
||||
**`stream` JSON** — one object per snapshot received (NDJSON), runs until interrupted:
|
||||
@@ -438,7 +438,7 @@ logtail-cli topn --target agg:9091 --window 1m --group-by prefix --status 429 --
|
||||
|
||||
# Same query, pipe to jq for scripting
|
||||
logtail-cli topn --target agg:9091 --window 1m --group-by prefix --status 429 --n 20 \
|
||||
--json | jq '.entries[0]'
|
||||
--json | jq '.[0].entries[0]'
|
||||
|
||||
# Which website has the most errors (4xx or 5xx) over the last 24h?
|
||||
logtail-cli topn --target agg:9091 --window 24h --group-by website --status '>=400'
|
||||
@@ -462,7 +462,7 @@ logtail-cli topn --target nginx1:9090,nginx2:9090 --window 5m
|
||||
logtail-cli topn --target nginx3:9090,agg:9091 --window 5m --group-by prefix
|
||||
|
||||
# Trend of total traffic over 6h (for a quick sparkline in the terminal)
|
||||
logtail-cli trend --target agg:9091 --window 6h --json | jq '[.points[] | .count]'
|
||||
logtail-cli trend --target agg:9091 --window 6h --json | jq '.[0].points | [.[].count]'
|
||||
|
||||
# Watch live merged snapshots from the aggregator
|
||||
logtail-cli stream --target agg:9091
|
||||
|
||||
Reference in New Issue
Block a user