diff --git a/README.md b/README.md index 957a626..90c4f79 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,10 @@ DESIGN ``` nginx-logtail/ ├── proto/ -│ └── logtail.proto # shared protobuf definitions +│ ├── logtail.proto # shared protobuf definitions +│ └── logtailpb/ +│ ├── logtail.pb.go # generated: messages, enums +│ └── logtail_grpc.pb.go # generated: service stubs ├── internal/ │ └── store/ │ └── store.go # shared types: Tuple4, Entry, Snapshot, ring helpers @@ -48,6 +51,7 @@ nginx-logtail/ │ ├── subscriber.go # one goroutine per collector; StreamSnapshots with backoff │ ├── merger.go # delta-merge: O(snapshot_size) per update │ ├── cache.go # tick-based ring buffer cache served to clients + │ ├── registry.go # TargetRegistry: addr→name map updated from snapshot sources │ └── server.go # gRPC server (same surface as collector) ├── frontend/ │ ├── main.go @@ -66,7 +70,8 @@ nginx-logtail/ ├── format.go # printTable, fmtCount, fmtTime, targetHeader ├── cmd_topn.go # topn: concurrent fan-out, table + JSON output ├── cmd_trend.go # trend: concurrent fan-out, table + JSON output - └── cmd_stream.go # stream: multiplexed streams, auto-reconnect + ├── cmd_stream.go # stream: multiplexed streams, auto-reconnect + └── cmd_targets.go # targets: list collectors known to the endpoint ``` ## Data Model @@ -178,13 +183,23 @@ message Snapshot { repeated TopNEntry entries = 3; // full top-50K for this bucket } +// Target discovery: list the collectors behind the queried endpoint +message ListTargetsRequest {} +message TargetInfo { + string name = 1; // display name (--source value from the collector) + string addr = 2; // gRPC address; empty string means "this endpoint itself" +} +message ListTargetsResponse { repeated TargetInfo targets = 1; } + service LogtailService { rpc TopN(TopNRequest) returns (TopNResponse); rpc Trend(TrendRequest) returns (TrendResponse); rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot); + rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse); } // Both collector and aggregator implement LogtailService. // The aggregator's StreamSnapshots re-streams the merged view. +// ListTargets: aggregator returns all configured collectors; collector returns itself. ``` ## Program 1 — Collector @@ -259,10 +274,18 @@ service LogtailService { - Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick. - `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store. +### registry.go +- **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the + configured collector addresses; display names are updated from the `source` field of the first + snapshot received from each collector. +- `Targets()` returns a stable sorted slice of `{name, addr}` pairs for `ListTargets` responses. + ### server.go - Implements `LogtailService` backed by the cache (not live fan-out). - `StreamSnapshots` re-streams merged fine snapshots; usable by a second-tier aggregator or monitoring system. +- `ListTargets` returns the current `TargetRegistry` contents — all configured collectors with + their display names and gRPC addresses. ## Program 3 — Frontend @@ -278,8 +301,13 @@ service LogtailService { `store.ParseStatusExpr` into `(value, StatusOp)` for the filter protobuf. - **Regex filters**: `f_website_re` and `f_uri_re` hold RE2 patterns; compiled once per request into `store.CompiledFilter` before the query-loop iteration. Invalid regexes match nothing. -- `TopN` and `Trend` RPCs issued **concurrently** (both with a 5 s deadline); page renders with - whatever completes. Trend failure suppresses the sparkline without erroring the page. +- `TopN`, `Trend`, and `ListTargets` RPCs issued **concurrently** (all with a 5 s deadline); page + renders with whatever completes. Trend failure suppresses the sparkline; `ListTargets` failure + hides the source picker — both are non-fatal. +- **Source picker**: `ListTargets` result drives a `source:` tab row. Clicking a collector tab + sets `target=` to that collector's address, querying it directly. The "all" tab resets to the + default aggregator. Picker is hidden when `ListTargets` returns ≤0 collectors (direct collector + mode). - **Drilldown**: clicking a table row adds the current dimension's filter and advances `by` through `website → prefix → uri → status → website` (cycles). - **`raw=1`**: returns the TopN result as JSON — same URL, no CLI needed for scripting. @@ -305,6 +333,7 @@ service LogtailService { logtail-cli topn [flags] ranked label → count table (exits after one response) logtail-cli trend [flags] per-bucket time series (exits after one response) logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C, auto-reconnects) +logtail-cli targets [flags] list targets known to the queried endpoint ``` ### Flags @@ -364,3 +393,4 @@ with a non-zero code on gRPC error. | Status filter as expression string (`!=200`, `>=400`) | Operator-friendly; parsed once at query boundary, encoded as `(int32, StatusOp)` in proto | | Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency | | Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable | +| `ListTargets` + frontend source picker (no Tuple5) | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory | diff --git a/cmd/aggregator/aggregator_test.go b/cmd/aggregator/aggregator_test.go index c610299..f6d84cc 100644 --- a/cmd/aggregator/aggregator_test.go +++ b/cmd/aggregator/aggregator_test.go @@ -287,8 +287,8 @@ func TestGRPCEndToEnd(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go NewCollectorSub(addr1, merger).Run(ctx) - go NewCollectorSub(addr2, merger).Run(ctx) + go NewCollectorSub(addr1, merger, NewTargetRegistry(nil)).Run(ctx) + go NewCollectorSub(addr2, merger, NewTargetRegistry(nil)).Run(ctx) // Wait for both snapshots to be applied. deadline := time.Now().Add(3 * time.Second) @@ -309,7 +309,7 @@ func TestGRPCEndToEnd(t *testing.T) { t.Fatal(err) } grpcSrv := grpc.NewServer() - pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test")) + pb.RegisterLogtailServiceServer(grpcSrv, NewServer(cache, "agg-test", NewTargetRegistry(nil))) go grpcSrv.Serve(lis) defer grpcSrv.Stop() @@ -399,8 +399,8 @@ func TestDegradedCollector(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go NewCollectorSub(addr1, merger).Run(ctx) - go NewCollectorSub(addr2, merger).Run(ctx) + go NewCollectorSub(addr1, merger, NewTargetRegistry(nil)).Run(ctx) + go NewCollectorSub(addr2, merger, NewTargetRegistry(nil)).Run(ctx) // Wait for col1's data to appear. deadline := time.Now().Add(3 * time.Second) diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go index 0b6b896..c581e59 100644 --- a/cmd/aggregator/main.go +++ b/cmd/aggregator/main.go @@ -27,16 +27,21 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - merger := NewMerger() - cache := NewCache(merger, *source) - go cache.Run(ctx) - + var collectorAddrs []string for _, addr := range strings.Split(*collectors, ",") { addr = strings.TrimSpace(addr) - if addr == "" { - continue + if addr != "" { + collectorAddrs = append(collectorAddrs, addr) } - sub := NewCollectorSub(addr, merger) + } + + merger := NewMerger() + cache := NewCache(merger, *source) + registry := NewTargetRegistry(collectorAddrs) + go cache.Run(ctx) + + for _, addr := range collectorAddrs { + sub := NewCollectorSub(addr, merger, registry) go sub.Run(ctx) log.Printf("aggregator: subscribing to collector %s", addr) } @@ -46,7 +51,7 @@ func main() { log.Fatalf("aggregator: failed to listen on %s: %v", *listen, err) } grpcServer := grpc.NewServer() - pb.RegisterLogtailServiceServer(grpcServer, NewServer(cache, *source)) + pb.RegisterLogtailServiceServer(grpcServer, NewServer(cache, *source, registry)) go func() { log.Printf("aggregator: gRPC listening on %s (source=%s)", *listen, *source) diff --git a/cmd/aggregator/registry.go b/cmd/aggregator/registry.go new file mode 100644 index 0000000..82d814d --- /dev/null +++ b/cmd/aggregator/registry.go @@ -0,0 +1,47 @@ +package main + +import ( + "sort" + "sync" +) + +// TargetInfo holds the display name and gRPC address of one collector target. +type TargetInfo struct { + Name string // collector --source value, falls back to addr until first snapshot + Addr string // configured gRPC address +} + +// TargetRegistry tracks addr → display name for all configured collectors. +// Names default to the addr and are updated to the collector's --source value +// when the first snapshot arrives. +type TargetRegistry struct { + mu sync.RWMutex + names map[string]string // addr → current name +} + +func NewTargetRegistry(addrs []string) *TargetRegistry { + names := make(map[string]string, len(addrs)) + for _, a := range addrs { + names[a] = a // default until first snapshot + } + return &TargetRegistry{names: names} +} + +// SetName updates the display name for addr (called when a snapshot arrives). +func (r *TargetRegistry) SetName(addr, name string) { + r.mu.Lock() + r.names[addr] = name + r.mu.Unlock() +} + +// Targets returns all registered targets sorted by addr. +func (r *TargetRegistry) Targets() []TargetInfo { + r.mu.RLock() + defer r.mu.RUnlock() + out := make([]TargetInfo, 0, len(r.names)) + for addr, name := range r.names { + out = append(out, TargetInfo{Name: name, Addr: addr}) + } + sort.Slice(out, func(i, j int) bool { return out[i].Addr < out[j].Addr }) + return out +} diff --git a/cmd/aggregator/server.go b/cmd/aggregator/server.go index c0f846b..29c6659 100644 --- a/cmd/aggregator/server.go +++ b/cmd/aggregator/server.go @@ -14,12 +14,13 @@ import ( // Server implements pb.LogtailServiceServer backed by the aggregator Cache. type Server struct { pb.UnimplementedLogtailServiceServer - cache *Cache - source string + cache *Cache + source string + registry *TargetRegistry } -func NewServer(cache *Cache, source string) *Server { - return &Server{cache: cache, source: source} +func NewServer(cache *Cache, source string, registry *TargetRegistry) *Server { + return &Server{cache: cache, source: source, registry: registry} } func (srv *Server) TopN(_ context.Context, req *pb.TopNRequest) (*pb.TopNResponse, error) { @@ -53,6 +54,16 @@ func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResp return resp, nil } +func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb.ListTargetsResponse, error) { + resp := &pb.ListTargetsResponse{} + if srv.registry != nil { + for _, t := range srv.registry.Targets() { + resp.Targets = append(resp.Targets, &pb.TargetInfo{Name: t.Name, Addr: t.Addr}) + } + } + return resp, nil +} + func (srv *Server) StreamSnapshots(_ *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.cache.Subscribe() defer srv.cache.Unsubscribe(ch) diff --git a/cmd/aggregator/subscriber.go b/cmd/aggregator/subscriber.go index 077728c..7e0c497 100644 --- a/cmd/aggregator/subscriber.go +++ b/cmd/aggregator/subscriber.go @@ -15,12 +15,13 @@ import ( // the collector degraded (zeroing its contribution) after 3 consecutive // failures. type CollectorSub struct { - addr string - merger *Merger + addr string + merger *Merger + registry *TargetRegistry } -func NewCollectorSub(addr string, merger *Merger) *CollectorSub { - return &CollectorSub{addr: addr, merger: merger} +func NewCollectorSub(addr string, merger *Merger, registry *TargetRegistry) *CollectorSub { + return &CollectorSub{addr: addr, merger: merger, registry: registry} } // Run blocks until ctx is cancelled. @@ -92,6 +93,7 @@ func (cs *CollectorSub) stream(ctx context.Context) (bool, error) { return gotOne, err } gotOne = true + cs.registry.SetName(cs.addr, snap.Source) cs.merger.Apply(snap) } } diff --git a/cmd/cli/cmd_targets.go b/cmd/cli/cmd_targets.go new file mode 100644 index 0000000..a1eab4e --- /dev/null +++ b/cmd/cli/cmd_targets.go @@ -0,0 +1,61 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "time" + + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" +) + +func runTargets(args []string) { + fs := flag.NewFlagSet("targets", flag.ExitOnError) + sf, target := bindShared(fs) + fs.Usage = func() { + fmt.Fprintln(os.Stderr, "usage: logtail-cli targets [--target host:port] [--json]") + fs.PrintDefaults() + } + fs.Parse(args) + sf.resolve(*target) + + for _, addr := range sf.targets { + conn, client, err := dial(addr) + if err != nil { + fmt.Fprintf(os.Stderr, "targets: cannot connect to %s: %v\n", addr, err) + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + resp, err := client.ListTargets(ctx, &pb.ListTargetsRequest{}) + cancel() + conn.Close() + if err != nil { + fmt.Fprintf(os.Stderr, "targets: %s: %v\n", addr, err) + continue + } + + if sf.jsonOut { + type row struct { + QueryTarget string `json:"query_target"` + Name string `json:"name"` + Addr string `json:"addr"` + } + for _, t := range resp.Targets { + json.NewEncoder(os.Stdout).Encode(row{QueryTarget: addr, Name: t.Name, Addr: t.Addr}) + } + } else { + if len(sf.targets) > 1 { + fmt.Println(targetHeader(addr, "", len(sf.targets))) + } + for _, t := range resp.Targets { + addrCol := t.Addr + if addrCol == "" { + addrCol = "(self)" + } + fmt.Printf("%-40s %s\n", t.Name, addrCol) + } + } + } +} diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 978c84c..3e08121 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -11,6 +11,7 @@ Usage: logtail-cli topn [flags] ranked label → count list logtail-cli trend [flags] per-minute time series logtail-cli stream [flags] live snapshot feed + logtail-cli targets [flags] list targets known to the queried endpoint Subcommand flags (all subcommands): --target host:port[,host:port,...] endpoints to query (default: localhost:9090) @@ -43,6 +44,8 @@ func main() { runTrend(os.Args[2:]) case "stream": runStream(os.Args[2:]) + case "targets": + runTargets(os.Args[2:]) case "-h", "--help", "help": fmt.Print(usage) default: diff --git a/cmd/collector/server.go b/cmd/collector/server.go index a5dd813..45d6e6f 100644 --- a/cmd/collector/server.go +++ b/cmd/collector/server.go @@ -56,6 +56,12 @@ func (srv *Server) Trend(_ context.Context, req *pb.TrendRequest) (*pb.TrendResp return resp, nil } +func (srv *Server) ListTargets(_ context.Context, _ *pb.ListTargetsRequest) (*pb.ListTargetsResponse, error) { + return &pb.ListTargetsResponse{ + Targets: []*pb.TargetInfo{{Name: srv.source, Addr: ""}}, + }, nil +} + func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.store.Subscribe() defer srv.store.Unsubscribe(ch) diff --git a/cmd/frontend/handler.go b/cmd/frontend/handler.go index bb8e6fe..9a41d69 100644 --- a/cmd/frontend/handler.go +++ b/cmd/frontend/handler.go @@ -76,6 +76,7 @@ type PageData struct { Breadcrumbs []Crumb Windows []Tab GroupBys []Tab + Targets []Tab // source/target picker; empty when only one target available RefreshSecs int Error string FilterExpr string // current filter serialised to mini-language for the input box @@ -340,6 +341,38 @@ func buildGroupByTabs(p QueryParams) []Tab { return tabs } +// buildTargetTabs builds the source/target picker tabs from a ListTargets response. +// Returns nil (hide picker) when only one endpoint is reachable. +func (h *Handler) buildTargetTabs(p QueryParams, lt *pb.ListTargetsResponse) []Tab { + // "all" always points at the configured aggregator default. + allTab := Tab{ + Label: "all", + URL: p.buildURL(map[string]string{"target": h.defaultTarget}), + Active: p.Target == h.defaultTarget, + } + + var collectorTabs []Tab + if lt != nil { + for _, t := range lt.Targets { + addr := t.Addr + if addr == "" { + addr = p.Target // collector reporting itself; addr is the current target + } + collectorTabs = append(collectorTabs, Tab{ + Label: t.Name, + URL: p.buildURL(map[string]string{"target": addr}), + Active: p.Target == addr, + }) + } + } + + // Only render the picker when there is more than one choice. + if len(collectorTabs) == 0 { + return nil + } + return append([]Tab{allTab}, collectorTabs...) +} + func buildTableRows(entries []*pb.TopNEntry, p QueryParams) ([]TableRow, int64) { if len(entries) == 0 { return nil, 0 @@ -410,6 +443,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } topNCh := make(chan topNResult, 1) trendCh := make(chan trendResult, 1) + ltCh := make(chan *pb.ListTargetsResponse, 1) go func() { resp, err := client.TopN(ctx, &pb.TopNRequest{ @@ -427,9 +461,18 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) trendCh <- trendResult{resp, err} }() + go func() { + resp, err := client.ListTargets(ctx, &pb.ListTargetsRequest{}) + if err != nil { + ltCh <- nil + } else { + ltCh <- resp + } + }() tn := <-topNCh tr := <-trendCh + lt := <-ltCh if tn.err != nil { h.render(w, http.StatusBadGateway, h.errorPage(params, @@ -459,6 +502,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Breadcrumbs: buildCrumbs(params), Windows: buildWindowTabs(params), GroupBys: buildGroupByTabs(params), + Targets: h.buildTargetTabs(params, lt), RefreshSecs: h.refreshSecs, FilterExpr: filterExprInput, FilterErr: filterErr, diff --git a/cmd/frontend/templates/base.html b/cmd/frontend/templates/base.html index 614bb24..a7ecfd7 100644 --- a/cmd/frontend/templates/base.html +++ b/cmd/frontend/templates/base.html @@ -34,6 +34,8 @@ a:hover { text-decoration: underline; } .error { color: #c00; border: 1px solid #fbb; background: #fff5f5; padding: 0.7em 1em; margin: 1em 0; border-radius: 3px; } .nodata { color: #999; margin: 2em 0; font-style: italic; } footer { margin-top: 2em; padding-top: 0.6em; border-top: 1px solid #e0e0e0; font-size: 0.8em; color: #999; } +.tabs-targets { margin-top: -0.4em; } +.tabs-label { font-size: 0.85em; color: #888; margin-right: 0.2em; align-self: center; } .filter-form { display: flex; gap: 0.4em; align-items: center; margin-bottom: 0.7em; } .filter-input { flex: 1; font-family: monospace; font-size: 13px; padding: 0.25em 0.5em; border: 1px solid #aaa; } .filter-form button { padding: 0.25em 0.8em; border: 1px solid #aaa; background: #f4f4f4; cursor: pointer; font-family: monospace; } diff --git a/cmd/frontend/templates/index.html b/cmd/frontend/templates/index.html index ec223df..b0b20b3 100644 --- a/cmd/frontend/templates/index.html +++ b/cmd/frontend/templates/index.html @@ -13,6 +13,13 @@ {{- end}} +{{if .Targets}}
+ source: +{{- range .Targets}} + {{.Label}} +{{- end}} +
{{end}} +
diff --git a/docs/USERGUIDE.md b/docs/USERGUIDE.md index a94b053..398d1af 100644 --- a/docs/USERGUIDE.md +++ b/docs/USERGUIDE.md @@ -324,6 +324,12 @@ endpoint for that request (useful for comparing a single collector against the a http://frontend:8080/?target=nginx3:9090&w=5m ``` +**Source picker** — when the frontend is pointed at an aggregator, a `source:` tab row appears +below the dimension tabs listing each individual collector alongside an **all** tab (the default +merged view). Clicking a collector tab switches the frontend to query that collector directly for +the current request, letting you answer "which nginx machine is the busiest?" without leaving the +dashboard. The picker is hidden when querying a collector directly (it has no sub-sources to list). + --- ## CLI @@ -338,6 +344,7 @@ Default output is a human-readable table; add `--json` for machine-readable NDJS logtail-cli topn [flags] ranked label → count table logtail-cli trend [flags] per-bucket time series logtail-cli stream [flags] live snapshot feed (runs until Ctrl-C) +logtail-cli targets [flags] list targets known to the queried endpoint ``` ### Shared flags (all subcommands) @@ -397,6 +404,32 @@ RANK COUNT LABEL {"ts":1773516180,"source":"col-1","target":"nginx1:9090","total_entries":823,"top_label":"example.com","top_count":10000} ``` +### `targets` subcommand + +Lists the targets (collectors) known to the queried endpoint. When querying an aggregator, returns +all configured collectors with their display names and addresses. When querying a collector, +returns the collector itself (address shown as `(self)`). + +```bash +# List collectors behind the aggregator +logtail-cli targets --target agg:9091 + +# Machine-readable output +logtail-cli targets --target agg:9091 --json +``` + +Table output example: +``` +nginx1.prod nginx1:9090 +nginx2.prod nginx2:9090 +nginx3.prod (self) +``` + +JSON output (`--json`) — one object per target: +```json +{"query_target":"agg:9091","name":"nginx1.prod","addr":"nginx1:9090"} +``` + ### Examples ```bash diff --git a/proto/logtail.proto b/proto/logtail.proto index 93519f7..f39aecb 100644 --- a/proto/logtail.proto +++ b/proto/logtail.proto @@ -89,8 +89,23 @@ message Snapshot { repeated TopNEntry entries = 3; // top-50K for this 1-minute bucket, sorted desc } -service LogtailService { - rpc TopN (TopNRequest) returns (TopNResponse); - rpc Trend (TrendRequest) returns (TrendResponse); - rpc StreamSnapshots (SnapshotRequest) returns (stream Snapshot); +// ListTargets — returns the targets this node knows about. +// The aggregator returns all configured collectors; a collector returns itself. + +message ListTargetsRequest {} + +message TargetInfo { + string name = 1; // display name (the --source value of the collector) + string addr = 2; // gRPC address to use as target=; empty means "this endpoint" +} + +message ListTargetsResponse { + repeated TargetInfo targets = 1; +} + +service LogtailService { + rpc TopN (TopNRequest) returns (TopNResponse); + rpc Trend (TrendRequest) returns (TrendResponse); + rpc StreamSnapshots (SnapshotRequest) returns (stream Snapshot); + rpc ListTargets (ListTargetsRequest) returns (ListTargetsResponse); } diff --git a/proto/logtailpb/logtail.pb.go b/proto/logtailpb/logtail.pb.go index ae41747..753df0f 100644 --- a/proto/logtailpb/logtail.pb.go +++ b/proto/logtailpb/logtail.pb.go @@ -709,6 +709,138 @@ func (x *Snapshot) GetEntries() []*TopNEntry { return nil } +type ListTargetsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListTargetsRequest) Reset() { + *x = ListTargetsRequest{} + mi := &file_logtail_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListTargetsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListTargetsRequest) ProtoMessage() {} + +func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message { + mi := &file_logtail_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListTargetsRequest.ProtoReflect.Descriptor instead. +func (*ListTargetsRequest) Descriptor() ([]byte, []int) { + return file_logtail_proto_rawDescGZIP(), []int{9} +} + +type TargetInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // display name (the --source value of the collector) + Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` // gRPC address to use as target=; empty means "this endpoint" + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TargetInfo) Reset() { + *x = TargetInfo{} + mi := &file_logtail_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TargetInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TargetInfo) ProtoMessage() {} + +func (x *TargetInfo) ProtoReflect() protoreflect.Message { + mi := &file_logtail_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TargetInfo.ProtoReflect.Descriptor instead. +func (*TargetInfo) Descriptor() ([]byte, []int) { + return file_logtail_proto_rawDescGZIP(), []int{10} +} + +func (x *TargetInfo) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *TargetInfo) GetAddr() string { + if x != nil { + return x.Addr + } + return "" +} + +type ListTargetsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Targets []*TargetInfo `protobuf:"bytes,1,rep,name=targets,proto3" json:"targets,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListTargetsResponse) Reset() { + *x = ListTargetsResponse{} + mi := &file_logtail_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListTargetsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListTargetsResponse) ProtoMessage() {} + +func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message { + mi := &file_logtail_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListTargetsResponse.ProtoReflect.Descriptor instead. +func (*ListTargetsResponse) Descriptor() ([]byte, []int) { + return file_logtail_proto_rawDescGZIP(), []int{11} +} + +func (x *ListTargetsResponse) GetTargets() []*TargetInfo { + if x != nil { + return x.Targets + } + return nil +} + var File_logtail_proto protoreflect.FileDescriptor const file_logtail_proto_rawDesc = "" + @@ -755,7 +887,14 @@ const file_logtail_proto_rawDesc = "" + "\bSnapshot\x12\x16\n" + "\x06source\x18\x01 \x01(\tR\x06source\x12\x1c\n" + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12,\n" + - "\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries*:\n" + + "\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries\"\x14\n" + + "\x12ListTargetsRequest\"4\n" + + "\n" + + "TargetInfo\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x12\n" + + "\x04addr\x18\x02 \x01(\tR\x04addr\"D\n" + + "\x13ListTargetsResponse\x12-\n" + + "\atargets\x18\x01 \x03(\v2\x13.logtail.TargetInfoR\atargets*:\n" + "\bStatusOp\x12\x06\n" + "\x02EQ\x10\x00\x12\x06\n" + "\x02NE\x10\x01\x12\x06\n" + @@ -774,11 +913,12 @@ const file_logtail_proto_rawDesc = "" + "\x04W15M\x10\x02\x12\b\n" + "\x04W60M\x10\x03\x12\a\n" + "\x03W6H\x10\x04\x12\b\n" + - "\x04W24H\x10\x052\xbf\x01\n" + + "\x04W24H\x10\x052\x89\x02\n" + "\x0eLogtailService\x123\n" + "\x04TopN\x12\x14.logtail.TopNRequest\x1a\x15.logtail.TopNResponse\x126\n" + "\x05Trend\x12\x15.logtail.TrendRequest\x1a\x16.logtail.TrendResponse\x12@\n" + - "\x0fStreamSnapshots\x12\x18.logtail.SnapshotRequest\x1a\x11.logtail.Snapshot0\x01B0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3" + "\x0fStreamSnapshots\x12\x18.logtail.SnapshotRequest\x1a\x11.logtail.Snapshot0\x01\x12H\n" + + "\vListTargets\x12\x1b.logtail.ListTargetsRequest\x1a\x1c.logtail.ListTargetsResponseB0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3" var ( file_logtail_proto_rawDescOnce sync.Once @@ -793,20 +933,23 @@ func file_logtail_proto_rawDescGZIP() []byte { } var file_logtail_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 12) var file_logtail_proto_goTypes = []any{ - (StatusOp)(0), // 0: logtail.StatusOp - (GroupBy)(0), // 1: logtail.GroupBy - (Window)(0), // 2: logtail.Window - (*Filter)(nil), // 3: logtail.Filter - (*TopNRequest)(nil), // 4: logtail.TopNRequest - (*TopNEntry)(nil), // 5: logtail.TopNEntry - (*TopNResponse)(nil), // 6: logtail.TopNResponse - (*TrendRequest)(nil), // 7: logtail.TrendRequest - (*TrendPoint)(nil), // 8: logtail.TrendPoint - (*TrendResponse)(nil), // 9: logtail.TrendResponse - (*SnapshotRequest)(nil), // 10: logtail.SnapshotRequest - (*Snapshot)(nil), // 11: logtail.Snapshot + (StatusOp)(0), // 0: logtail.StatusOp + (GroupBy)(0), // 1: logtail.GroupBy + (Window)(0), // 2: logtail.Window + (*Filter)(nil), // 3: logtail.Filter + (*TopNRequest)(nil), // 4: logtail.TopNRequest + (*TopNEntry)(nil), // 5: logtail.TopNEntry + (*TopNResponse)(nil), // 6: logtail.TopNResponse + (*TrendRequest)(nil), // 7: logtail.TrendRequest + (*TrendPoint)(nil), // 8: logtail.TrendPoint + (*TrendResponse)(nil), // 9: logtail.TrendResponse + (*SnapshotRequest)(nil), // 10: logtail.SnapshotRequest + (*Snapshot)(nil), // 11: logtail.Snapshot + (*ListTargetsRequest)(nil), // 12: logtail.ListTargetsRequest + (*TargetInfo)(nil), // 13: logtail.TargetInfo + (*ListTargetsResponse)(nil), // 14: logtail.ListTargetsResponse } var file_logtail_proto_depIdxs = []int32{ 0, // 0: logtail.Filter.status_op:type_name -> logtail.StatusOp @@ -818,17 +961,20 @@ var file_logtail_proto_depIdxs = []int32{ 2, // 6: logtail.TrendRequest.window:type_name -> logtail.Window 8, // 7: logtail.TrendResponse.points:type_name -> logtail.TrendPoint 5, // 8: logtail.Snapshot.entries:type_name -> logtail.TopNEntry - 4, // 9: logtail.LogtailService.TopN:input_type -> logtail.TopNRequest - 7, // 10: logtail.LogtailService.Trend:input_type -> logtail.TrendRequest - 10, // 11: logtail.LogtailService.StreamSnapshots:input_type -> logtail.SnapshotRequest - 6, // 12: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse - 9, // 13: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse - 11, // 14: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot - 12, // [12:15] is the sub-list for method output_type - 9, // [9:12] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 13, // 9: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo + 4, // 10: logtail.LogtailService.TopN:input_type -> logtail.TopNRequest + 7, // 11: logtail.LogtailService.Trend:input_type -> logtail.TrendRequest + 10, // 12: logtail.LogtailService.StreamSnapshots:input_type -> logtail.SnapshotRequest + 12, // 13: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest + 6, // 14: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse + 9, // 15: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse + 11, // 16: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot + 14, // 17: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse + 14, // [14:18] is the sub-list for method output_type + 10, // [10:14] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_logtail_proto_init() } @@ -843,7 +989,7 @@ func file_logtail_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_logtail_proto_rawDesc), len(file_logtail_proto_rawDesc)), NumEnums: 3, - NumMessages: 9, + NumMessages: 12, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/logtailpb/logtail_grpc.pb.go b/proto/logtailpb/logtail_grpc.pb.go index 20736c5..3a77c62 100644 --- a/proto/logtailpb/logtail_grpc.pb.go +++ b/proto/logtailpb/logtail_grpc.pb.go @@ -22,6 +22,7 @@ const ( LogtailService_TopN_FullMethodName = "/logtail.LogtailService/TopN" LogtailService_Trend_FullMethodName = "/logtail.LogtailService/Trend" LogtailService_StreamSnapshots_FullMethodName = "/logtail.LogtailService/StreamSnapshots" + LogtailService_ListTargets_FullMethodName = "/logtail.LogtailService/ListTargets" ) // LogtailServiceClient is the client API for LogtailService service. @@ -31,6 +32,7 @@ type LogtailServiceClient interface { TopN(ctx context.Context, in *TopNRequest, opts ...grpc.CallOption) (*TopNResponse, error) Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error) StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) + ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) } type logtailServiceClient struct { @@ -80,6 +82,16 @@ func (c *logtailServiceClient) StreamSnapshots(ctx context.Context, in *Snapshot // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type LogtailService_StreamSnapshotsClient = grpc.ServerStreamingClient[Snapshot] +func (c *logtailServiceClient) ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListTargetsResponse) + err := c.cc.Invoke(ctx, LogtailService_ListTargets_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // LogtailServiceServer is the server API for LogtailService service. // All implementations must embed UnimplementedLogtailServiceServer // for forward compatibility. @@ -87,6 +99,7 @@ type LogtailServiceServer interface { TopN(context.Context, *TopNRequest) (*TopNResponse, error) Trend(context.Context, *TrendRequest) (*TrendResponse, error) StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error + ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) mustEmbedUnimplementedLogtailServiceServer() } @@ -106,6 +119,9 @@ func (UnimplementedLogtailServiceServer) Trend(context.Context, *TrendRequest) ( func (UnimplementedLogtailServiceServer) StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error { return status.Error(codes.Unimplemented, "method StreamSnapshots not implemented") } +func (UnimplementedLogtailServiceServer) ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ListTargets not implemented") +} func (UnimplementedLogtailServiceServer) mustEmbedUnimplementedLogtailServiceServer() {} func (UnimplementedLogtailServiceServer) testEmbeddedByValue() {} @@ -174,6 +190,24 @@ func _LogtailService_StreamSnapshots_Handler(srv interface{}, stream grpc.Server // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type LogtailService_StreamSnapshotsServer = grpc.ServerStreamingServer[Snapshot] +func _LogtailService_ListTargets_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListTargetsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(LogtailServiceServer).ListTargets(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: LogtailService_ListTargets_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(LogtailServiceServer).ListTargets(ctx, req.(*ListTargetsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // LogtailService_ServiceDesc is the grpc.ServiceDesc for LogtailService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -189,6 +223,10 @@ var LogtailService_ServiceDesc = grpc.ServiceDesc{ MethodName: "Trend", Handler: _LogtailService_Trend_Handler, }, + { + MethodName: "ListTargets", + Handler: _LogtailService_ListTargets_Handler, + }, }, Streams: []grpc.StreamDesc{ {