diff --git a/README.md b/README.md index d8030a5..b87ac87 100644 --- a/README.md +++ b/README.md @@ -264,6 +264,7 @@ message Snapshot { string source = 1; int64 timestamp = 2; repeated TopNEntry entries = 3; // full top-50K for this bucket + bool is_coarse = 4; // true for 5-min coarse buckets (DumpSnapshots only) } // Target discovery: list the collectors behind the queried endpoint @@ -274,15 +275,22 @@ message TargetInfo { } message ListTargetsResponse { repeated TargetInfo targets = 1; } +// Backfill: dump full ring buffer contents for aggregator restart recovery +message DumpSnapshotsRequest {} +// Response reuses Snapshot; is_coarse distinguishes fine (1-min) from coarse (5-min) buckets. +// Stream closes after all historical data is sent (unlike StreamSnapshots which stays open). + service LogtailService { rpc TopN(TopNRequest) returns (TopNResponse); rpc Trend(TrendRequest) returns (TrendResponse); rpc StreamSnapshots(SnapshotRequest) returns (stream Snapshot); rpc ListTargets(ListTargetsRequest) returns (ListTargetsResponse); + rpc DumpSnapshots(DumpSnapshotsRequest) returns (stream Snapshot); } // Both collector and aggregator implement LogtailService. // The aggregator's StreamSnapshots re-streams the merged view. // ListTargets: aggregator returns all configured collectors; collector returns itself. +// DumpSnapshots: collector only; aggregator calls this on startup to backfill its ring. ``` ## Program 1 — Collector @@ -334,11 +342,16 @@ service LogtailService { - **TopN query**: RLock ring, sum bucket range, apply filter, group by dimension, heap-select top N. - **Trend query**: per-bucket filtered sum, returns one `TrendPoint` per bucket. - **Subscriber fan-out**: per-subscriber buffered channel; `Subscribe`/`Unsubscribe` for streaming. +- **`DumpRings()`**: acquires `RLock`, copies both ring arrays and their head/filled pointers + (just slice headers — microseconds), releases lock, then returns chronologically-ordered fine + and coarse snapshot slices. The lock is never held during serialisation or network I/O. ### server.go - gRPC server on configurable port (default `:9090`). - `TopN` and `Trend`: unary, answered from the ring buffer under RLock. - `StreamSnapshots`: registers a subscriber channel; loops `Recv` on it; 30 s keepalive ticker. +- `DumpSnapshots`: calls `DumpRings()`, streams all fine buckets (`is_coarse=false`) then all + coarse buckets (`is_coarse=true`), then closes the stream. No lock held during streaming. ## Program 2 — Aggregator @@ -362,6 +375,23 @@ service LogtailService { to the same 1-minute cadence as collectors regardless of how many collectors are connected. - Same tiered ring structure as the collector store; populated from `merger.TopK()` each tick. - `QueryTopN`, `QueryTrend`, `Subscribe`/`Unsubscribe` — identical interface to collector store. +- **`LoadHistorical(fine, coarse []Snapshot)`**: writes pre-merged backfill snapshots directly into + the ring arrays under `mu.Lock()`, sets head and filled counters, then returns. Safe to call + concurrently with queries. The live ticker continues from the updated head after this returns. + +### backfill.go +- **`Backfill(ctx, collectorAddrs, cache)`**: called once at aggregator startup (in a goroutine, + after the gRPC server is already listening so the frontend is never blocked). +- Dials all collectors concurrently and calls `DumpSnapshots` on each. +- Accumulates entries per timestamp in `map[unix-second]map[label]count`; multiple collectors' + contributions for the same bucket are summed — the same delta-merge semantics as the live path. +- Sorts timestamps chronologically, runs `TopKFromMap` per bucket, caps to ring size. +- Calls `cache.LoadHistorical` once with the merged results. +- **Graceful degradation**: if a collector returns `Unimplemented` (old binary without + `DumpSnapshots`), logs an informational message and skips it — live streaming still starts + normally. Any other error is logged with timing and also skipped. Partial backfill (some + collectors succeed, some fail) is supported. +- Logs per-collector stats: bucket counts, total entry counts, and wall-clock duration. ### registry.go - **`TargetRegistry`**: `sync.RWMutex`-protected `map[addr → name]`. Initialised with the @@ -489,3 +519,6 @@ with a non-zero code on gRPC error. | Regex filters compiled once per query (`CompiledFilter`) | Up to 288 × 5 000 per-entry calls — compiling per-entry would dominate query latency | | Filter expression box (`q=`) redirects to canonical URL | Filter state stays in individual `f_*` params; URLs remain shareable and bookmarkable | | `ListTargets` + frontend source picker | "Which nginx is busiest?" answered by switching `target=` to a collector; no data model changes, no extra memory | +| Backfill via `DumpSnapshots` on restart | Aggregator recovers full 24h ring from collectors on restart; gRPC server starts first so frontend is never blocked during backfill | +| `DumpRings()` copies under lock, streams without lock | Lock held for microseconds (slice-header copy only); network I/O happens outside the lock so minute rotation is never delayed | +| Backfill merges per-timestamp across collectors | Correct cross-collector sums per bucket, same semantics as live delta-merge; collectors that don't support `DumpSnapshots` are skipped gracefully | diff --git a/cmd/aggregator/backfill.go b/cmd/aggregator/backfill.go new file mode 100644 index 0000000..4955b7a --- /dev/null +++ b/cmd/aggregator/backfill.go @@ -0,0 +1,162 @@ +package main + +import ( + "context" + "io" + "log" + "sort" + "time" + + st "git.ipng.ch/ipng/nginx-logtail/internal/store" + pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +// Backfill calls DumpSnapshots on all collectors concurrently, merges their +// data per timestamp, and loads the result into the cache. It blocks until all +// collectors have responded or the context is cancelled. +func Backfill(ctx context.Context, collectorAddrs []string, cache *Cache) { + type result struct { + fine []st.Snapshot + coarse []st.Snapshot + } + + ch := make(chan result, len(collectorAddrs)) + for _, addr := range collectorAddrs { + addr := addr + go func() { + start := time.Now() + fine, coarse, err := dumpCollector(ctx, addr) + if err != nil { + if status.Code(err) == codes.Unimplemented { + log.Printf("backfill: %s: collector does not support DumpSnapshots (old binary), skipping", addr) + } else { + log.Printf("backfill: %s: failed after %s: %v", addr, time.Since(start).Round(time.Millisecond), err) + } + ch <- result{} + return + } + var fineEntries, coarseEntries int + for _, s := range fine { + fineEntries += len(s.Entries) + } + for _, s := range coarse { + coarseEntries += len(s.Entries) + } + log.Printf("backfill: %s: %d fine buckets (%d entries) + %d coarse buckets (%d entries) in %s", + addr, len(fine), fineEntries, len(coarse), coarseEntries, time.Since(start).Round(time.Millisecond)) + ch <- result{fine, coarse} + }() + } + + // Collect per-timestamp maps: unix-minute → label → total count. + fineByTS := make(map[int64]map[string]int64) + coarseByTS := make(map[int64]map[string]int64) + + for range collectorAddrs { + r := <-ch + mergeDump(r.fine, fineByTS) + mergeDump(r.coarse, coarseByTS) + } + + fine := buildSnapshots(fineByTS, st.FineTopK, st.FineRingSize) + coarse := buildSnapshots(coarseByTS, st.CoarseTopK, st.CoarseRingSize) + + if len(fine)+len(coarse) == 0 { + log.Printf("backfill: no data received from any collector") + return + } + loadStart := time.Now() + cache.LoadHistorical(fine, coarse) + log.Printf("backfill: loaded %d fine + %d coarse buckets in %s total", + len(fine), len(coarse), time.Since(loadStart).Round(time.Microsecond)) +} + +// dumpCollector calls DumpSnapshots on one collector and returns the fine and +// coarse ring snapshots as separate slices. +func dumpCollector(ctx context.Context, addr string) (fine, coarse []st.Snapshot, err error) { + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, nil, err + } + defer conn.Close() + + client := pb.NewLogtailServiceClient(conn) + stream, err := client.DumpSnapshots(ctx, &pb.DumpSnapshotsRequest{}) + if err != nil { + return nil, nil, err + } + + for { + msg, err := stream.Recv() + if err == io.EOF { + return fine, coarse, nil + } + if err != nil { + return fine, coarse, err + } + snap := st.Snapshot{ + Timestamp: time.Unix(msg.Timestamp, 0), + Entries: pbEntriesToStore(msg.Entries), + } + if msg.IsCoarse { + coarse = append(coarse, snap) + } else { + fine = append(fine, snap) + } + } +} + +// mergeDump adds all snapshots from one collector's dump into the per-timestamp +// accumulator map. Multiple collectors' entries for the same timestamp are summed. +func mergeDump(snaps []st.Snapshot, byTS map[int64]map[string]int64) { + for _, snap := range snaps { + ts := snap.Timestamp.Unix() + m := byTS[ts] + if m == nil { + m = make(map[string]int64, len(snap.Entries)) + byTS[ts] = m + } + for _, e := range snap.Entries { + m[e.Label] += e.Count + } + } +} + +// buildSnapshots sorts the per-timestamp map chronologically, runs TopK on each +// bucket, and returns a slice capped to ringSize oldest-first snapshots. +func buildSnapshots(byTS map[int64]map[string]int64, topK, ringSize int) []st.Snapshot { + if len(byTS) == 0 { + return nil + } + timestamps := make([]int64, 0, len(byTS)) + for ts := range byTS { + timestamps = append(timestamps, ts) + } + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + // Keep only the most recent ringSize buckets. + if len(timestamps) > ringSize { + timestamps = timestamps[len(timestamps)-ringSize:] + } + + snaps := make([]st.Snapshot, len(timestamps)) + for i, ts := range timestamps { + snaps[i] = st.Snapshot{ + Timestamp: time.Unix(ts, 0), + Entries: st.TopKFromMap(byTS[ts], topK), + } + } + return snaps +} + +func pbEntriesToStore(entries []*pb.TopNEntry) []st.Entry { + out := make([]st.Entry, len(entries)) + for i, e := range entries { + out[i] = st.Entry{Label: e.Label, Count: e.Count} + } + return out +} diff --git a/cmd/aggregator/cache.go b/cmd/aggregator/cache.go index dcc4810..00be9e7 100644 --- a/cmd/aggregator/cache.go +++ b/cmd/aggregator/cache.go @@ -90,6 +90,26 @@ func (c *Cache) mergeFineBuckets(now time.Time) st.Snapshot { return st.Snapshot{Timestamp: now, Entries: st.TopKFromMap(merged, st.CoarseTopK)} } +// LoadHistorical pre-populates the ring buffers from backfill data before live +// streaming begins. fine and coarse must be sorted oldest-first; each slice +// must not exceed the respective ring size. Called once at startup, before Run. +func (c *Cache) LoadHistorical(fine, coarse []st.Snapshot) { + c.mu.Lock() + defer c.mu.Unlock() + + for i, snap := range fine { + c.fineRing[i] = snap + } + c.fineFilled = len(fine) + c.fineHead = len(fine) % st.FineRingSize + + for i, snap := range coarse { + c.coarseRing[i] = snap + } + c.coarseFilled = len(coarse) + c.coarseHead = len(coarse) % st.CoarseRingSize +} + // QueryTopN answers a TopN request from the ring buffers. func (c *Cache) QueryTopN(filter *pb.Filter, groupBy pb.GroupBy, n int, window pb.Window) []st.Entry { cf := st.CompileFilter(filter) diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go index 930837a..67ba62e 100644 --- a/cmd/aggregator/main.go +++ b/cmd/aggregator/main.go @@ -38,13 +38,6 @@ func main() { merger := NewMerger() cache := NewCache(merger, *source) registry := NewTargetRegistry(collectorAddrs) - go cache.Run(ctx) - - for _, addr := range collectorAddrs { - sub := NewCollectorSub(addr, merger, registry) - go sub.Run(ctx) - log.Printf("aggregator: subscribing to collector %s", addr) - } lis, err := net.Listen("tcp", *listen) if err != nil { @@ -60,6 +53,17 @@ func main() { } }() + go cache.Run(ctx) + + for _, addr := range collectorAddrs { + sub := NewCollectorSub(addr, merger, registry) + go sub.Run(ctx) + log.Printf("aggregator: subscribing to collector %s", addr) + } + + log.Printf("aggregator: backfilling from %d collector(s)", len(collectorAddrs)) + go Backfill(ctx, collectorAddrs, cache) + <-ctx.Done() log.Printf("aggregator: shutting down") grpcServer.GracefulStop() diff --git a/cmd/collector/server.go b/cmd/collector/server.go index 2ebcfc5..f87fed6 100644 --- a/cmd/collector/server.go +++ b/cmd/collector/server.go @@ -5,6 +5,7 @@ import ( "log" "time" + st "git.ipng.ch/ipng/nginx-logtail/internal/store" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -70,6 +71,33 @@ func peerAddr(ctx context.Context) string { return "unknown" } +func (srv *Server) DumpSnapshots(_ *pb.DumpSnapshotsRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { + fine, coarse := srv.store.DumpRings() + for _, snap := range fine { + if err := stream.Send(storeSnapshotToProto(snap, srv.source, false)); err != nil { + return err + } + } + for _, snap := range coarse { + if err := stream.Send(storeSnapshotToProto(snap, srv.source, true)); err != nil { + return err + } + } + return nil +} + +func storeSnapshotToProto(snap st.Snapshot, source string, isCoarse bool) *pb.Snapshot { + msg := &pb.Snapshot{ + Source: source, + Timestamp: snap.Timestamp.Unix(), + IsCoarse: isCoarse, + } + for _, e := range snap.Entries { + msg.Entries = append(msg.Entries, &pb.TopNEntry{Label: e.Label, Count: e.Count}) + } + return msg +} + func (srv *Server) StreamSnapshots(req *pb.SnapshotRequest, stream grpc.ServerStreamingServer[pb.Snapshot]) error { ch := srv.store.Subscribe() defer srv.store.Unsubscribe(ch) diff --git a/cmd/collector/store.go b/cmd/collector/store.go index 28abbcb..e17e26d 100644 --- a/cmd/collector/store.go +++ b/cmd/collector/store.go @@ -154,6 +154,32 @@ func (s *Store) coarseView() st.RingView { return st.RingView{Ring: ring, Head: s.coarseHead, Size: st.CoarseRingSize} } +// DumpRings returns copies of all non-empty fine and coarse ring snapshots in +// chronological order. The lock is held only for the duration of the copy. +func (s *Store) DumpRings() (fine, coarse []st.Snapshot) { + s.mu.RLock() + fineRing := s.fineRing + fineHead := s.fineHead + fineFilled := s.fineFilled + coarseRing := s.coarseRing + coarseHead := s.coarseHead + coarseFilled := s.coarseFilled + s.mu.RUnlock() + + fine = make([]st.Snapshot, 0, fineFilled) + for i := 0; i < fineFilled; i++ { + idx := (fineHead - fineFilled + i + st.FineRingSize) % st.FineRingSize + fine = append(fine, fineRing[idx]) + } + + coarse = make([]st.Snapshot, 0, coarseFilled) + for i := 0; i < coarseFilled; i++ { + idx := (coarseHead - coarseFilled + i + st.CoarseRingSize) % st.CoarseRingSize + coarse = append(coarse, coarseRing[idx]) + } + return fine, coarse +} + func (s *Store) Subscribe() chan st.Snapshot { ch := make(chan st.Snapshot, 4) s.subMu.Lock() diff --git a/proto/logtail.pb.go b/proto/logtail.pb.go deleted file mode 100644 index 40515cd..0000000 --- a/proto/logtail.pb.go +++ /dev/null @@ -1,1098 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.36.11 -// protoc v3.21.12 -// source: proto/logtail.proto - -package logtailpb - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" - unsafe "unsafe" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// TorFilter restricts results by whether the client is a TOR exit node. -// TOR_ANY (0) is the default and means no filtering. -type TorFilter int32 - -const ( - TorFilter_TOR_ANY TorFilter = 0 // no filter - TorFilter_TOR_YES TorFilter = 1 // only TOR traffic (is_tor=1) - TorFilter_TOR_NO TorFilter = 2 // only non-TOR traffic (is_tor=0) -) - -// Enum value maps for TorFilter. -var ( - TorFilter_name = map[int32]string{ - 0: "TOR_ANY", - 1: "TOR_YES", - 2: "TOR_NO", - } - TorFilter_value = map[string]int32{ - "TOR_ANY": 0, - "TOR_YES": 1, - "TOR_NO": 2, - } -) - -func (x TorFilter) Enum() *TorFilter { - p := new(TorFilter) - *p = x - return p -} - -func (x TorFilter) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (TorFilter) Descriptor() protoreflect.EnumDescriptor { - return file_proto_logtail_proto_enumTypes[0].Descriptor() -} - -func (TorFilter) Type() protoreflect.EnumType { - return &file_proto_logtail_proto_enumTypes[0] -} - -func (x TorFilter) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use TorFilter.Descriptor instead. -func (TorFilter) EnumDescriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{0} -} - -// StatusOp is the comparison operator applied to http_response in a Filter. -// Defaults to EQ (exact match) for backward compatibility. -type StatusOp int32 - -const ( - StatusOp_EQ StatusOp = 0 // == - StatusOp_NE StatusOp = 1 // != - StatusOp_GT StatusOp = 2 // > - StatusOp_GE StatusOp = 3 // >= - StatusOp_LT StatusOp = 4 // < - StatusOp_LE StatusOp = 5 // <= -) - -// Enum value maps for StatusOp. -var ( - StatusOp_name = map[int32]string{ - 0: "EQ", - 1: "NE", - 2: "GT", - 3: "GE", - 4: "LT", - 5: "LE", - } - StatusOp_value = map[string]int32{ - "EQ": 0, - "NE": 1, - "GT": 2, - "GE": 3, - "LT": 4, - "LE": 5, - } -) - -func (x StatusOp) Enum() *StatusOp { - p := new(StatusOp) - *p = x - return p -} - -func (x StatusOp) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (StatusOp) Descriptor() protoreflect.EnumDescriptor { - return file_proto_logtail_proto_enumTypes[1].Descriptor() -} - -func (StatusOp) Type() protoreflect.EnumType { - return &file_proto_logtail_proto_enumTypes[1] -} - -func (x StatusOp) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use StatusOp.Descriptor instead. -func (StatusOp) EnumDescriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{1} -} - -type GroupBy int32 - -const ( - GroupBy_WEBSITE GroupBy = 0 - GroupBy_CLIENT_PREFIX GroupBy = 1 - GroupBy_REQUEST_URI GroupBy = 2 - GroupBy_HTTP_RESPONSE GroupBy = 3 - GroupBy_ASN_NUMBER GroupBy = 4 -) - -// Enum value maps for GroupBy. -var ( - GroupBy_name = map[int32]string{ - 0: "WEBSITE", - 1: "CLIENT_PREFIX", - 2: "REQUEST_URI", - 3: "HTTP_RESPONSE", - 4: "ASN_NUMBER", - } - GroupBy_value = map[string]int32{ - "WEBSITE": 0, - "CLIENT_PREFIX": 1, - "REQUEST_URI": 2, - "HTTP_RESPONSE": 3, - "ASN_NUMBER": 4, - } -) - -func (x GroupBy) Enum() *GroupBy { - p := new(GroupBy) - *p = x - return p -} - -func (x GroupBy) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (GroupBy) Descriptor() protoreflect.EnumDescriptor { - return file_proto_logtail_proto_enumTypes[2].Descriptor() -} - -func (GroupBy) Type() protoreflect.EnumType { - return &file_proto_logtail_proto_enumTypes[2] -} - -func (x GroupBy) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use GroupBy.Descriptor instead. -func (GroupBy) EnumDescriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{2} -} - -type Window int32 - -const ( - Window_W1M Window = 0 // last 1 minute - Window_W5M Window = 1 // last 5 minutes - Window_W15M Window = 2 // last 15 minutes - Window_W60M Window = 3 // last 60 minutes - Window_W6H Window = 4 // last 6 hours - Window_W24H Window = 5 // last 24 hours -) - -// Enum value maps for Window. -var ( - Window_name = map[int32]string{ - 0: "W1M", - 1: "W5M", - 2: "W15M", - 3: "W60M", - 4: "W6H", - 5: "W24H", - } - Window_value = map[string]int32{ - "W1M": 0, - "W5M": 1, - "W15M": 2, - "W60M": 3, - "W6H": 4, - "W24H": 5, - } -) - -func (x Window) Enum() *Window { - p := new(Window) - *p = x - return p -} - -func (x Window) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Window) Descriptor() protoreflect.EnumDescriptor { - return file_proto_logtail_proto_enumTypes[3].Descriptor() -} - -func (Window) Type() protoreflect.EnumType { - return &file_proto_logtail_proto_enumTypes[3] -} - -func (x Window) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Window.Descriptor instead. -func (Window) EnumDescriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{3} -} - -// Filter restricts results to entries matching all specified fields. -// Unset fields match everything. Exact-match and regex fields are ANDed. -type Filter struct { - state protoimpl.MessageState `protogen:"open.v1"` - Website *string `protobuf:"bytes,1,opt,name=website,proto3,oneof" json:"website,omitempty"` - ClientPrefix *string `protobuf:"bytes,2,opt,name=client_prefix,json=clientPrefix,proto3,oneof" json:"client_prefix,omitempty"` - HttpRequestUri *string `protobuf:"bytes,3,opt,name=http_request_uri,json=httpRequestUri,proto3,oneof" json:"http_request_uri,omitempty"` - HttpResponse *int32 `protobuf:"varint,4,opt,name=http_response,json=httpResponse,proto3,oneof" json:"http_response,omitempty"` - StatusOp StatusOp `protobuf:"varint,5,opt,name=status_op,json=statusOp,proto3,enum=logtail.StatusOp" json:"status_op,omitempty"` // operator for http_response; ignored when unset - WebsiteRegex *string `protobuf:"bytes,6,opt,name=website_regex,json=websiteRegex,proto3,oneof" json:"website_regex,omitempty"` // RE2 regex matched against website - UriRegex *string `protobuf:"bytes,7,opt,name=uri_regex,json=uriRegex,proto3,oneof" json:"uri_regex,omitempty"` // RE2 regex matched against http_request_uri - Tor TorFilter `protobuf:"varint,8,opt,name=tor,proto3,enum=logtail.TorFilter" json:"tor,omitempty"` // restrict to TOR / non-TOR clients - AsnNumber *int32 `protobuf:"varint,9,opt,name=asn_number,json=asnNumber,proto3,oneof" json:"asn_number,omitempty"` // filter by client ASN - AsnOp StatusOp `protobuf:"varint,10,opt,name=asn_op,json=asnOp,proto3,enum=logtail.StatusOp" json:"asn_op,omitempty"` // operator for asn_number; ignored when unset - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *Filter) Reset() { - *x = Filter{} - mi := &file_proto_logtail_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *Filter) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Filter) ProtoMessage() {} - -func (x *Filter) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[0] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Filter.ProtoReflect.Descriptor instead. -func (*Filter) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{0} -} - -func (x *Filter) GetWebsite() string { - if x != nil && x.Website != nil { - return *x.Website - } - return "" -} - -func (x *Filter) GetClientPrefix() string { - if x != nil && x.ClientPrefix != nil { - return *x.ClientPrefix - } - return "" -} - -func (x *Filter) GetHttpRequestUri() string { - if x != nil && x.HttpRequestUri != nil { - return *x.HttpRequestUri - } - return "" -} - -func (x *Filter) GetHttpResponse() int32 { - if x != nil && x.HttpResponse != nil { - return *x.HttpResponse - } - return 0 -} - -func (x *Filter) GetStatusOp() StatusOp { - if x != nil { - return x.StatusOp - } - return StatusOp_EQ -} - -func (x *Filter) GetWebsiteRegex() string { - if x != nil && x.WebsiteRegex != nil { - return *x.WebsiteRegex - } - return "" -} - -func (x *Filter) GetUriRegex() string { - if x != nil && x.UriRegex != nil { - return *x.UriRegex - } - return "" -} - -func (x *Filter) GetTor() TorFilter { - if x != nil { - return x.Tor - } - return TorFilter_TOR_ANY -} - -func (x *Filter) GetAsnNumber() int32 { - if x != nil && x.AsnNumber != nil { - return *x.AsnNumber - } - return 0 -} - -func (x *Filter) GetAsnOp() StatusOp { - if x != nil { - return x.AsnOp - } - return StatusOp_EQ -} - -type TopNRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Filter *Filter `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` - GroupBy GroupBy `protobuf:"varint,2,opt,name=group_by,json=groupBy,proto3,enum=logtail.GroupBy" json:"group_by,omitempty"` - N int32 `protobuf:"varint,3,opt,name=n,proto3" json:"n,omitempty"` - Window Window `protobuf:"varint,4,opt,name=window,proto3,enum=logtail.Window" json:"window,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TopNRequest) Reset() { - *x = TopNRequest{} - mi := &file_proto_logtail_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TopNRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TopNRequest) ProtoMessage() {} - -func (x *TopNRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[1] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TopNRequest.ProtoReflect.Descriptor instead. -func (*TopNRequest) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{1} -} - -func (x *TopNRequest) GetFilter() *Filter { - if x != nil { - return x.Filter - } - return nil -} - -func (x *TopNRequest) GetGroupBy() GroupBy { - if x != nil { - return x.GroupBy - } - return GroupBy_WEBSITE -} - -func (x *TopNRequest) GetN() int32 { - if x != nil { - return x.N - } - return 0 -} - -func (x *TopNRequest) GetWindow() Window { - if x != nil { - return x.Window - } - return Window_W1M -} - -type TopNEntry struct { - state protoimpl.MessageState `protogen:"open.v1"` - Label string `protobuf:"bytes,1,opt,name=label,proto3" json:"label,omitempty"` - Count int64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TopNEntry) Reset() { - *x = TopNEntry{} - mi := &file_proto_logtail_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TopNEntry) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TopNEntry) ProtoMessage() {} - -func (x *TopNEntry) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[2] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TopNEntry.ProtoReflect.Descriptor instead. -func (*TopNEntry) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{2} -} - -func (x *TopNEntry) GetLabel() string { - if x != nil { - return x.Label - } - return "" -} - -func (x *TopNEntry) GetCount() int64 { - if x != nil { - return x.Count - } - return 0 -} - -type TopNResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Entries []*TopNEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"` - Source string `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` // hostname of the responding node - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TopNResponse) Reset() { - *x = TopNResponse{} - mi := &file_proto_logtail_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TopNResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TopNResponse) ProtoMessage() {} - -func (x *TopNResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[3] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TopNResponse.ProtoReflect.Descriptor instead. -func (*TopNResponse) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{3} -} - -func (x *TopNResponse) GetEntries() []*TopNEntry { - if x != nil { - return x.Entries - } - return nil -} - -func (x *TopNResponse) GetSource() string { - if x != nil { - return x.Source - } - return "" -} - -type TrendRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Filter *Filter `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"` - Window Window `protobuf:"varint,2,opt,name=window,proto3,enum=logtail.Window" json:"window,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TrendRequest) Reset() { - *x = TrendRequest{} - mi := &file_proto_logtail_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TrendRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TrendRequest) ProtoMessage() {} - -func (x *TrendRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[4] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TrendRequest.ProtoReflect.Descriptor instead. -func (*TrendRequest) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{4} -} - -func (x *TrendRequest) GetFilter() *Filter { - if x != nil { - return x.Filter - } - return nil -} - -func (x *TrendRequest) GetWindow() Window { - if x != nil { - return x.Window - } - return Window_W1M -} - -type TrendPoint struct { - state protoimpl.MessageState `protogen:"open.v1"` - TimestampUnix int64 `protobuf:"varint,1,opt,name=timestamp_unix,json=timestampUnix,proto3" json:"timestamp_unix,omitempty"` - Count int64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TrendPoint) Reset() { - *x = TrendPoint{} - mi := &file_proto_logtail_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TrendPoint) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TrendPoint) ProtoMessage() {} - -func (x *TrendPoint) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[5] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TrendPoint.ProtoReflect.Descriptor instead. -func (*TrendPoint) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{5} -} - -func (x *TrendPoint) GetTimestampUnix() int64 { - if x != nil { - return x.TimestampUnix - } - return 0 -} - -func (x *TrendPoint) GetCount() int64 { - if x != nil { - return x.Count - } - return 0 -} - -type TrendResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Points []*TrendPoint `protobuf:"bytes,1,rep,name=points,proto3" json:"points,omitempty"` - Source string `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TrendResponse) Reset() { - *x = TrendResponse{} - mi := &file_proto_logtail_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TrendResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TrendResponse) ProtoMessage() {} - -func (x *TrendResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[6] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TrendResponse.ProtoReflect.Descriptor instead. -func (*TrendResponse) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{6} -} - -func (x *TrendResponse) GetPoints() []*TrendPoint { - if x != nil { - return x.Points - } - return nil -} - -func (x *TrendResponse) GetSource() string { - if x != nil { - return x.Source - } - return "" -} - -type SnapshotRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *SnapshotRequest) Reset() { - *x = SnapshotRequest{} - mi := &file_proto_logtail_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *SnapshotRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SnapshotRequest) ProtoMessage() {} - -func (x *SnapshotRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[7] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SnapshotRequest.ProtoReflect.Descriptor instead. -func (*SnapshotRequest) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{7} -} - -type Snapshot struct { - state protoimpl.MessageState `protogen:"open.v1"` - Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Entries []*TopNEntry `protobuf:"bytes,3,rep,name=entries,proto3" json:"entries,omitempty"` // top-50K for this 1-minute bucket, sorted desc - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *Snapshot) Reset() { - *x = Snapshot{} - mi := &file_proto_logtail_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *Snapshot) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Snapshot) ProtoMessage() {} - -func (x *Snapshot) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[8] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Snapshot.ProtoReflect.Descriptor instead. -func (*Snapshot) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{8} -} - -func (x *Snapshot) GetSource() string { - if x != nil { - return x.Source - } - return "" -} - -func (x *Snapshot) GetTimestamp() int64 { - if x != nil { - return x.Timestamp - } - return 0 -} - -func (x *Snapshot) GetEntries() []*TopNEntry { - if x != nil { - return x.Entries - } - return nil -} - -type ListTargetsRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ListTargetsRequest) Reset() { - *x = ListTargetsRequest{} - mi := &file_proto_logtail_proto_msgTypes[9] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ListTargetsRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListTargetsRequest) ProtoMessage() {} - -func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[9] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListTargetsRequest.ProtoReflect.Descriptor instead. -func (*ListTargetsRequest) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{9} -} - -type TargetInfo struct { - state protoimpl.MessageState `protogen:"open.v1"` - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // display name (the --source value of the collector) - Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` // gRPC address to use as target=; empty means "this endpoint" - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *TargetInfo) Reset() { - *x = TargetInfo{} - mi := &file_proto_logtail_proto_msgTypes[10] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *TargetInfo) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TargetInfo) ProtoMessage() {} - -func (x *TargetInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[10] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TargetInfo.ProtoReflect.Descriptor instead. -func (*TargetInfo) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{10} -} - -func (x *TargetInfo) GetName() string { - if x != nil { - return x.Name - } - return "" -} - -func (x *TargetInfo) GetAddr() string { - if x != nil { - return x.Addr - } - return "" -} - -type ListTargetsResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - Targets []*TargetInfo `protobuf:"bytes,1,rep,name=targets,proto3" json:"targets,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ListTargetsResponse) Reset() { - *x = ListTargetsResponse{} - mi := &file_proto_logtail_proto_msgTypes[11] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ListTargetsResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ListTargetsResponse) ProtoMessage() {} - -func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[11] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ListTargetsResponse.ProtoReflect.Descriptor instead. -func (*ListTargetsResponse) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{11} -} - -func (x *ListTargetsResponse) GetTargets() []*TargetInfo { - if x != nil { - return x.Targets - } - return nil -} - -var File_proto_logtail_proto protoreflect.FileDescriptor - -const file_proto_logtail_proto_rawDesc = "" + - "\n" + - "\x13proto/logtail.proto\x12\alogtail\"\x8e\x04\n" + - "\x06Filter\x12\x1d\n" + - "\awebsite\x18\x01 \x01(\tH\x00R\awebsite\x88\x01\x01\x12(\n" + - "\rclient_prefix\x18\x02 \x01(\tH\x01R\fclientPrefix\x88\x01\x01\x12-\n" + - "\x10http_request_uri\x18\x03 \x01(\tH\x02R\x0ehttpRequestUri\x88\x01\x01\x12(\n" + - "\rhttp_response\x18\x04 \x01(\x05H\x03R\fhttpResponse\x88\x01\x01\x12.\n" + - "\tstatus_op\x18\x05 \x01(\x0e2\x11.logtail.StatusOpR\bstatusOp\x12(\n" + - "\rwebsite_regex\x18\x06 \x01(\tH\x04R\fwebsiteRegex\x88\x01\x01\x12 \n" + - "\turi_regex\x18\a \x01(\tH\x05R\buriRegex\x88\x01\x01\x12$\n" + - "\x03tor\x18\b \x01(\x0e2\x12.logtail.TorFilterR\x03tor\x12\"\n" + - "\n" + - "asn_number\x18\t \x01(\x05H\x06R\tasnNumber\x88\x01\x01\x12(\n" + - "\x06asn_op\x18\n" + - " \x01(\x0e2\x11.logtail.StatusOpR\x05asnOpB\n" + - "\n" + - "\b_websiteB\x10\n" + - "\x0e_client_prefixB\x13\n" + - "\x11_http_request_uriB\x10\n" + - "\x0e_http_responseB\x10\n" + - "\x0e_website_regexB\f\n" + - "\n" + - "_uri_regexB\r\n" + - "\v_asn_number\"\x9a\x01\n" + - "\vTopNRequest\x12'\n" + - "\x06filter\x18\x01 \x01(\v2\x0f.logtail.FilterR\x06filter\x12+\n" + - "\bgroup_by\x18\x02 \x01(\x0e2\x10.logtail.GroupByR\agroupBy\x12\f\n" + - "\x01n\x18\x03 \x01(\x05R\x01n\x12'\n" + - "\x06window\x18\x04 \x01(\x0e2\x0f.logtail.WindowR\x06window\"7\n" + - "\tTopNEntry\x12\x14\n" + - "\x05label\x18\x01 \x01(\tR\x05label\x12\x14\n" + - "\x05count\x18\x02 \x01(\x03R\x05count\"T\n" + - "\fTopNResponse\x12,\n" + - "\aentries\x18\x01 \x03(\v2\x12.logtail.TopNEntryR\aentries\x12\x16\n" + - "\x06source\x18\x02 \x01(\tR\x06source\"`\n" + - "\fTrendRequest\x12'\n" + - "\x06filter\x18\x01 \x01(\v2\x0f.logtail.FilterR\x06filter\x12'\n" + - "\x06window\x18\x02 \x01(\x0e2\x0f.logtail.WindowR\x06window\"I\n" + - "\n" + - "TrendPoint\x12%\n" + - "\x0etimestamp_unix\x18\x01 \x01(\x03R\rtimestampUnix\x12\x14\n" + - "\x05count\x18\x02 \x01(\x03R\x05count\"T\n" + - "\rTrendResponse\x12+\n" + - "\x06points\x18\x01 \x03(\v2\x13.logtail.TrendPointR\x06points\x12\x16\n" + - "\x06source\x18\x02 \x01(\tR\x06source\"\x11\n" + - "\x0fSnapshotRequest\"n\n" + - "\bSnapshot\x12\x16\n" + - "\x06source\x18\x01 \x01(\tR\x06source\x12\x1c\n" + - "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12,\n" + - "\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries\"\x14\n" + - "\x12ListTargetsRequest\"4\n" + - "\n" + - "TargetInfo\x12\x12\n" + - "\x04name\x18\x01 \x01(\tR\x04name\x12\x12\n" + - "\x04addr\x18\x02 \x01(\tR\x04addr\"D\n" + - "\x13ListTargetsResponse\x12-\n" + - "\atargets\x18\x01 \x03(\v2\x13.logtail.TargetInfoR\atargets*1\n" + - "\tTorFilter\x12\v\n" + - "\aTOR_ANY\x10\x00\x12\v\n" + - "\aTOR_YES\x10\x01\x12\n" + - "\n" + - "\x06TOR_NO\x10\x02*:\n" + - "\bStatusOp\x12\x06\n" + - "\x02EQ\x10\x00\x12\x06\n" + - "\x02NE\x10\x01\x12\x06\n" + - "\x02GT\x10\x02\x12\x06\n" + - "\x02GE\x10\x03\x12\x06\n" + - "\x02LT\x10\x04\x12\x06\n" + - "\x02LE\x10\x05*]\n" + - "\aGroupBy\x12\v\n" + - "\aWEBSITE\x10\x00\x12\x11\n" + - "\rCLIENT_PREFIX\x10\x01\x12\x0f\n" + - "\vREQUEST_URI\x10\x02\x12\x11\n" + - "\rHTTP_RESPONSE\x10\x03\x12\x0e\n" + - "\n" + - "ASN_NUMBER\x10\x04*A\n" + - "\x06Window\x12\a\n" + - "\x03W1M\x10\x00\x12\a\n" + - "\x03W5M\x10\x01\x12\b\n" + - "\x04W15M\x10\x02\x12\b\n" + - "\x04W60M\x10\x03\x12\a\n" + - "\x03W6H\x10\x04\x12\b\n" + - "\x04W24H\x10\x052\x89\x02\n" + - "\x0eLogtailService\x123\n" + - "\x04TopN\x12\x14.logtail.TopNRequest\x1a\x15.logtail.TopNResponse\x126\n" + - "\x05Trend\x12\x15.logtail.TrendRequest\x1a\x16.logtail.TrendResponse\x12@\n" + - "\x0fStreamSnapshots\x12\x18.logtail.SnapshotRequest\x1a\x11.logtail.Snapshot0\x01\x12H\n" + - "\vListTargets\x12\x1b.logtail.ListTargetsRequest\x1a\x1c.logtail.ListTargetsResponseB0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3" - -var ( - file_proto_logtail_proto_rawDescOnce sync.Once - file_proto_logtail_proto_rawDescData []byte -) - -func file_proto_logtail_proto_rawDescGZIP() []byte { - file_proto_logtail_proto_rawDescOnce.Do(func() { - file_proto_logtail_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_logtail_proto_rawDesc), len(file_proto_logtail_proto_rawDesc))) - }) - return file_proto_logtail_proto_rawDescData -} - -var file_proto_logtail_proto_enumTypes = make([]protoimpl.EnumInfo, 4) -var file_proto_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 12) -var file_proto_logtail_proto_goTypes = []any{ - (TorFilter)(0), // 0: logtail.TorFilter - (StatusOp)(0), // 1: logtail.StatusOp - (GroupBy)(0), // 2: logtail.GroupBy - (Window)(0), // 3: logtail.Window - (*Filter)(nil), // 4: logtail.Filter - (*TopNRequest)(nil), // 5: logtail.TopNRequest - (*TopNEntry)(nil), // 6: logtail.TopNEntry - (*TopNResponse)(nil), // 7: logtail.TopNResponse - (*TrendRequest)(nil), // 8: logtail.TrendRequest - (*TrendPoint)(nil), // 9: logtail.TrendPoint - (*TrendResponse)(nil), // 10: logtail.TrendResponse - (*SnapshotRequest)(nil), // 11: logtail.SnapshotRequest - (*Snapshot)(nil), // 12: logtail.Snapshot - (*ListTargetsRequest)(nil), // 13: logtail.ListTargetsRequest - (*TargetInfo)(nil), // 14: logtail.TargetInfo - (*ListTargetsResponse)(nil), // 15: logtail.ListTargetsResponse -} -var file_proto_logtail_proto_depIdxs = []int32{ - 1, // 0: logtail.Filter.status_op:type_name -> logtail.StatusOp - 0, // 1: logtail.Filter.tor:type_name -> logtail.TorFilter - 1, // 2: logtail.Filter.asn_op:type_name -> logtail.StatusOp - 4, // 3: logtail.TopNRequest.filter:type_name -> logtail.Filter - 2, // 4: logtail.TopNRequest.group_by:type_name -> logtail.GroupBy - 3, // 5: logtail.TopNRequest.window:type_name -> logtail.Window - 6, // 6: logtail.TopNResponse.entries:type_name -> logtail.TopNEntry - 4, // 7: logtail.TrendRequest.filter:type_name -> logtail.Filter - 3, // 8: logtail.TrendRequest.window:type_name -> logtail.Window - 9, // 9: logtail.TrendResponse.points:type_name -> logtail.TrendPoint - 6, // 10: logtail.Snapshot.entries:type_name -> logtail.TopNEntry - 14, // 11: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo - 5, // 12: logtail.LogtailService.TopN:input_type -> logtail.TopNRequest - 8, // 13: logtail.LogtailService.Trend:input_type -> logtail.TrendRequest - 11, // 14: logtail.LogtailService.StreamSnapshots:input_type -> logtail.SnapshotRequest - 13, // 15: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest - 7, // 16: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse - 10, // 17: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse - 12, // 18: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot - 15, // 19: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse - 16, // [16:20] is the sub-list for method output_type - 12, // [12:16] is the sub-list for method input_type - 12, // [12:12] is the sub-list for extension type_name - 12, // [12:12] is the sub-list for extension extendee - 0, // [0:12] is the sub-list for field type_name -} - -func init() { file_proto_logtail_proto_init() } -func file_proto_logtail_proto_init() { - if File_proto_logtail_proto != nil { - return - } - file_proto_logtail_proto_msgTypes[0].OneofWrappers = []any{} - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_logtail_proto_rawDesc), len(file_proto_logtail_proto_rawDesc)), - NumEnums: 4, - NumMessages: 12, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_proto_logtail_proto_goTypes, - DependencyIndexes: file_proto_logtail_proto_depIdxs, - EnumInfos: file_proto_logtail_proto_enumTypes, - MessageInfos: file_proto_logtail_proto_msgTypes, - }.Build() - File_proto_logtail_proto = out.File - file_proto_logtail_proto_goTypes = nil - file_proto_logtail_proto_depIdxs = nil -} diff --git a/proto/logtail.proto b/proto/logtail.proto index 1549d59..7338efd 100644 --- a/proto/logtail.proto +++ b/proto/logtail.proto @@ -99,8 +99,15 @@ message Snapshot { string source = 1; int64 timestamp = 2; repeated TopNEntry entries = 3; // top-50K for this 1-minute bucket, sorted desc + bool is_coarse = 4; // true for coarse-ring (5-min) buckets in DumpSnapshots } +// DumpSnapshots — returns all ring buffer contents for backfill on aggregator restart. +// Streams fine-ring buckets (is_coarse=false) followed by coarse-ring buckets +// (is_coarse=true), then closes. The lock is held only for the initial copy. + +message DumpSnapshotsRequest {} + // ListTargets — returns the targets this node knows about. // The aggregator returns all configured collectors; a collector returns itself. @@ -120,4 +127,5 @@ service LogtailService { rpc Trend (TrendRequest) returns (TrendResponse); rpc StreamSnapshots (SnapshotRequest) returns (stream Snapshot); rpc ListTargets (ListTargetsRequest) returns (ListTargetsResponse); + rpc DumpSnapshots (DumpSnapshotsRequest) returns (stream Snapshot); } diff --git a/proto/logtail_grpc.pb.go b/proto/logtail_grpc.pb.go deleted file mode 100644 index 520faea..0000000 --- a/proto/logtail_grpc.pb.go +++ /dev/null @@ -1,239 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.6.1 -// - protoc v3.21.12 -// source: proto/logtail.proto - -package logtailpb - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.64.0 or later. -const _ = grpc.SupportPackageIsVersion9 - -const ( - LogtailService_TopN_FullMethodName = "/logtail.LogtailService/TopN" - LogtailService_Trend_FullMethodName = "/logtail.LogtailService/Trend" - LogtailService_StreamSnapshots_FullMethodName = "/logtail.LogtailService/StreamSnapshots" - LogtailService_ListTargets_FullMethodName = "/logtail.LogtailService/ListTargets" -) - -// LogtailServiceClient is the client API for LogtailService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type LogtailServiceClient interface { - TopN(ctx context.Context, in *TopNRequest, opts ...grpc.CallOption) (*TopNResponse, error) - Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error) - StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) - ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) -} - -type logtailServiceClient struct { - cc grpc.ClientConnInterface -} - -func NewLogtailServiceClient(cc grpc.ClientConnInterface) LogtailServiceClient { - return &logtailServiceClient{cc} -} - -func (c *logtailServiceClient) TopN(ctx context.Context, in *TopNRequest, opts ...grpc.CallOption) (*TopNResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(TopNResponse) - err := c.cc.Invoke(ctx, LogtailService_TopN_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *logtailServiceClient) Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(TrendResponse) - err := c.cc.Invoke(ctx, LogtailService_Trend_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *logtailServiceClient) StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &LogtailService_ServiceDesc.Streams[0], LogtailService_StreamSnapshots_FullMethodName, cOpts...) - if err != nil { - return nil, err - } - x := &grpc.GenericClientStream[SnapshotRequest, Snapshot]{ClientStream: stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type LogtailService_StreamSnapshotsClient = grpc.ServerStreamingClient[Snapshot] - -func (c *logtailServiceClient) ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(ListTargetsResponse) - err := c.cc.Invoke(ctx, LogtailService_ListTargets_FullMethodName, in, out, cOpts...) - if err != nil { - return nil, err - } - return out, nil -} - -// LogtailServiceServer is the server API for LogtailService service. -// All implementations must embed UnimplementedLogtailServiceServer -// for forward compatibility. -type LogtailServiceServer interface { - TopN(context.Context, *TopNRequest) (*TopNResponse, error) - Trend(context.Context, *TrendRequest) (*TrendResponse, error) - StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error - ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) - mustEmbedUnimplementedLogtailServiceServer() -} - -// UnimplementedLogtailServiceServer must be embedded to have -// forward compatible implementations. -// -// NOTE: this should be embedded by value instead of pointer to avoid a nil -// pointer dereference when methods are called. -type UnimplementedLogtailServiceServer struct{} - -func (UnimplementedLogtailServiceServer) TopN(context.Context, *TopNRequest) (*TopNResponse, error) { - return nil, status.Error(codes.Unimplemented, "method TopN not implemented") -} -func (UnimplementedLogtailServiceServer) Trend(context.Context, *TrendRequest) (*TrendResponse, error) { - return nil, status.Error(codes.Unimplemented, "method Trend not implemented") -} -func (UnimplementedLogtailServiceServer) StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error { - return status.Error(codes.Unimplemented, "method StreamSnapshots not implemented") -} -func (UnimplementedLogtailServiceServer) ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) { - return nil, status.Error(codes.Unimplemented, "method ListTargets not implemented") -} -func (UnimplementedLogtailServiceServer) mustEmbedUnimplementedLogtailServiceServer() {} -func (UnimplementedLogtailServiceServer) testEmbeddedByValue() {} - -// UnsafeLogtailServiceServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to LogtailServiceServer will -// result in compilation errors. -type UnsafeLogtailServiceServer interface { - mustEmbedUnimplementedLogtailServiceServer() -} - -func RegisterLogtailServiceServer(s grpc.ServiceRegistrar, srv LogtailServiceServer) { - // If the following call panics, it indicates UnimplementedLogtailServiceServer was - // embedded by pointer and is nil. This will cause panics if an - // unimplemented method is ever invoked, so we test this at initialization - // time to prevent it from happening at runtime later due to I/O. - if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { - t.testEmbeddedByValue() - } - s.RegisterService(&LogtailService_ServiceDesc, srv) -} - -func _LogtailService_TopN_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TopNRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(LogtailServiceServer).TopN(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: LogtailService_TopN_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(LogtailServiceServer).TopN(ctx, req.(*TopNRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _LogtailService_Trend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TrendRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(LogtailServiceServer).Trend(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: LogtailService_Trend_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(LogtailServiceServer).Trend(ctx, req.(*TrendRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _LogtailService_StreamSnapshots_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(SnapshotRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(LogtailServiceServer).StreamSnapshots(m, &grpc.GenericServerStream[SnapshotRequest, Snapshot]{ServerStream: stream}) -} - -// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type LogtailService_StreamSnapshotsServer = grpc.ServerStreamingServer[Snapshot] - -func _LogtailService_ListTargets_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ListTargetsRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(LogtailServiceServer).ListTargets(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: LogtailService_ListTargets_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(LogtailServiceServer).ListTargets(ctx, req.(*ListTargetsRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// LogtailService_ServiceDesc is the grpc.ServiceDesc for LogtailService service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var LogtailService_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "logtail.LogtailService", - HandlerType: (*LogtailServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "TopN", - Handler: _LogtailService_TopN_Handler, - }, - { - MethodName: "Trend", - Handler: _LogtailService_Trend_Handler, - }, - { - MethodName: "ListTargets", - Handler: _LogtailService_ListTargets_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "StreamSnapshots", - Handler: _LogtailService_StreamSnapshots_Handler, - ServerStreams: true, - }, - }, - Metadata: "proto/logtail.proto", -} diff --git a/proto/logtailpb/logtail.pb.go b/proto/logtailpb/logtail.pb.go index 40515cd..d39ce4d 100644 --- a/proto/logtailpb/logtail.pb.go +++ b/proto/logtailpb/logtail.pb.go @@ -731,7 +731,8 @@ type Snapshot struct { state protoimpl.MessageState `protogen:"open.v1"` Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Entries []*TopNEntry `protobuf:"bytes,3,rep,name=entries,proto3" json:"entries,omitempty"` // top-50K for this 1-minute bucket, sorted desc + Entries []*TopNEntry `protobuf:"bytes,3,rep,name=entries,proto3" json:"entries,omitempty"` // top-50K for this 1-minute bucket, sorted desc + IsCoarse bool `protobuf:"varint,4,opt,name=is_coarse,json=isCoarse,proto3" json:"is_coarse,omitempty"` // true for coarse-ring (5-min) buckets in DumpSnapshots unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -787,6 +788,49 @@ func (x *Snapshot) GetEntries() []*TopNEntry { return nil } +func (x *Snapshot) GetIsCoarse() bool { + if x != nil { + return x.IsCoarse + } + return false +} + +type DumpSnapshotsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DumpSnapshotsRequest) Reset() { + *x = DumpSnapshotsRequest{} + mi := &file_proto_logtail_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DumpSnapshotsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DumpSnapshotsRequest) ProtoMessage() {} + +func (x *DumpSnapshotsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_logtail_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DumpSnapshotsRequest.ProtoReflect.Descriptor instead. +func (*DumpSnapshotsRequest) Descriptor() ([]byte, []int) { + return file_proto_logtail_proto_rawDescGZIP(), []int{9} +} + type ListTargetsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -795,7 +839,7 @@ type ListTargetsRequest struct { func (x *ListTargetsRequest) Reset() { *x = ListTargetsRequest{} - mi := &file_proto_logtail_proto_msgTypes[9] + mi := &file_proto_logtail_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -807,7 +851,7 @@ func (x *ListTargetsRequest) String() string { func (*ListTargetsRequest) ProtoMessage() {} func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[9] + mi := &file_proto_logtail_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -820,7 +864,7 @@ func (x *ListTargetsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListTargetsRequest.ProtoReflect.Descriptor instead. func (*ListTargetsRequest) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{9} + return file_proto_logtail_proto_rawDescGZIP(), []int{10} } type TargetInfo struct { @@ -833,7 +877,7 @@ type TargetInfo struct { func (x *TargetInfo) Reset() { *x = TargetInfo{} - mi := &file_proto_logtail_proto_msgTypes[10] + mi := &file_proto_logtail_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -845,7 +889,7 @@ func (x *TargetInfo) String() string { func (*TargetInfo) ProtoMessage() {} func (x *TargetInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[10] + mi := &file_proto_logtail_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -858,7 +902,7 @@ func (x *TargetInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use TargetInfo.ProtoReflect.Descriptor instead. func (*TargetInfo) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{10} + return file_proto_logtail_proto_rawDescGZIP(), []int{11} } func (x *TargetInfo) GetName() string { @@ -884,7 +928,7 @@ type ListTargetsResponse struct { func (x *ListTargetsResponse) Reset() { *x = ListTargetsResponse{} - mi := &file_proto_logtail_proto_msgTypes[11] + mi := &file_proto_logtail_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -896,7 +940,7 @@ func (x *ListTargetsResponse) String() string { func (*ListTargetsResponse) ProtoMessage() {} func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_logtail_proto_msgTypes[11] + mi := &file_proto_logtail_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -909,7 +953,7 @@ func (x *ListTargetsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListTargetsResponse.ProtoReflect.Descriptor instead. func (*ListTargetsResponse) Descriptor() ([]byte, []int) { - return file_proto_logtail_proto_rawDescGZIP(), []int{11} + return file_proto_logtail_proto_rawDescGZIP(), []int{12} } func (x *ListTargetsResponse) GetTargets() []*TargetInfo { @@ -967,11 +1011,13 @@ const file_proto_logtail_proto_rawDesc = "" + "\rTrendResponse\x12+\n" + "\x06points\x18\x01 \x03(\v2\x13.logtail.TrendPointR\x06points\x12\x16\n" + "\x06source\x18\x02 \x01(\tR\x06source\"\x11\n" + - "\x0fSnapshotRequest\"n\n" + + "\x0fSnapshotRequest\"\x8b\x01\n" + "\bSnapshot\x12\x16\n" + "\x06source\x18\x01 \x01(\tR\x06source\x12\x1c\n" + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12,\n" + - "\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries\"\x14\n" + + "\aentries\x18\x03 \x03(\v2\x12.logtail.TopNEntryR\aentries\x12\x1b\n" + + "\tis_coarse\x18\x04 \x01(\bR\bisCoarse\"\x16\n" + + "\x14DumpSnapshotsRequest\"\x14\n" + "\x12ListTargetsRequest\"4\n" + "\n" + "TargetInfo\x12\x12\n" + @@ -1004,12 +1050,13 @@ const file_proto_logtail_proto_rawDesc = "" + "\x04W15M\x10\x02\x12\b\n" + "\x04W60M\x10\x03\x12\a\n" + "\x03W6H\x10\x04\x12\b\n" + - "\x04W24H\x10\x052\x89\x02\n" + + "\x04W24H\x10\x052\xce\x02\n" + "\x0eLogtailService\x123\n" + "\x04TopN\x12\x14.logtail.TopNRequest\x1a\x15.logtail.TopNResponse\x126\n" + "\x05Trend\x12\x15.logtail.TrendRequest\x1a\x16.logtail.TrendResponse\x12@\n" + "\x0fStreamSnapshots\x12\x18.logtail.SnapshotRequest\x1a\x11.logtail.Snapshot0\x01\x12H\n" + - "\vListTargets\x12\x1b.logtail.ListTargetsRequest\x1a\x1c.logtail.ListTargetsResponseB0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3" + "\vListTargets\x12\x1b.logtail.ListTargetsRequest\x1a\x1c.logtail.ListTargetsResponse\x12C\n" + + "\rDumpSnapshots\x12\x1d.logtail.DumpSnapshotsRequest\x1a\x11.logtail.Snapshot0\x01B0Z.git.ipng.ch/ipng/nginx-logtail/proto/logtailpbb\x06proto3" var ( file_proto_logtail_proto_rawDescOnce sync.Once @@ -1024,24 +1071,25 @@ func file_proto_logtail_proto_rawDescGZIP() []byte { } var file_proto_logtail_proto_enumTypes = make([]protoimpl.EnumInfo, 4) -var file_proto_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_proto_logtail_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_proto_logtail_proto_goTypes = []any{ - (TorFilter)(0), // 0: logtail.TorFilter - (StatusOp)(0), // 1: logtail.StatusOp - (GroupBy)(0), // 2: logtail.GroupBy - (Window)(0), // 3: logtail.Window - (*Filter)(nil), // 4: logtail.Filter - (*TopNRequest)(nil), // 5: logtail.TopNRequest - (*TopNEntry)(nil), // 6: logtail.TopNEntry - (*TopNResponse)(nil), // 7: logtail.TopNResponse - (*TrendRequest)(nil), // 8: logtail.TrendRequest - (*TrendPoint)(nil), // 9: logtail.TrendPoint - (*TrendResponse)(nil), // 10: logtail.TrendResponse - (*SnapshotRequest)(nil), // 11: logtail.SnapshotRequest - (*Snapshot)(nil), // 12: logtail.Snapshot - (*ListTargetsRequest)(nil), // 13: logtail.ListTargetsRequest - (*TargetInfo)(nil), // 14: logtail.TargetInfo - (*ListTargetsResponse)(nil), // 15: logtail.ListTargetsResponse + (TorFilter)(0), // 0: logtail.TorFilter + (StatusOp)(0), // 1: logtail.StatusOp + (GroupBy)(0), // 2: logtail.GroupBy + (Window)(0), // 3: logtail.Window + (*Filter)(nil), // 4: logtail.Filter + (*TopNRequest)(nil), // 5: logtail.TopNRequest + (*TopNEntry)(nil), // 6: logtail.TopNEntry + (*TopNResponse)(nil), // 7: logtail.TopNResponse + (*TrendRequest)(nil), // 8: logtail.TrendRequest + (*TrendPoint)(nil), // 9: logtail.TrendPoint + (*TrendResponse)(nil), // 10: logtail.TrendResponse + (*SnapshotRequest)(nil), // 11: logtail.SnapshotRequest + (*Snapshot)(nil), // 12: logtail.Snapshot + (*DumpSnapshotsRequest)(nil), // 13: logtail.DumpSnapshotsRequest + (*ListTargetsRequest)(nil), // 14: logtail.ListTargetsRequest + (*TargetInfo)(nil), // 15: logtail.TargetInfo + (*ListTargetsResponse)(nil), // 16: logtail.ListTargetsResponse } var file_proto_logtail_proto_depIdxs = []int32{ 1, // 0: logtail.Filter.status_op:type_name -> logtail.StatusOp @@ -1055,17 +1103,19 @@ var file_proto_logtail_proto_depIdxs = []int32{ 3, // 8: logtail.TrendRequest.window:type_name -> logtail.Window 9, // 9: logtail.TrendResponse.points:type_name -> logtail.TrendPoint 6, // 10: logtail.Snapshot.entries:type_name -> logtail.TopNEntry - 14, // 11: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo + 15, // 11: logtail.ListTargetsResponse.targets:type_name -> logtail.TargetInfo 5, // 12: logtail.LogtailService.TopN:input_type -> logtail.TopNRequest 8, // 13: logtail.LogtailService.Trend:input_type -> logtail.TrendRequest 11, // 14: logtail.LogtailService.StreamSnapshots:input_type -> logtail.SnapshotRequest - 13, // 15: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest - 7, // 16: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse - 10, // 17: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse - 12, // 18: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot - 15, // 19: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse - 16, // [16:20] is the sub-list for method output_type - 12, // [12:16] is the sub-list for method input_type + 14, // 15: logtail.LogtailService.ListTargets:input_type -> logtail.ListTargetsRequest + 13, // 16: logtail.LogtailService.DumpSnapshots:input_type -> logtail.DumpSnapshotsRequest + 7, // 17: logtail.LogtailService.TopN:output_type -> logtail.TopNResponse + 10, // 18: logtail.LogtailService.Trend:output_type -> logtail.TrendResponse + 12, // 19: logtail.LogtailService.StreamSnapshots:output_type -> logtail.Snapshot + 16, // 20: logtail.LogtailService.ListTargets:output_type -> logtail.ListTargetsResponse + 12, // 21: logtail.LogtailService.DumpSnapshots:output_type -> logtail.Snapshot + 17, // [17:22] is the sub-list for method output_type + 12, // [12:17] is the sub-list for method input_type 12, // [12:12] is the sub-list for extension type_name 12, // [12:12] is the sub-list for extension extendee 0, // [0:12] is the sub-list for field type_name @@ -1083,7 +1133,7 @@ func file_proto_logtail_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_logtail_proto_rawDesc), len(file_proto_logtail_proto_rawDesc)), NumEnums: 4, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/logtailpb/logtail_grpc.pb.go b/proto/logtailpb/logtail_grpc.pb.go index 520faea..66a48bd 100644 --- a/proto/logtailpb/logtail_grpc.pb.go +++ b/proto/logtailpb/logtail_grpc.pb.go @@ -23,6 +23,7 @@ const ( LogtailService_Trend_FullMethodName = "/logtail.LogtailService/Trend" LogtailService_StreamSnapshots_FullMethodName = "/logtail.LogtailService/StreamSnapshots" LogtailService_ListTargets_FullMethodName = "/logtail.LogtailService/ListTargets" + LogtailService_DumpSnapshots_FullMethodName = "/logtail.LogtailService/DumpSnapshots" ) // LogtailServiceClient is the client API for LogtailService service. @@ -33,6 +34,7 @@ type LogtailServiceClient interface { Trend(ctx context.Context, in *TrendRequest, opts ...grpc.CallOption) (*TrendResponse, error) StreamSnapshots(ctx context.Context, in *SnapshotRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) ListTargets(ctx context.Context, in *ListTargetsRequest, opts ...grpc.CallOption) (*ListTargetsResponse, error) + DumpSnapshots(ctx context.Context, in *DumpSnapshotsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) } type logtailServiceClient struct { @@ -92,6 +94,25 @@ func (c *logtailServiceClient) ListTargets(ctx context.Context, in *ListTargetsR return out, nil } +func (c *logtailServiceClient) DumpSnapshots(ctx context.Context, in *DumpSnapshotsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Snapshot], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &LogtailService_ServiceDesc.Streams[1], LogtailService_DumpSnapshots_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[DumpSnapshotsRequest, Snapshot]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type LogtailService_DumpSnapshotsClient = grpc.ServerStreamingClient[Snapshot] + // LogtailServiceServer is the server API for LogtailService service. // All implementations must embed UnimplementedLogtailServiceServer // for forward compatibility. @@ -100,6 +121,7 @@ type LogtailServiceServer interface { Trend(context.Context, *TrendRequest) (*TrendResponse, error) StreamSnapshots(*SnapshotRequest, grpc.ServerStreamingServer[Snapshot]) error ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) + DumpSnapshots(*DumpSnapshotsRequest, grpc.ServerStreamingServer[Snapshot]) error mustEmbedUnimplementedLogtailServiceServer() } @@ -122,6 +144,9 @@ func (UnimplementedLogtailServiceServer) StreamSnapshots(*SnapshotRequest, grpc. func (UnimplementedLogtailServiceServer) ListTargets(context.Context, *ListTargetsRequest) (*ListTargetsResponse, error) { return nil, status.Error(codes.Unimplemented, "method ListTargets not implemented") } +func (UnimplementedLogtailServiceServer) DumpSnapshots(*DumpSnapshotsRequest, grpc.ServerStreamingServer[Snapshot]) error { + return status.Error(codes.Unimplemented, "method DumpSnapshots not implemented") +} func (UnimplementedLogtailServiceServer) mustEmbedUnimplementedLogtailServiceServer() {} func (UnimplementedLogtailServiceServer) testEmbeddedByValue() {} @@ -208,6 +233,17 @@ func _LogtailService_ListTargets_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _LogtailService_DumpSnapshots_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(DumpSnapshotsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(LogtailServiceServer).DumpSnapshots(m, &grpc.GenericServerStream[DumpSnapshotsRequest, Snapshot]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type LogtailService_DumpSnapshotsServer = grpc.ServerStreamingServer[Snapshot] + // LogtailService_ServiceDesc is the grpc.ServiceDesc for LogtailService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -234,6 +270,11 @@ var LogtailService_ServiceDesc = grpc.ServiceDesc{ Handler: _LogtailService_StreamSnapshots_Handler, ServerStreams: true, }, + { + StreamName: "DumpSnapshots", + Handler: _LogtailService_DumpSnapshots_Handler, + ServerStreams: true, + }, }, Metadata: "proto/logtail.proto", }