Compare commits
2 Commits
7f93466645
...
d3160c7dd4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3160c7dd4 | ||
|
|
1c7637fbc3 |
@@ -364,7 +364,7 @@ pipe-to-`jq` use.
|
|||||||
### Output
|
### Output
|
||||||
|
|
||||||
Default: human-readable table with space-separated thousands (`18 432`).
|
Default: human-readable table with space-separated thousands (`18 432`).
|
||||||
`--json`: one JSON object per target (NDJSON for `stream`).
|
`--json`: a single JSON array (one object per target) for `topn` and `trend`; NDJSON for `stream` (unbounded).
|
||||||
|
|
||||||
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
|
`stream` reconnects automatically on error (5 s backoff). All other subcommands exit immediately
|
||||||
with a non-zero code on gRPC error.
|
with a non-zero code on gRPC error.
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/peer"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -64,15 +65,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func peerAddr(ctx context.Context) string {
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
return p.Addr.String()
|
||||||
|
}
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
||||||
ch := srv.cache.Subscribe()
|
ch := srv.cache.Subscribe()
|
||||||
defer srv.cache.Unsubscribe(ch)
|
defer srv.cache.Unsubscribe(ch)
|
||||||
|
|
||||||
log.Printf("server: new StreamSnapshots subscriber")
|
addr := peerAddr(stream.Context())
|
||||||
|
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stream.Context().Done():
|
case <-stream.Context().Done():
|
||||||
log.Printf("server: StreamSnapshots subscriber disconnected")
|
log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
|
||||||
return nil
|
return nil
|
||||||
case snap, ok := <-ch:
|
case snap, ok := <-ch:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
@@ -33,6 +33,10 @@ func runTopN(args []string) {
|
|||||||
|
|
||||||
results := fanOutTopN(sf.targets, filter, grp, *n, win)
|
results := fanOutTopN(sf.targets, filter, grp, *n, win)
|
||||||
|
|
||||||
|
if sf.jsonOut {
|
||||||
|
printTopNJSONArray(results)
|
||||||
|
return
|
||||||
|
}
|
||||||
for _, r := range results {
|
for _, r := range results {
|
||||||
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
||||||
fmt.Println(hdr)
|
fmt.Println(hdr)
|
||||||
@@ -41,11 +45,7 @@ func runTopN(args []string) {
|
|||||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if sf.jsonOut {
|
|
||||||
printTopNJSON(r)
|
|
||||||
} else {
|
|
||||||
printTopNTable(r)
|
printTopNTable(r)
|
||||||
}
|
|
||||||
if len(sf.targets) > 1 {
|
if len(sf.targets) > 1 {
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
}
|
}
|
||||||
@@ -99,7 +99,7 @@ func printTopNTable(r topNResult) {
|
|||||||
printTable(os.Stdout, rows)
|
printTable(os.Stdout, rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
func printTopNJSON(r topNResult) {
|
func printTopNJSONArray(results []topNResult) {
|
||||||
type entry struct {
|
type entry struct {
|
||||||
Label string `json:"label"`
|
Label string `json:"label"`
|
||||||
Count int64 `json:"count"`
|
Count int64 `json:"count"`
|
||||||
@@ -109,6 +109,12 @@ func printTopNJSON(r topNResult) {
|
|||||||
Target string `json:"target"`
|
Target string `json:"target"`
|
||||||
Entries []entry `json:"entries"`
|
Entries []entry `json:"entries"`
|
||||||
}
|
}
|
||||||
|
rows := make([]out, 0, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
if r.err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
o := out{
|
o := out{
|
||||||
Source: r.resp.Source,
|
Source: r.resp.Source,
|
||||||
Target: r.target,
|
Target: r.target,
|
||||||
@@ -117,6 +123,8 @@ func printTopNJSON(r topNResult) {
|
|||||||
for i, e := range r.resp.Entries {
|
for i, e := range r.resp.Entries {
|
||||||
o.Entries[i] = entry{Label: e.Label, Count: e.Count}
|
o.Entries[i] = entry{Label: e.Label, Count: e.Count}
|
||||||
}
|
}
|
||||||
b, _ := json.Marshal(o)
|
rows = append(rows, o)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(rows)
|
||||||
fmt.Println(string(b))
|
fmt.Println(string(b))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,10 @@ func runTrend(args []string) {
|
|||||||
|
|
||||||
results := fanOutTrend(sf.targets, filter, win)
|
results := fanOutTrend(sf.targets, filter, win)
|
||||||
|
|
||||||
|
if sf.jsonOut {
|
||||||
|
printTrendJSONArray(results)
|
||||||
|
return
|
||||||
|
}
|
||||||
for _, r := range results {
|
for _, r := range results {
|
||||||
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
if hdr := targetHeader(r.target, r.resp.GetSource(), len(sf.targets)); hdr != "" {
|
||||||
fmt.Println(hdr)
|
fmt.Println(hdr)
|
||||||
@@ -38,11 +42,7 @@ func runTrend(args []string) {
|
|||||||
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if sf.jsonOut {
|
|
||||||
printTrendJSON(r)
|
|
||||||
} else {
|
|
||||||
printTrendTable(r)
|
printTrendTable(r)
|
||||||
}
|
|
||||||
if len(sf.targets) > 1 {
|
if len(sf.targets) > 1 {
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
}
|
}
|
||||||
@@ -90,7 +90,7 @@ func printTrendTable(r trendResult) {
|
|||||||
printTable(os.Stdout, rows)
|
printTable(os.Stdout, rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
func printTrendJSON(r trendResult) {
|
func printTrendJSONArray(results []trendResult) {
|
||||||
type point struct {
|
type point struct {
|
||||||
Ts int64 `json:"ts"`
|
Ts int64 `json:"ts"`
|
||||||
Count int64 `json:"count"`
|
Count int64 `json:"count"`
|
||||||
@@ -100,6 +100,12 @@ func printTrendJSON(r trendResult) {
|
|||||||
Target string `json:"target"`
|
Target string `json:"target"`
|
||||||
Points []point `json:"points"`
|
Points []point `json:"points"`
|
||||||
}
|
}
|
||||||
|
rows := make([]out, 0, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
if r.err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "error from %s: %v\n", r.target, r.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
o := out{
|
o := out{
|
||||||
Source: r.resp.Source,
|
Source: r.resp.Source,
|
||||||
Target: r.target,
|
Target: r.target,
|
||||||
@@ -108,6 +114,8 @@ func printTrendJSON(r trendResult) {
|
|||||||
for i, p := range r.resp.Points {
|
for i, p := range r.resp.Points {
|
||||||
o.Points[i] = point{Ts: p.TimestampUnix, Count: p.Count}
|
o.Points[i] = point{Ts: p.TimestampUnix, Count: p.Count}
|
||||||
}
|
}
|
||||||
b, _ := json.Marshal(o)
|
rows = append(rows, o)
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(rows)
|
||||||
fmt.Println(string(b))
|
fmt.Println(string(b))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/peer"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -62,15 +63,23 @@ func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func peerAddr(ctx context.Context) string {
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
return p.Addr.String()
|
||||||
|
}
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error {
|
||||||
ch := srv.store.Subscribe()
|
ch := srv.store.Subscribe()
|
||||||
defer srv.store.Unsubscribe(ch)
|
defer srv.store.Unsubscribe(ch)
|
||||||
|
|
||||||
log.Printf("server: new StreamSnapshots subscriber from %v", stream.Context().Value("peer"))
|
addr := peerAddr(stream.Context())
|
||||||
|
log.Printf("server: new StreamSnapshots subscriber from %s", addr)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stream.Context().Done():
|
case <-stream.Context().Done():
|
||||||
log.Printf("server: StreamSnapshots subscriber disconnected")
|
log.Printf("server: StreamSnapshots subscriber disconnected: %s", addr)
|
||||||
return nil
|
return nil
|
||||||
case snap, ok := <-ch:
|
case snap, ok := <-ch:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
@@ -394,9 +394,9 @@ RANK COUNT LABEL
|
|||||||
1 18 432 example.com
|
1 18 432 example.com
|
||||||
```
|
```
|
||||||
|
|
||||||
**JSON** (`--json`) — one object per target, suitable for `jq`:
|
**JSON** (`--json`) — a single JSON array with one object per target, suitable for `jq`:
|
||||||
```json
|
```json
|
||||||
{"source":"agg-prod","target":"agg:9091","entries":[{"label":"example.com","count":18432},...]}
|
[{"source":"agg-prod","target":"agg:9091","entries":[{"label":"example.com","count":18432},...]}]
|
||||||
```
|
```
|
||||||
|
|
||||||
**`stream` JSON** — one object per snapshot received (NDJSON), runs until interrupted:
|
**`stream` JSON** — one object per snapshot received (NDJSON), runs until interrupted:
|
||||||
@@ -438,7 +438,7 @@ logtail-cli topn --target agg:9091 --window 1m --group-by prefix --status 429 --
|
|||||||
|
|
||||||
# Same query, pipe to jq for scripting
|
# Same query, pipe to jq for scripting
|
||||||
logtail-cli topn --target agg:9091 --window 1m --group-by prefix --status 429 --n 20 \
|
logtail-cli topn --target agg:9091 --window 1m --group-by prefix --status 429 --n 20 \
|
||||||
--json | jq '.entries[0]'
|
--json | jq '.[0].entries[0]'
|
||||||
|
|
||||||
# Which website has the most errors (4xx or 5xx) over the last 24h?
|
# Which website has the most errors (4xx or 5xx) over the last 24h?
|
||||||
logtail-cli topn --target agg:9091 --window 24h --group-by website --status '>=400'
|
logtail-cli topn --target agg:9091 --window 24h --group-by website --status '>=400'
|
||||||
@@ -462,7 +462,7 @@ logtail-cli topn --target nginx1:9090,nginx2:9090 --window 5m
|
|||||||
logtail-cli topn --target nginx3:9090,agg:9091 --window 5m --group-by prefix
|
logtail-cli topn --target nginx3:9090,agg:9091 --window 5m --group-by prefix
|
||||||
|
|
||||||
# Trend of total traffic over 6h (for a quick sparkline in the terminal)
|
# Trend of total traffic over 6h (for a quick sparkline in the terminal)
|
||||||
logtail-cli trend --target agg:9091 --window 6h --json | jq '[.points[] | .count]'
|
logtail-cli trend --target agg:9091 --window 6h --json | jq '.[0].points | [.[].count]'
|
||||||
|
|
||||||
# Watch live merged snapshots from the aggregator
|
# Watch live merged snapshots from the aggregator
|
||||||
logtail-cli stream --target agg:9091
|
logtail-cli stream --target agg:9091
|
||||||
|
|||||||
Reference in New Issue
Block a user