From 58391f5463ddbff478a0559d6426c813c03f9900 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Sat, 11 Apr 2026 16:42:11 +0200 Subject: [PATCH] Add WatchEvents, enable/disable/weight RPCs, and config check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit gRPC / proto - Rename WatchBackendEvents → WatchEvents; return a stream of Event oneof (LogEvent, BackendEvent, FrontendEvent) with optional filter flags (log, log_level, backend, frontend) - Add EnableBackend, DisableBackend, SetFrontendPoolBackendWeight RPCs - Rename PauseResumeRequest → BackendRequest - Add CheckConfig RPC returning ok/parse_error/semantic_error maglevd - Route slog through a LogBroadcaster (slog.Handler) so WatchEvents subscribers can receive structured log records independently of the daemon's own --log-level - Add --reflection flag (default true) to toggle gRPC server reflection - Add --check flag: validates config file and exits 0/1/2 - SIGHUP: use config.Check before applying reload; log parse vs semantic error separately; refuse reload on any error - Rename default config path /etc/maglev → /etc/vpp-maglev maglevc - Add 'watch events [num ] [log [level ]] [backend] [frontend]' command; prints compact protojson, stops on any keypress or Ctrl-C; uses cbreak mode (not raw) so output post-processing is preserved - Add 'set backend enable|disable' - Add 'set frontend pool backend weight <0-100>' - Add 'config check' command Debian packaging - Rename service unit to vpp-maglevd.service - Rename conffiles to /etc/default/vpp-maglev and /etc/vpp-maglev/ - Create maglevd system user/group in postinst; add to vpp group if present - Add postrm; add adduser to Depends --- README.md | 67 +- cmd/maglevc/commands.go | 158 +++- cmd/maglevc/watch.go | 174 +++++ cmd/maglevd/main.go | 40 +- debian/build-deb.sh | 15 +- debian/conffiles | 4 +- debian/control.in | 2 +- debian/{default.maglev => default.vpp-maglev} | 6 +- debian/postinst | 16 +- debian/postrm | 12 + 
debian/prerm | 4 +- .../{maglevd.service => vpp-maglevd.service} | 9 +- docs/config-guide.md | 110 +-- docs/maglevd.8 | 19 +- docs/user-guide.md | 36 +- go.mod | 4 +- go.sum | 6 +- internal/checker/checker.go | 86 +++ internal/checker/checker_test.go | 92 +++ internal/config/config.go | 49 +- internal/grpcapi/loghandler.go | 148 ++++ internal/grpcapi/maglev.pb.go | 728 +++++++++++++++--- internal/grpcapi/maglev_grpc.pb.go | 220 +++++- internal/grpcapi/server.go | 146 +++- internal/grpcapi/server_test.go | 153 +++- proto/maglev.proto | 65 +- 26 files changed, 1969 insertions(+), 400 deletions(-) create mode 100644 cmd/maglevc/watch.go rename debian/{default.maglev => default.vpp-maglev} (57%) create mode 100644 debian/postrm rename debian/{maglevd.service => vpp-maglevd.service} (54%) create mode 100644 internal/grpcapi/loghandler.go diff --git a/README.md b/README.md index f7c2328..71ad91f 100644 --- a/README.md +++ b/README.md @@ -2,55 +2,38 @@ Health checker and gRPC control plane for VPP Maglev load balancing. -## Build +## Build and Install ```sh make # builds build//maglevd and build//maglevc make test # runs all tests -make proto # regenerates gRPC stubs from proto/maglev.proto -make lint # runs golangci-lint +make pkg-deb # Creates a debian package for arm64 and amd64 ``` Requires Go 1.25+ and (for `make proto`) `protoc` with `protoc-gen-go` and `protoc-gen-go-grpc`. -## Debian package - -```sh -make pkg-deb -``` - Produces `vpp-maglev__amd64.deb` and `vpp-maglev__arm64.deb` -in the project root by cross-compiling with `GOOS=linux GOARCH=`. +in the `build/` directory by cross-compiling with `GOOS=linux GOARCH=`. Requires `dpkg-deb` (available on any Debian/Ubuntu host). 
-The package installs: - -| Path | Content | -|---|---| -| `/usr/sbin/maglevd` | Health-checker daemon | -| `/usr/bin/maglevc` | Interactive CLI client | -| `/lib/systemd/system/maglevd.service` | systemd unit | -| `/etc/default/maglev` | Environment file for the unit (conffile) | -| `/etc/maglev/maglev.yaml` | Example configuration file (conffile) | -| `/usr/share/man/man8/maglevd.8.gz` | Man page | -| `/usr/share/man/man1/maglevc.1.gz` | Man page | +## Running After installing, the unit is enabled but not started automatically: ```sh -# edit /etc/maglev/maglev.yaml, then: -systemctl start maglevd +# edit /etc/vpp-maglev/maglev.yaml, then: +systemctl enable --now vpp-maglevd ``` -## Run +Or run the server and client by hand: ```sh -maglevd --config /etc/maglev/maglev.yaml --grpc-addr :9090 +maglevd --config /etc/vpp-maglev/maglev.yaml --grpc-addr :9090 maglevd --version # print version and exit maglevc --server localhost:9090 # interactive shell -maglevc show backend nginx0-ams # one-shot +maglevc show frontends # one-shot maglevc -color=false show backends # one-shot, no ANSI color maglevc set backend nginx0-ams pause ``` @@ -58,35 +41,7 @@ maglevc set backend nginx0-ams pause Send `SIGHUP` to `maglevd` to reload config without restarting. `maglevd` requires `CAP_NET_RAW` for ICMP health checks. -## Minimal config - -```yaml -maglev: - healthchecks: - http: - type: http - port: 80 - params: - path: /healthz - interval: 2s - timeout: 3s - - backends: - web0: {address: 192.0.2.10, healthcheck: http} - web1: {address: 192.0.2.11, healthcheck: http} - - frontends: - web: - address: 192.0.2.1 - protocol: tcp - port: 80 - pools: - - name: primary - backends: - web0: {} - web1: {} -``` - +Check out a minimal configuration file in [[debian/maglev.yaml](debian/maglev.yaml)]. See [docs/user-guide.md](docs/user-guide.md) for flags, signals, and `maglevc` usage. See [docs/config-guide.md](docs/config-guide.md) for the full configuration reference. 
See [docs/healthchecks.md](docs/healthchecks.md) for health state machine details. @@ -95,5 +50,5 @@ See [docs/healthchecks.md](docs/healthchecks.md) for health state machine detail ```sh docker build -t maglevd . -docker run --cap-add NET_RAW -v /etc/maglev:/etc/maglev maglevd +docker run --cap-add NET_RAW -v /etc/vpp-maglev:/etc/vpp-maglev maglevd ``` diff --git a/cmd/maglevc/commands.go b/cmd/maglevc/commands.go index c727015..8d52e89 100644 --- a/cmd/maglevc/commands.go +++ b/cmd/maglevc/commands.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "os" + "strconv" "strings" "text/tabwriter" "time" @@ -83,8 +84,8 @@ func buildTree() *Node { // set backend pause|resume|disabled|enabled setPause := &Node{Word: "pause", Help: "pause health checking", Run: runPauseBackend} setResume := &Node{Word: "resume", Help: "resume health checking", Run: runResumeBackend} - setDisabled := &Node{Word: "disabled", Help: "disable backend (not implemented)", Run: runNotImplemented} - setEnabled := &Node{Word: "enabled", Help: "enable backend (not implemented)", Run: runNotImplemented} + setDisabled := &Node{Word: "disable", Help: "disable backend (stop probing, remove from rotation)", Run: runDisableBackend} + setEnabled := &Node{Word: "enable", Help: "enable backend (resume probing)", Run: runEnableBackend} setBackendName := &Node{ Word: "", Help: "backend name", @@ -96,9 +97,75 @@ func buildTree() *Node { Help: "modify a backend", Children: []*Node{setBackendName}, } - set.Children = []*Node{setBackend} + // set frontend pool backend weight <0-100> + setWeightValue := &Node{ + Word: "", + Help: "weight 0-100", + Run: runSetFrontendPoolBackendWeight, + } + setFrontendPoolBackendWeight := &Node{Word: "weight", Help: "set backend weight in pool", Children: []*Node{setWeightValue}} + setFrontendPoolBackendName := &Node{ + Word: "", + Help: "backend name", + Dynamic: dynBackends, + Children: []*Node{setFrontendPoolBackendWeight}, + } + setFrontendPoolBackend := &Node{Word: "backend", Help: 
"select a backend", Children: []*Node{setFrontendPoolBackendName}} + setFrontendPoolName := &Node{ + Word: "", + Help: "pool name", + Children: []*Node{setFrontendPoolBackend}, + } + setFrontendPool := &Node{Word: "pool", Help: "select a pool", Children: []*Node{setFrontendPoolName}} + setFrontendName := &Node{ + Word: "", + Help: "frontend name", + Dynamic: dynFrontends, + Children: []*Node{setFrontendPool}, + } + setFrontend := &Node{ + Word: "frontend", + Help: "modify a frontend", + Children: []*Node{setFrontendName}, + } - root.Children = []*Node{show, set, quit, exit} + set.Children = []*Node{setBackend, setFrontend} + + // watch events [num ] [log [level ]] [backend] [frontend] + // + // All tokens after 'events' are captured as args via a self-referencing slot + // node. This lets runWatchEvents parse the optional flags manually while still + // providing tab-completion through the dynamic enumerator. + var watchEventsOptSlot *Node + watchEventsOptSlot = &Node{ + Word: "", + Help: "watch option", + Dynamic: dynWatchEventOpts, + Run: runWatchEvents, + } + watchEventsOptSlot.Children = []*Node{watchEventsOptSlot} + + watchEvents := &Node{ + Word: "events", + Help: "stream events (press any key or Ctrl-C to stop)", + Run: runWatchEvents, + Children: []*Node{watchEventsOptSlot}, + } + watch := &Node{ + Word: "watch", + Help: "watch live event streams", + Children: []*Node{watchEvents}, + } + + // config check + configCheck := &Node{Word: "check", Help: "check configuration file", Run: runConfigCheck} + configNode := &Node{ + Word: "config", + Help: "configuration commands", + Children: []*Node{configCheck}, + } + + root.Children = []*Node{show, set, watch, configNode, quit, exit} return root } @@ -332,7 +399,7 @@ func runPauseBackend(ctx context.Context, client grpcapi.MaglevClient, args []st } ctx, cancel := context.WithTimeout(ctx, callTimeout) defer cancel() - info, err := client.PauseBackend(ctx, &grpcapi.PauseResumeRequest{Name: args[0]}) + info, err := 
client.PauseBackend(ctx, &grpcapi.BackendRequest{Name: args[0]}) if err != nil { return err } @@ -346,7 +413,7 @@ func runResumeBackend(ctx context.Context, client grpcapi.MaglevClient, args []s } ctx, cancel := context.WithTimeout(ctx, callTimeout) defer cancel() - info, err := client.ResumeBackend(ctx, &grpcapi.PauseResumeRequest{Name: args[0]}) + info, err := client.ResumeBackend(ctx, &grpcapi.BackendRequest{Name: args[0]}) if err != nil { return err } @@ -354,11 +421,86 @@ func runResumeBackend(ctx context.Context, client grpcapi.MaglevClient, args []s return nil } -func runNotImplemented(_ context.Context, _ grpcapi.MaglevClient, _ []string) error { - fmt.Println("not implemented yet") +func runSetFrontendPoolBackendWeight(ctx context.Context, client grpcapi.MaglevClient, args []string) error { + if len(args) != 4 { + return fmt.Errorf("usage: set frontend pool backend weight <0-100>") + } + frontendName, poolName, backendName, weightStr := args[0], args[1], args[2], args[3] + weight, err := strconv.Atoi(weightStr) + if err != nil || weight < 0 || weight > 100 { + return fmt.Errorf("weight: expected integer 0-100, got %q", weightStr) + } + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + info, err := client.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{ + Frontend: frontendName, + Pool: poolName, + Backend: backendName, + Weight: int32(weight), + }) + if err != nil { + return err + } + // Print the updated pool so the user can confirm the new weight. 
+ for _, pool := range info.Pools { + if pool.Name != poolName { + continue + } + for _, pb := range pool.Backends { + if pb.Name == backendName { + fmt.Printf("%s pool %s backend %s: weight set to %d\n", info.Name, pool.Name, pb.Name, pb.Weight) + return nil + } + } + } return nil } +func runEnableBackend(ctx context.Context, client grpcapi.MaglevClient, args []string) error { + if len(args) == 0 { + return fmt.Errorf("usage: set backend enable") + } + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + info, err := client.EnableBackend(ctx, &grpcapi.BackendRequest{Name: args[0]}) + if err != nil { + return err + } + fmt.Printf("%s: enabled, state is '%s'\n", info.Name, info.State) + return nil +} + +func runDisableBackend(ctx context.Context, client grpcapi.MaglevClient, args []string) error { + if len(args) == 0 { + return fmt.Errorf("usage: set backend disable") + } + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + info, err := client.DisableBackend(ctx, &grpcapi.BackendRequest{Name: args[0]}) + if err != nil { + return err + } + fmt.Printf("%s: disabled, state is '%s'\n", info.Name, info.State) + return nil +} + +func runConfigCheck(ctx context.Context, client grpcapi.MaglevClient, _ []string) error { + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + resp, err := client.CheckConfig(ctx, &grpcapi.CheckConfigRequest{}) + if err != nil { + return err + } + if resp.Ok { + fmt.Println("config ok") + return nil + } + if resp.ParseError != "" { + return fmt.Errorf("parse error: %s", resp.ParseError) + } + return fmt.Errorf("semantic error: %s", resp.SemanticError) +} + // formatDuration formats a duration as Xd Xh Xm Xs without milliseconds. 
func formatDuration(d time.Duration) string { if d < 0 { diff --git a/cmd/maglevc/watch.go b/cmd/maglevc/watch.go new file mode 100644 index 0000000..213dd53 --- /dev/null +++ b/cmd/maglevc/watch.go @@ -0,0 +1,174 @@ +// Copyright (c) 2026, Pim van Pelt + +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "strconv" + "syscall" + + "golang.org/x/sys/unix" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + + "git.ipng.ch/ipng/vpp-maglev/internal/grpcapi" +) + +func dynWatchEventOpts(_ context.Context, _ grpcapi.MaglevClient) []string { + return []string{"num", "log", "backend", "frontend"} +} + +// runWatchEvents implements 'watch events [num ] [log [level ]] [backend] [frontend]'. +// All tokens after 'events' are captured as args by the circular slot node in the tree. +// If none of log/backend/frontend are mentioned, all three default to true. +func runWatchEvents(ctx context.Context, client grpcapi.MaglevClient, args []string) error { + var maxEvents int // 0 = unlimited + var wantLog, wantBackend, wantFrontend bool + logLevel := "" + anyExplicit := false + + for i := 0; i < len(args); { + switch args[i] { + case "num": + if i+1 >= len(args) { + return fmt.Errorf("num requires a count argument") + } + n, err := strconv.Atoi(args[i+1]) + if err != nil || n < 1 { + return fmt.Errorf("num: invalid count %q", args[i+1]) + } + maxEvents = n + i += 2 + case "log": + wantLog = true + anyExplicit = true + if i+1 < len(args) && args[i+1] == "level" { + if i+2 >= len(args) { + return fmt.Errorf("log level requires a level argument") + } + logLevel = args[i+2] + i += 3 + } else { + i++ + } + case "backend": + wantBackend = true + anyExplicit = true + i++ + case "frontend": + wantFrontend = true + anyExplicit = true + i++ + default: + return fmt.Errorf("unknown watch option %q; expected: num, log, backend, frontend", args[i]) + } + } + + if !anyExplicit { + wantLog, wantBackend, 
wantFrontend = true, true, true + } + + boolp := func(b bool) *bool { v := b; return &v } + req := &grpcapi.WatchRequest{ + Log: boolp(wantLog), + LogLevel: logLevel, + Backend: boolp(wantBackend), + Frontend: boolp(wantFrontend), + } + + // Cancel the stream on keypress or signal. + watchCtx, cancel := context.WithCancel(ctx) + defer cancel() + + go watchStopOnKeypress(watchCtx, cancel) + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigCh) + go func() { + select { + case <-sigCh: + cancel() + case <-watchCtx.Done(): + } + }() + + stream, err := client.WatchEvents(watchCtx, req) + if err != nil { + return err + } + + marshaler := protojson.MarshalOptions{} + count := 0 + for { + ev, err := stream.Recv() + if err != nil { + if watchCtx.Err() != nil { + return nil // stopped by keypress or signal + } + if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled { + return nil + } + return err + } + data, err := marshaler.Marshal(ev) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + fmt.Printf("%s\n", data) + count++ + if maxEvents > 0 && count >= maxEvents { + return nil + } + } +} + +// watchStopOnKeypress puts stdin into cbreak mode (when it is a terminal) and +// calls cancel when any byte arrives. Cbreak mode disables canonical (line) +// input so a single keypress is sufficient, while preserving output +// post-processing (OPOST/ONLCR) so that fmt.Printf("\n") still produces the +// expected carriage-return+newline on screen. Falls back gracefully when stdin +// is not a tty. The goroutine exits when ctx is cancelled. 
+func watchStopOnKeypress(ctx context.Context, cancel context.CancelFunc) { + fd := int(os.Stdin.Fd()) + if old, err := stdinCbreak(fd); err == nil { + defer unix.IoctlSetTermios(fd, unix.TCSETSF, old) //nolint:errcheck + } + + readDone := make(chan struct{}) + go func() { + defer close(readDone) + buf := make([]byte, 1) + os.Stdin.Read(buf) //nolint:errcheck + }() + + select { + case <-readDone: + cancel() + case <-ctx.Done(): + } +} + +// stdinCbreak sets the terminal referred to by fd into cbreak mode: canonical +// input and echo are disabled (so single keystrokes are immediately available) +// but output post-processing is left untouched (so \n still maps to \r\n). +// Returns the previous termios so the caller can restore it, or an error if fd +// is not a terminal. +func stdinCbreak(fd int) (*unix.Termios, error) { + old, err := unix.IoctlGetTermios(fd, unix.TCGETS) + if err != nil { + return nil, err // not a terminal + } + t := *old + t.Lflag &^= unix.ICANON | unix.ECHO | unix.ECHOE | unix.ECHOK | unix.ECHONL + t.Cc[unix.VMIN] = 1 + t.Cc[unix.VTIME] = 0 + if err := unix.IoctlSetTermios(fd, unix.TCSETS, &t); err != nil { + return nil, err + } + return old, nil +} diff --git a/cmd/maglevd/main.go b/cmd/maglevd/main.go index 6b6e362..3cde175 100644 --- a/cmd/maglevd/main.go +++ b/cmd/maglevd/main.go @@ -13,6 +13,7 @@ import ( "syscall" "google.golang.org/grpc" + "google.golang.org/grpc/reflection" buildinfo "git.ipng.ch/ipng/vpp-maglev/cmd" "git.ipng.ch/ipng/vpp-maglev/internal/checker" @@ -30,7 +31,9 @@ func main() { func run() error { // ---- flags / env -------------------------------------------------------- printVersion := flag.Bool("version", false, "print version and exit") - configPath := stringFlag("config", "/etc/maglev/frontend.yaml", "MAGLEV_CONFIG", "path to frontend.yaml") + checkOnly := flag.Bool("check", false, "check config file and exit (0=ok, 1=parse error, 2=semantic error)") + enableReflection := flag.Bool("reflection", true, "enable gRPC 
server reflection (for grpcurl)") + configPath := stringFlag("config", "/etc/vpp-maglev/maglev.yaml", "MAGLEV_CONFIG", "path to maglev.yaml") grpcAddr := stringFlag("grpc-addr", ":9090", "MAGLEV_GRPC_ADDR", "gRPC listen address") logLevel := stringFlag("log-level", "info", "MAGLEV_LOG_LEVEL", "log level (debug|info|warn|error)") flag.Parse() @@ -41,12 +44,28 @@ func run() error { return nil } + if *checkOnly { + _, result := config.Check(*configPath) + if result.OK() { + fmt.Printf("config ok: %s\n", *configPath) + return nil + } + if result.ParseError != "" { + fmt.Fprintf(os.Stderr, "parse error: %s\n", result.ParseError) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "semantic error: %s\n", result.SemanticError) + os.Exit(2) + } + // ---- logging ------------------------------------------------------------ var level slog.Level if err := level.UnmarshalText([]byte(*logLevel)); err != nil { return fmt.Errorf("invalid log level %q: %w", *logLevel, err) } - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}))) + jsonHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}) + logBroadcaster := grpcapi.NewLogBroadcaster(jsonHandler) + slog.SetDefault(slog.New(logBroadcaster)) slog.Info("starting", "version", buildinfo.Version(), "commit", buildinfo.Commit(), "date", buildinfo.Date()) // ---- config ------------------------------------------------------------- @@ -74,8 +93,11 @@ func run() error { return fmt.Errorf("listen %s: %w", *grpcAddr, err) } srv := grpc.NewServer() - grpcapi.RegisterMaglevServer(srv, grpcapi.NewServer(ctx, chkr)) - slog.Info("grpc-listening", "addr", *grpcAddr) + grpcapi.RegisterMaglevServer(srv, grpcapi.NewServer(ctx, chkr, logBroadcaster, *configPath)) + if *enableReflection { + reflection.Register(srv) + } + slog.Info("grpc-listening", "addr", *grpcAddr, "reflection", *enableReflection) go func() { if err := srv.Serve(lis); err != nil { @@ -91,9 +113,13 @@ func run() error { switch sig { 
case syscall.SIGHUP: slog.Info("config-reload-start") - newCfg, err := config.Load(*configPath) - if err != nil { - slog.Error("config-reload-error", "err", err) + newCfg, result := config.Check(*configPath) + if !result.OK() { + if result.ParseError != "" { + slog.Error("config-check-failed", "type", "parse", "err", result.ParseError) + } else { + slog.Error("config-check-failed", "type", "semantic", "err", result.SemanticError) + } continue } if err := chkr.Reload(ctx, newCfg); err != nil { diff --git a/debian/build-deb.sh b/debian/build-deb.sh index 090e11b..73391a8 100755 --- a/debian/build-deb.sh +++ b/debian/build-deb.sh @@ -22,7 +22,7 @@ install -d "$STAGING/usr/share/man/man1" install -d "$STAGING/usr/share/man/man8" install -d "$STAGING/lib/systemd/system" install -d "$STAGING/etc/default" -install -d "$STAGING/etc/maglev" +install -d "$STAGING/etc/vpp-maglev" install -d "$STAGING/DEBIAN" # Binaries @@ -34,22 +34,23 @@ gzip -9 -c "$REPO_ROOT/docs/maglevd.8" > "$STAGING/usr/share/man/man8/maglevd.8. 
gzip -9 -c "$REPO_ROOT/docs/maglevc.1" > "$STAGING/usr/share/man/man1/maglevc.1.gz" # Systemd unit -install -m 644 "$REPO_ROOT/debian/maglevd.service" "$STAGING/lib/systemd/system/maglevd.service" +install -m 644 "$REPO_ROOT/debian/vpp-maglevd.service" "$STAGING/lib/systemd/system/vpp-maglevd.service" -# /etc/default/maglev (conffile — dpkg won't overwrite on upgrade) -install -m 644 "$REPO_ROOT/debian/default.maglev" "$STAGING/etc/default/maglev" +# /etc/default/vpp-maglev (conffile — dpkg won't overwrite on upgrade) +install -m 644 "$REPO_ROOT/debian/default.vpp-maglev" "$STAGING/etc/default/vpp-maglev" -# /etc/maglev/maglev.yaml (conffile) -install -m 644 "$REPO_ROOT/debian/maglev.yaml" "$STAGING/etc/maglev/maglev.yaml" +# /etc/vpp-maglev/maglev.yaml (conffile) +install -m 644 "$REPO_ROOT/debian/maglev.yaml" "$STAGING/etc/vpp-maglev/maglev.yaml" # DEBIAN/control (version field uses full_version including commit) sed "s/@VERSION@/${FULL_VERSION}/;s/@ARCH@/${ARCH}/" \ "$REPO_ROOT/debian/control.in" > "$STAGING/DEBIAN/control" -# DEBIAN/conffiles, postinst, prerm +# DEBIAN/conffiles, postinst, prerm, postrm install -m 644 "$REPO_ROOT/debian/conffiles" "$STAGING/DEBIAN/conffiles" install -m 755 "$REPO_ROOT/debian/postinst" "$STAGING/DEBIAN/postinst" install -m 755 "$REPO_ROOT/debian/prerm" "$STAGING/DEBIAN/prerm" +install -m 755 "$REPO_ROOT/debian/postrm" "$STAGING/DEBIAN/postrm" # Emit package into build/ mkdir -p "$REPO_ROOT/build" diff --git a/debian/conffiles b/debian/conffiles index 8d217c4..b8147db 100644 --- a/debian/conffiles +++ b/debian/conffiles @@ -1,2 +1,2 @@ -/etc/default/maglev -/etc/maglev/maglev.yaml +/etc/default/vpp-maglev +/etc/vpp-maglev/maglev.yaml diff --git a/debian/control.in b/debian/control.in index f70ab89..bb88a62 100644 --- a/debian/control.in +++ b/debian/control.in @@ -4,7 +4,7 @@ Architecture: @ARCH@ Maintainer: Pim van Pelt Section: net Priority: optional -Depends: systemd +Depends: systemd, adduser Description: Maglev 
health-checker daemon and CLI client maglevd monitors backends (HTTP, TCP, ICMP) with a rise/fall counter model and exposes their aggregated state over a gRPC API. Configuration diff --git a/debian/default.maglev b/debian/default.vpp-maglev similarity index 57% rename from debian/default.maglev rename to debian/default.vpp-maglev index 95a7afd..80f10d5 100644 --- a/debian/default.maglev +++ b/debian/default.vpp-maglev @@ -1,9 +1,9 @@ # Default settings for maglevd. -# This file is sourced by /lib/systemd/system/maglevd.service. -# After editing, run: systemctl restart maglevd +# This file is sourced by /lib/systemd/system/vpp-maglevd.service. +# After editing, run: systemctl restart vpp-maglevd # Path to the YAML configuration file. -MAGLEV_CONFIG=/etc/maglev/maglev.yaml +MAGLEV_CONFIG=/etc/vpp-maglev/maglev.yaml # gRPC listen address (default: :9090) #MAGLEV_GRPC_ADDR=:9090 diff --git a/debian/postinst b/debian/postinst index 8ec3b92..b44dd14 100644 --- a/debian/postinst +++ b/debian/postinst @@ -2,7 +2,21 @@ set -e case "$1" in configure) + # Create system user and group if they don't exist. + if ! getent group maglevd > /dev/null 2>&1; then + addgroup --system --quiet maglevd + fi + if ! getent passwd maglevd > /dev/null 2>&1; then + adduser --system --no-create-home --shell /usr/sbin/nologin \ + --ingroup maglevd --quiet maglevd + fi + + # Add maglevd to vpp group if it exists (needed for VPP API socket access). 
+ if getent group vpp > /dev/null 2>&1; then + adduser --quiet maglevd vpp || true + fi + systemctl daemon-reload || true - systemctl enable maglevd.service || true + systemctl enable vpp-maglevd.service || true ;; esac diff --git a/debian/postrm b/debian/postrm new file mode 100644 index 0000000..611fba5 --- /dev/null +++ b/debian/postrm @@ -0,0 +1,12 @@ +#!/bin/sh +set -e +case "$1" in + purge) + if getent passwd maglevd > /dev/null 2>&1; then + deluser --quiet --system maglevd || true + fi + if getent group maglevd > /dev/null 2>&1; then + delgroup --quiet --system maglevd || true + fi + ;; +esac diff --git a/debian/prerm b/debian/prerm index dfce330..c1f4aaa 100644 --- a/debian/prerm +++ b/debian/prerm @@ -2,7 +2,7 @@ set -e case "$1" in remove|purge) - systemctl stop maglevd.service || true - systemctl disable maglevd.service || true + systemctl stop vpp-maglevd.service || true + systemctl disable vpp-maglevd.service || true ;; esac diff --git a/debian/maglevd.service b/debian/vpp-maglevd.service similarity index 54% rename from debian/maglevd.service rename to debian/vpp-maglevd.service index c481e2b..f70df30 100644 --- a/debian/maglevd.service +++ b/debian/vpp-maglevd.service @@ -5,11 +5,18 @@ After=network-online.target Wants=network-online.target [Service] -EnvironmentFile=/etc/default/maglev +User=maglevd +Group=maglevd +EnvironmentFile=/etc/default/vpp-maglev ExecStart=/usr/sbin/maglevd --config ${MAGLEV_CONFIG} Restart=on-failure RestartSec=5s Type=simple +# Grant only CAP_NET_RAW (needed for ICMP probes) and drop everything else. 
+AmbientCapabilities=CAP_NET_RAW +CapabilityBoundingSet=CAP_NET_RAW +NoNewPrivileges=yes + [Install] WantedBy=multi-user.target diff --git a/docs/config-guide.md b/docs/config-guide.md index 771d182..3caf84a 100644 --- a/docs/config-guide.md +++ b/docs/config-guide.md @@ -11,7 +11,7 @@ in two stages: ensuring that every backend referenced by a frontend exists, that address families are consistent within a frontend, and that IP source addresses are the correct family. -If you want to get started quickly, take a look at the [example config](#example). +If you want to get started quickly, take a look at the [[example config](../debian/maglev.yaml)]. ## Basic structure @@ -277,109 +277,7 @@ frontends: --- -## Example +For a detailed description of the health state machine, probe intervals, and all transition events, +see [[healthchecks.md](healthchecks.md)]. For a user guide on how to use the maglev daemon and client, +see the [[user-guide.md](user-guide.md)]. -A complete configuration tying all sections together: - -```yaml -maglev: - healthchecker: - transition-history: 5 - netns: dataplane - - healthchecks: - nginx: - type: http - port: 80 - params: - path: /healthz - host: nginx.example.com - response-code: "200" - interval: 2s - fast-interval: 500ms - down-interval: 30s - timeout: 3s - rise: 2 - fall: 3 - - dovecot: - type: tcp - port: 993 - params: - ssl: true - server-name: imaps.example.com - interval: 5s - fast-interval: 1s - down-interval: 30s - timeout: 3s - rise: 2 - fall: 3 - - ping6: - type: icmp - probe-ipv6-src: 2001:db8:probe::1 - interval: 2s - timeout: 1s - - backends: - nginx0-ams: - address: 198.51.100.10 - healthcheck: nginx - nginx0-lon: - address: 198.51.100.11 - healthcheck: nginx - nginx0-fra: - address: 198.51.100.12 - healthcheck: nginx - maildrop0-ams: - address: 2001:db8:1::10 - healthcheck: dovecot - maildrop0-lon: - address: 2001:db8:1::11 - healthcheck: dovecot - - frontends: - nginx-http: - description: "HTTP VIP with fallback" 
address: 198.51.100.1 - protocol: tcp - port: 80 - pools: - - name: primary - backends: - nginx0-ams: { weight: 10 } - nginx0-lon: {} - - name: fallback - backends: - nginx0-fra: {} - - nginx-https: - description: "HTTPS VIP — same backends, different port" - address: 198.51.100.1 - protocol: tcp - port: 443 - pools: - - name: primary - backends: - nginx0-ams: { weight: 10 } - nginx0-lon: {} - - name: fallback - backends: - nginx0-fra: {} - - maildrop-imaps: - description: "IMAPS VIP" - address: 2001:db8::1 - protocol: tcp - port: 993 - pools: - - name: primary - backends: - maildrop0-ams: {} - maildrop0-lon: {} -``` - ---- - -For a detailed description of the health state machine, probe intervals, and all -transition events, see [healthchecks.md](healthchecks.md). diff --git a/docs/maglevd.8 b/docs/maglevd.8 index c57109f..c9eed51 100644 --- a/docs/maglevd.8 +++ b/docs/maglevd.8 @@ -6,6 +6,7 @@ maglevd \- Maglev health\-checker daemon [\fB\-config\fR \fIfile\fR] [\fB\-grpc\-addr\fR \fIaddr\fR] [\fB\-log\-level\fR \fIlevel\fR] +[\fB\-reflection\fR[=\fIbool\fR]] [\fB\-version\fR] .SH DESCRIPTION .B maglevd @@ -32,7 +33,7 @@ parentheses); the flag takes precedence. .TP .BI \-config " file" Path to the YAML configuration file. -.RI "(default: " /etc/maglev/maglev.conf "; env: " MAGLEV_CONFIG ) +.RI "(default: " /etc/vpp-maglev/maglev.yaml "; env: " MAGLEV_CONFIG ) .TP .BI \-grpc\-addr " addr" TCP address on which the gRPC server listens. @@ -47,6 +48,16 @@ or .BR error . .RI "(default: " info "; env: " MAGLEV_LOG_LEVEL ) .TP +.B \-reflection +Enable gRPC server reflection so that clients such as +.BR grpcurl (1) +can introspect the API without access to the +.I .proto +file. +Enabled by default; pass +.B \-reflection=false +to disable. +.TP .B \-version Print version, commit hash, and build date, then exit. .SH SIGNALS @@ -60,10 +71,10 @@ backend workers are left running. Gracefully shut down: drain active gRPC streams, then exit. 
.SH FILES .TP -.I /etc/maglev/maglev.conf +.I /etc/vpp-maglev/maglev.yaml Default configuration file (YAML). .TP -.I /etc/default/maglev +.I /etc/default/vpp-maglev Environment file sourced by the systemd unit before starting .BR maglevd . .SH CONFIGURATION @@ -77,7 +88,7 @@ and .BR frontends . .PP See the example at -.I /etc/maglev/maglev.conf +.I /etc/vpp-maglev/maglev.yaml and the full reference in the project documentation. .SH SEE ALSO .BR maglevc (1) diff --git a/docs/user-guide.md b/docs/user-guide.md index db7e3dc..2489204 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -10,9 +10,11 @@ inspection and control. | Flag | Environment variable | Default | Description | |---|---|---|---| -| `--config` | `MAGLEV_CONFIG` | `/etc/maglev/maglev.yaml` | Path to the YAML configuration file. | +| `--config` | `MAGLEV_CONFIG` | `/etc/vpp-maglev/maglev.yaml` | Path to the YAML configuration file. | | `--grpc-addr` | `MAGLEV_GRPC_ADDR` | `:9090` | TCP address on which the gRPC server listens. | | `--log-level` | `MAGLEV_LOG_LEVEL` | `info` | Log verbosity: `debug`, `info`, `warn`, or `error`. | +| `--check` | — | — | Read and validate the config file, then exit. Exits 0 if the config is valid, 1 on YAML parse error, 2 on semantic error. | +| `--reflection` | — | `true` | Enable gRPC server reflection. Allows `grpcurl` to introspect the API without the `.proto` file. Set to `false` to disable. | | `--version` | — | — | Print version, commit hash, and build date, then exit. | Flags take precedence over environment variables. Both are optional; defaults @@ -22,7 +24,7 @@ are used for anything not set. | Signal | Effect | |---|---| -| `SIGHUP` | Reload the configuration file. New backends are started, removed backends are stopped, backends whose health-check config is unchanged continue probing without interruption. | +| `SIGHUP` | Reload the configuration file. 
The file is checked before applying; if there is a parse or semantic error the reload is aborted and the error is logged (the daemon continues running with its current config). New backends are started, removed backends are stopped, backends whose health-check config is unchanged continue probing without interruption. | | `SIGTERM` / `SIGINT` | Graceful shutdown. Active gRPC streams are closed, the server drains, then the process exits. | ### Capabilities @@ -83,6 +85,36 @@ show healthcheck Show full health-check configuration. set backend pause Suspend health checking for a backend, freezing its state. set backend resume Resume health checking; backend re-enters unknown state and is probed immediately. +set frontend pool backend weight <0-100> + Set the weight of a backend within a pool. Weight 0 keeps + the backend in the pool but assigns it no traffic. + Takes effect immediately without reloading configuration. + +set backend disable Stop probing entirely and remove the backend from rotation. + The backend remains visible (state: removed) and can be + re-enabled without reloading configuration. +set backend enable Re-enable a disabled backend. A fresh probe goroutine is + started and the backend re-enters unknown state. + +watch events Stream all events (log, backend transitions, frontend) + [num ] Stop after receiving n events. + [log [level ]] Include log events. level is debug|info|warn|error + (default: info). Omitting log/backend/frontend enables all. + [backend] Include backend transition events. + [frontend] Include frontend events (reserved for future use). + + Each event is printed as compact JSON on its own line. + Press any key or Ctrl-C to stop. Examples: + + watch events + watch events num 20 + watch events log level debug + watch events backend num 100 + watch events log level debug backend + +config check Ask maglevd to read and validate its current config file. 
+ Prints "config ok" on success, or the error (parse or + semantic) returned by the daemon. quit / exit Leave the interactive shell. ``` diff --git a/go.mod b/go.mod index 8097e04..b48d2ef 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,16 @@ module git.ipng.ch/ipng/vpp-maglev go 1.25.0 require ( + github.com/chzyer/readline v1.5.1 github.com/vishvananda/netns v0.0.5 golang.org/x/net v0.52.0 + golang.org/x/sys v0.43.0 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/chzyer/readline v1.5.1 // indirect - golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect ) diff --git a/go.sum b/go.sum index 5816856..129e2fd 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,10 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM= github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ= github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI= github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk= +github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04= github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -31,8 +33,8 @@ go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4Etq golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 7819344..99c1feb 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -4,6 +4,7 @@ package checker import ( "context" + "fmt" "log/slog" "net" "sort" @@ -42,6 +43,7 @@ type worker struct { // Each backend is probed exactly once, regardless of how many frontends // reference it. type Checker struct { + runCtx context.Context // set in Run; used by EnableBackend to start new goroutines cfg *config.Config mu sync.RWMutex workers map[string]*worker // keyed by backend name @@ -67,6 +69,7 @@ func (c *Checker) Run(ctx context.Context) error { go c.fanOut(ctx) c.mu.Lock() + c.runCtx = ctx // safe: held under mu before any EnableBackend call can read it names := activeBackendNames(c.cfg) maxHistory := c.cfg.HealthChecker.TransitionHistory for i, name := range names { @@ -167,6 +170,36 @@ func (c *Checker) GetFrontend(name string) (config.Frontend, bool) { return v, ok } +// SetFrontendPoolBackendWeight updates the weight of a backend within a named +// pool of a frontend. Returns the updated config.Frontend and a descriptive error +// if the frontend, pool, or backend is not found or the weight is out of range.
+func (c *Checker) SetFrontendPoolBackendWeight(frontendName, poolName, backendName string, weight int) (config.Frontend, error) { + if weight < 0 || weight > 100 { + return config.Frontend{}, fmt.Errorf("weight %d out of range [0, 100]", weight) + } + c.mu.Lock() + defer c.mu.Unlock() + fe, ok := c.cfg.Frontends[frontendName] + if !ok { + return config.Frontend{}, fmt.Errorf("frontend %q not found", frontendName) + } + for i, pool := range fe.Pools { + if pool.Name != poolName { + continue + } + pb, ok := pool.Backends[backendName] + if !ok { + return config.Frontend{}, fmt.Errorf("backend %q not found in pool %q", backendName, poolName) + } + pb.Weight = weight + fe.Pools[i].Backends[backendName] = pb + c.cfg.Frontends[frontendName] = fe + slog.Info("frontend-pool-weight", "frontend", frontendName, "pool", poolName, "backend", backendName, "weight", weight) + return fe, nil + } + return config.Frontend{}, fmt.Errorf("pool %q not found in frontend %q", poolName, frontendName) +} + // ListHealthChecks returns the names of all configured health checks, sorted. func (c *Checker) ListHealthChecks() []string { c.mu.RLock() @@ -278,6 +311,59 @@ func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) { return BackendSnapshot{Health: w.backend, Config: w.entry}, true } +// DisableBackend stops health checking for a backend and removes it from active +// rotation. The worker entry is kept in the map so the backend remains visible +// via GetBackend and can be re-enabled with EnableBackend. 
+func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) { + c.mu.Lock() + defer c.mu.Unlock() + w, ok := c.workers[name] + if !ok { + return BackendSnapshot{}, false + } + if !w.entry.Enabled { + return BackendSnapshot{Health: w.backend, Config: w.entry}, true + } + maxHistory := c.cfg.HealthChecker.TransitionHistory + t := w.backend.Remove(maxHistory) + slog.Info("backend-disable", "backend", name) + c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends) + w.cancel() + w.entry.Enabled = false + if b, ok := c.cfg.Backends[name]; ok { + b.Enabled = false + c.cfg.Backends[name] = b + } + return BackendSnapshot{Health: w.backend, Config: w.entry}, true +} + +// EnableBackend re-enables a previously disabled backend. A fresh probe +// goroutine is started and the backend re-enters StateUnknown. +func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) { + c.mu.Lock() + defer c.mu.Unlock() + w, ok := c.workers[name] + if !ok { + return BackendSnapshot{}, false + } + if w.entry.Enabled { + return BackendSnapshot{Health: w.backend, Config: w.entry}, true + } + entry := w.entry + entry.Enabled = true + if b, ok := c.cfg.Backends[name]; ok { + b.Enabled = true + c.cfg.Backends[name] = b + } + maxHistory := c.cfg.HealthChecker.TransitionHistory + hc := c.cfg.HealthChecks[entry.HealthCheck] + slog.Info("backend-enable", "backend", name) + c.startWorker(c.runCtx, name, entry, hc, 0, 1, maxHistory) + nw := c.workers[name] + c.emitForBackend(name, nw.backend.Address, nw.backend.Transitions[0], c.cfg.Frontends) + return BackendSnapshot{Health: nw.backend, Config: nw.entry}, true +} + // ---- internal -------------------------------------------------------------- // startWorker creates a Backend and launches a probe goroutine. 
diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index 28de8d7..037abf8 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -246,6 +246,98 @@ func TestSubscribe(t *testing.T) { } } +func TestSetFrontendPoolBackendWeight(t *testing.T) { + cfg := makeTestConfig(time.Hour, 3, 2) + c := New(cfg) + + // Valid weight change. + fe, err := c.SetFrontendPoolBackendWeight("web", "primary", "be0", 42) + if err != nil { + t.Fatalf("SetFrontendPoolBackendWeight: %v", err) + } + if fe.Pools[0].Backends["be0"].Weight != 42 { + t.Errorf("weight: got %d, want 42", fe.Pools[0].Backends["be0"].Weight) + } + // Persisted in live config. + got, _ := c.GetFrontend("web") + if got.Pools[0].Backends["be0"].Weight != 42 { + t.Errorf("config weight: got %d, want 42", got.Pools[0].Backends["be0"].Weight) + } + + // Out-of-range weight. + if _, err := c.SetFrontendPoolBackendWeight("web", "primary", "be0", 101); err == nil { + t.Error("expected error for weight 101") + } + + // Unknown frontend. + if _, err := c.SetFrontendPoolBackendWeight("nope", "primary", "be0", 50); err == nil { + t.Error("expected error for unknown frontend") + } + + // Unknown pool. + if _, err := c.SetFrontendPoolBackendWeight("web", "nope", "be0", 50); err == nil { + t.Error("expected error for unknown pool") + } + + // Unknown backend. + if _, err := c.SetFrontendPoolBackendWeight("web", "primary", "nope", 50); err == nil { + t.Error("expected error for unknown backend in pool") + } +} + +func TestEnableDisable(t *testing.T) { + cfg := makeTestConfig(time.Hour, 3, 2) + c := New(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.fanOut(ctx) + + // Seed a worker as EnableBackend/DisableBackend require one in c.workers. 
+ wCtx, wCancel := context.WithCancel(ctx) + c.mu.Lock() + c.runCtx = ctx + c.workers["be0"] = &worker{ + backend: health.New("be0", net.ParseIP("10.0.0.2"), 2, 3), + hc: cfg.HealthChecks["icmp"], + entry: cfg.Backends["be0"], + cancel: wCancel, + } + c.mu.Unlock() + _ = wCtx + + b, ok := c.DisableBackend("be0") + if !ok { + t.Fatal("DisableBackend: not found") + } + if b.Health.State != health.StateRemoved { + t.Errorf("after disable: state=%s, want removed", b.Health.State) + } + if b.Config.Enabled { + t.Error("after disable: Enabled should be false") + } + + // Backend should still be visible after disable. + snap, ok := c.GetBackend("be0") + if !ok { + t.Fatal("GetBackend after disable: not found") + } + if snap.Config.Enabled { + t.Error("GetBackend after disable: Enabled should be false") + } + + b, ok = c.EnableBackend("be0") + if !ok { + t.Fatal("EnableBackend: not found") + } + if b.Health.State != health.StateUnknown { + t.Errorf("after enable: state=%s, want unknown", b.Health.State) + } + if !b.Config.Enabled { + t.Error("after enable: Enabled should be true") + } +} + func TestPauseResume(t *testing.T) { cfg := makeTestConfig(time.Hour, 3, 2) c := New(cfg) diff --git a/internal/config/config.go b/internal/config/config.go index f6f4319..dfcf4dd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -156,21 +156,58 @@ type rawFrontend struct { Pools []rawPool `yaml:"pools"` } -// ---- Load ------------------------------------------------------------------ +// ---- Check / Load ---------------------------------------------------------- + +// CheckResult holds the outcome of a config file validation. Exactly one of +// ParseError and SemanticError is non-empty when the config is invalid; both +// are empty on success. +type CheckResult struct { + ParseError string // YAML could not be read or parsed + SemanticError string // YAML parsed but semantic validation failed +} + +// OK reports whether the config is valid. 
+func (r CheckResult) OK() bool { + return r.ParseError == "" && r.SemanticError == "" +} + +// Check reads and validates the config file at path, returning the parsed +// Config (nil on failure) and a CheckResult that distinguishes YAML parse +// errors from semantic validation errors. +func Check(path string) (*Config, CheckResult) { + data, err := os.ReadFile(path) + if err != nil { + return nil, CheckResult{ParseError: fmt.Sprintf("read %q: %v", path, err)} + } + var raw rawConfig + if err := yaml.Unmarshal(data, &raw); err != nil { + return nil, CheckResult{ParseError: fmt.Sprintf("parse yaml: %v", err)} + } + cfg, err := convert(&raw.Maglev) + if err != nil { + return nil, CheckResult{SemanticError: err.Error()} + } + return cfg, CheckResult{} +} // Load reads and validates the config file at path. func Load(path string) (*Config, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("read config %q: %w", path, err) + cfg, result := Check(path) + if !result.OK() { + if result.ParseError != "" { + return nil, fmt.Errorf("%s", result.ParseError) + } + return nil, fmt.Errorf("%s", result.SemanticError) } - return parse(data) + return cfg, nil } +// parse unmarshals raw YAML bytes and converts them into a validated Config. +// Used by tests; production code goes through Check or Load. func parse(data []byte) (*Config, error) { var raw rawConfig if err := yaml.Unmarshal(data, &raw); err != nil { - return nil, fmt.Errorf("parse yaml: %w", err) + return nil, fmt.Errorf("parse yaml: %v", err) } return convert(&raw.Maglev) } diff --git a/internal/grpcapi/loghandler.go b/internal/grpcapi/loghandler.go new file mode 100644 index 0000000..acf8b8e --- /dev/null +++ b/internal/grpcapi/loghandler.go @@ -0,0 +1,148 @@ +// Copyright (c) 2026, Pim van Pelt + +package grpcapi + +import ( + "context" + "log/slog" + "sync" +) + +// logSub is a single WatchEvents subscriber interested in log events. 
+type logSub struct { + minLevel slog.Level + ch chan *LogEvent +} + +// broadcasterState holds the shared subscription registry. It is referenced +// by pointer so that copies returned from WithAttrs/WithGroup share the same set. +type broadcasterState struct { + mu sync.Mutex + nextID int + subs map[int]*logSub +} + +func (s *broadcasterState) subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) { + s.mu.Lock() + id := s.nextID + s.nextID++ + sub := &logSub{minLevel: minLevel, ch: make(chan *LogEvent, 256)} + s.subs[id] = sub + s.mu.Unlock() + return sub.ch, func() { + s.mu.Lock() + delete(s.subs, id) + close(sub.ch) + s.mu.Unlock() + } +} + +// hasSubscriberAt reports whether any subscriber wants records at level or above. +func (s *broadcasterState) hasSubscriberAt(level slog.Level) bool { + s.mu.Lock() + defer s.mu.Unlock() + for _, sub := range s.subs { + if level >= sub.minLevel { + return true + } + } + return false +} + +func (s *broadcasterState) fanOut(level slog.Level, ev *LogEvent) { + s.mu.Lock() + for _, sub := range s.subs { + if level >= sub.minLevel { + select { + case sub.ch <- ev: + default: + // slow subscriber — drop rather than block + } + } + } + s.mu.Unlock() +} + +// LogBroadcaster implements slog.Handler. It forwards every record to an +// inner handler (e.g. the JSON stdout handler) and simultaneously fans out +// structured LogEvent messages to all gRPC WatchEvents subscribers. +type LogBroadcaster struct { + inner slog.Handler + preAttrs []*LogAttr // pre-resolved attrs from WithAttrs calls + groupPfx string // key prefix accumulated by WithGroup calls + shared *broadcasterState +} + +// NewLogBroadcaster wraps inner and returns a LogBroadcaster ready for use +// as the process slog default handler. 
+func NewLogBroadcaster(inner slog.Handler) *LogBroadcaster { + return &LogBroadcaster{ + inner: inner, + shared: &broadcasterState{subs: make(map[int]*logSub)}, + } +} + +// Subscribe registers a subscriber that receives LogEvents at or above +// minLevel. The returned channel is closed when the cancel func is called. +func (b *LogBroadcaster) Subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) { + return b.shared.subscribe(minLevel) +} + +// Enabled implements slog.Handler. It returns true when either the inner +// handler wants the record OR at least one gRPC subscriber has a minLevel at +// or below level. This allows a WatchEvents client requesting debug log events +// to receive them even when maglevd's own -log-level is set higher (e.g. error). +func (b *LogBroadcaster) Enabled(ctx context.Context, level slog.Level) bool { + return b.inner.Enabled(ctx, level) || b.shared.hasSubscriberAt(level) +} + +// Handle implements slog.Handler. It forwards the record to the inner handler +// only when the inner handler is enabled for that level (avoiding duplicate or +// unwanted stdout output), then fans it out to all interested gRPC subscribers. +func (b *LogBroadcaster) Handle(ctx context.Context, r slog.Record) error { + var err error + if b.inner.Enabled(ctx, r.Level) { + err = b.inner.Handle(ctx, r) + } + + attrs := make([]*LogAttr, 0, len(b.preAttrs)+r.NumAttrs()) + attrs = append(attrs, b.preAttrs...) + r.Attrs(func(a slog.Attr) bool { + attrs = append(attrs, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()}) + return true + }) + + ev := &LogEvent{ + AtUnixNs: r.Time.UnixNano(), + Level: r.Level.String(), + Msg: r.Message, + Attrs: attrs, + } + b.shared.fanOut(r.Level, ev) + return err +} + +// WithAttrs implements slog.Handler. 
+func (b *LogBroadcaster) WithAttrs(attrs []slog.Attr) slog.Handler { + pre := make([]*LogAttr, len(b.preAttrs), len(b.preAttrs)+len(attrs)) + copy(pre, b.preAttrs) + for _, a := range attrs { + pre = append(pre, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()}) + } + return &LogBroadcaster{ + inner: b.inner.WithAttrs(attrs), + preAttrs: pre, + groupPfx: b.groupPfx, + shared: b.shared, + } +} + +// WithGroup implements slog.Handler. +func (b *LogBroadcaster) WithGroup(name string) slog.Handler { + return &LogBroadcaster{ + inner: b.inner.WithGroup(name), + preAttrs: b.preAttrs, + groupPfx: b.groupPfx + name + ".", + shared: b.shared, + } +} diff --git a/internal/grpcapi/maglev.pb.go b/internal/grpcapi/maglev.pb.go index 86da8cb..25ea605 100644 --- a/internal/grpcapi/maglev.pb.go +++ b/internal/grpcapi/maglev.pb.go @@ -181,27 +181,27 @@ func (x *GetBackendRequest) GetName() string { return "" } -type PauseResumeRequest struct { +type BackendRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *PauseResumeRequest) Reset() { - *x = PauseResumeRequest{} +func (x *BackendRequest) Reset() { + *x = BackendRequest{} mi := &file_proto_maglev_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *PauseResumeRequest) String() string { +func (x *BackendRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PauseResumeRequest) ProtoMessage() {} +func (*BackendRequest) ProtoMessage() {} -func (x *PauseResumeRequest) ProtoReflect() protoreflect.Message { +func (x *BackendRequest) ProtoReflect() protoreflect.Message { mi := &file_proto_maglev_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -213,12 +213,12 @@ func (x *PauseResumeRequest) ProtoReflect() protoreflect.Message { return 
mi.MessageOf(x) } -// Deprecated: Use PauseResumeRequest.ProtoReflect.Descriptor instead. -func (*PauseResumeRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use BackendRequest.ProtoReflect.Descriptor instead. +func (*BackendRequest) Descriptor() ([]byte, []int) { return file_proto_maglev_proto_rawDescGZIP(), []int{4} } -func (x *PauseResumeRequest) GetName() string { +func (x *BackendRequest) GetName() string { if x != nil { return x.Name } @@ -305,15 +305,185 @@ func (x *GetHealthCheckRequest) GetName() string { return "" } +type CheckConfigRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CheckConfigRequest) Reset() { + *x = CheckConfigRequest{} + mi := &file_proto_maglev_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CheckConfigRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CheckConfigRequest) ProtoMessage() {} + +func (x *CheckConfigRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CheckConfigRequest.ProtoReflect.Descriptor instead. 
+func (*CheckConfigRequest) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{7} +} + +type CheckConfigResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` + ParseError string `protobuf:"bytes,2,opt,name=parse_error,json=parseError,proto3" json:"parse_error,omitempty"` // set when YAML cannot be read or parsed + SemanticError string `protobuf:"bytes,3,opt,name=semantic_error,json=semanticError,proto3" json:"semantic_error,omitempty"` // set when YAML is valid but semantically incorrect + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CheckConfigResponse) Reset() { + *x = CheckConfigResponse{} + mi := &file_proto_maglev_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CheckConfigResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CheckConfigResponse) ProtoMessage() {} + +func (x *CheckConfigResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CheckConfigResponse.ProtoReflect.Descriptor instead. 
+func (*CheckConfigResponse) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{8} +} + +func (x *CheckConfigResponse) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + +func (x *CheckConfigResponse) GetParseError() string { + if x != nil { + return x.ParseError + } + return "" +} + +func (x *CheckConfigResponse) GetSemanticError() string { + if x != nil { + return x.SemanticError + } + return "" +} + +type SetWeightRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Frontend string `protobuf:"bytes,1,opt,name=frontend,proto3" json:"frontend,omitempty"` + Pool string `protobuf:"bytes,2,opt,name=pool,proto3" json:"pool,omitempty"` + Backend string `protobuf:"bytes,3,opt,name=backend,proto3" json:"backend,omitempty"` + Weight int32 `protobuf:"varint,4,opt,name=weight,proto3" json:"weight,omitempty"` // 0-100 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SetWeightRequest) Reset() { + *x = SetWeightRequest{} + mi := &file_proto_maglev_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SetWeightRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SetWeightRequest) ProtoMessage() {} + +func (x *SetWeightRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SetWeightRequest.ProtoReflect.Descriptor instead. 
+func (*SetWeightRequest) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{9} +} + +func (x *SetWeightRequest) GetFrontend() string { + if x != nil { + return x.Frontend + } + return "" +} + +func (x *SetWeightRequest) GetPool() string { + if x != nil { + return x.Pool + } + return "" +} + +func (x *SetWeightRequest) GetBackend() string { + if x != nil { + return x.Backend + } + return "" +} + +func (x *SetWeightRequest) GetWeight() int32 { + if x != nil { + return x.Weight + } + return 0 +} + +// WatchRequest controls which event types are streamed. All fields default to +// true (i.e. an empty request subscribes to everything at info level). type WatchRequest struct { state protoimpl.MessageState `protogen:"open.v1"` + Log *bool `protobuf:"varint,1,opt,name=log,proto3,oneof" json:"log,omitempty"` // include log events (default: true) + LogLevel string `protobuf:"bytes,2,opt,name=log_level,json=logLevel,proto3" json:"log_level,omitempty"` // minimum log level: debug|info|warn|error (default: info) + Backend *bool `protobuf:"varint,3,opt,name=backend,proto3,oneof" json:"backend,omitempty"` // include backend transition events (default: true) + Frontend *bool `protobuf:"varint,4,opt,name=frontend,proto3,oneof" json:"frontend,omitempty"` // include frontend events (default: true) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *WatchRequest) Reset() { *x = WatchRequest{} - mi := &file_proto_maglev_proto_msgTypes[7] + mi := &file_proto_maglev_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -325,7 +495,7 @@ func (x *WatchRequest) String() string { func (*WatchRequest) ProtoMessage() {} func (x *WatchRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[7] + mi := &file_proto_maglev_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -338,7 
+508,35 @@ func (x *WatchRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead. func (*WatchRequest) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{7} + return file_proto_maglev_proto_rawDescGZIP(), []int{10} +} + +func (x *WatchRequest) GetLog() bool { + if x != nil && x.Log != nil { + return *x.Log + } + return false +} + +func (x *WatchRequest) GetLogLevel() string { + if x != nil { + return x.LogLevel + } + return "" +} + +func (x *WatchRequest) GetBackend() bool { + if x != nil && x.Backend != nil { + return *x.Backend + } + return false +} + +func (x *WatchRequest) GetFrontend() bool { + if x != nil && x.Frontend != nil { + return *x.Frontend + } + return false } type ListFrontendsResponse struct { @@ -350,7 +548,7 @@ type ListFrontendsResponse struct { func (x *ListFrontendsResponse) Reset() { *x = ListFrontendsResponse{} - mi := &file_proto_maglev_proto_msgTypes[8] + mi := &file_proto_maglev_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -362,7 +560,7 @@ func (x *ListFrontendsResponse) String() string { func (*ListFrontendsResponse) ProtoMessage() {} func (x *ListFrontendsResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[8] + mi := &file_proto_maglev_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -375,7 +573,7 @@ func (x *ListFrontendsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListFrontendsResponse.ProtoReflect.Descriptor instead. 
func (*ListFrontendsResponse) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{8} + return file_proto_maglev_proto_rawDescGZIP(), []int{11} } func (x *ListFrontendsResponse) GetFrontendNames() []string { @@ -395,7 +593,7 @@ type PoolBackendInfo struct { func (x *PoolBackendInfo) Reset() { *x = PoolBackendInfo{} - mi := &file_proto_maglev_proto_msgTypes[9] + mi := &file_proto_maglev_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -407,7 +605,7 @@ func (x *PoolBackendInfo) String() string { func (*PoolBackendInfo) ProtoMessage() {} func (x *PoolBackendInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[9] + mi := &file_proto_maglev_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -420,7 +618,7 @@ func (x *PoolBackendInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use PoolBackendInfo.ProtoReflect.Descriptor instead. func (*PoolBackendInfo) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{9} + return file_proto_maglev_proto_rawDescGZIP(), []int{12} } func (x *PoolBackendInfo) GetName() string { @@ -447,7 +645,7 @@ type PoolInfo struct { func (x *PoolInfo) Reset() { *x = PoolInfo{} - mi := &file_proto_maglev_proto_msgTypes[10] + mi := &file_proto_maglev_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -459,7 +657,7 @@ func (x *PoolInfo) String() string { func (*PoolInfo) ProtoMessage() {} func (x *PoolInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[10] + mi := &file_proto_maglev_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -472,7 +670,7 @@ func (x *PoolInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use PoolInfo.ProtoReflect.Descriptor instead. 
func (*PoolInfo) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{10} + return file_proto_maglev_proto_rawDescGZIP(), []int{13} } func (x *PoolInfo) GetName() string { @@ -503,7 +701,7 @@ type FrontendInfo struct { func (x *FrontendInfo) Reset() { *x = FrontendInfo{} - mi := &file_proto_maglev_proto_msgTypes[11] + mi := &file_proto_maglev_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -515,7 +713,7 @@ func (x *FrontendInfo) String() string { func (*FrontendInfo) ProtoMessage() {} func (x *FrontendInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[11] + mi := &file_proto_maglev_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -528,7 +726,7 @@ func (x *FrontendInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use FrontendInfo.ProtoReflect.Descriptor instead. func (*FrontendInfo) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{11} + return file_proto_maglev_proto_rawDescGZIP(), []int{14} } func (x *FrontendInfo) GetName() string { @@ -582,7 +780,7 @@ type ListBackendsResponse struct { func (x *ListBackendsResponse) Reset() { *x = ListBackendsResponse{} - mi := &file_proto_maglev_proto_msgTypes[12] + mi := &file_proto_maglev_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -594,7 +792,7 @@ func (x *ListBackendsResponse) String() string { func (*ListBackendsResponse) ProtoMessage() {} func (x *ListBackendsResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[12] + mi := &file_proto_maglev_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -607,7 +805,7 @@ func (x *ListBackendsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use 
ListBackendsResponse.ProtoReflect.Descriptor instead. func (*ListBackendsResponse) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{12} + return file_proto_maglev_proto_rawDescGZIP(), []int{15} } func (x *ListBackendsResponse) GetBackendNames() []string { @@ -626,7 +824,7 @@ type ListHealthChecksResponse struct { func (x *ListHealthChecksResponse) Reset() { *x = ListHealthChecksResponse{} - mi := &file_proto_maglev_proto_msgTypes[13] + mi := &file_proto_maglev_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -638,7 +836,7 @@ func (x *ListHealthChecksResponse) String() string { func (*ListHealthChecksResponse) ProtoMessage() {} func (x *ListHealthChecksResponse) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[13] + mi := &file_proto_maglev_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -651,7 +849,7 @@ func (x *ListHealthChecksResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListHealthChecksResponse.ProtoReflect.Descriptor instead. 
func (*ListHealthChecksResponse) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{13} + return file_proto_maglev_proto_rawDescGZIP(), []int{16} } func (x *ListHealthChecksResponse) GetNames() []string { @@ -676,7 +874,7 @@ type HTTPCheckParams struct { func (x *HTTPCheckParams) Reset() { *x = HTTPCheckParams{} - mi := &file_proto_maglev_proto_msgTypes[14] + mi := &file_proto_maglev_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -688,7 +886,7 @@ func (x *HTTPCheckParams) String() string { func (*HTTPCheckParams) ProtoMessage() {} func (x *HTTPCheckParams) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[14] + mi := &file_proto_maglev_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -701,7 +899,7 @@ func (x *HTTPCheckParams) ProtoReflect() protoreflect.Message { // Deprecated: Use HTTPCheckParams.ProtoReflect.Descriptor instead. 
func (*HTTPCheckParams) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{14} + return file_proto_maglev_proto_rawDescGZIP(), []int{17} } func (x *HTTPCheckParams) GetPath() string { @@ -764,7 +962,7 @@ type TCPCheckParams struct { func (x *TCPCheckParams) Reset() { *x = TCPCheckParams{} - mi := &file_proto_maglev_proto_msgTypes[15] + mi := &file_proto_maglev_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -776,7 +974,7 @@ func (x *TCPCheckParams) String() string { func (*TCPCheckParams) ProtoMessage() {} func (x *TCPCheckParams) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[15] + mi := &file_proto_maglev_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -789,7 +987,7 @@ func (x *TCPCheckParams) ProtoReflect() protoreflect.Message { // Deprecated: Use TCPCheckParams.ProtoReflect.Descriptor instead. 
func (*TCPCheckParams) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{15} + return file_proto_maglev_proto_rawDescGZIP(), []int{18} } func (x *TCPCheckParams) GetSsl() bool { @@ -834,7 +1032,7 @@ type HealthCheckInfo struct { func (x *HealthCheckInfo) Reset() { *x = HealthCheckInfo{} - mi := &file_proto_maglev_proto_msgTypes[16] + mi := &file_proto_maglev_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -846,7 +1044,7 @@ func (x *HealthCheckInfo) String() string { func (*HealthCheckInfo) ProtoMessage() {} func (x *HealthCheckInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[16] + mi := &file_proto_maglev_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -859,7 +1057,7 @@ func (x *HealthCheckInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheckInfo.ProtoReflect.Descriptor instead. 
func (*HealthCheckInfo) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{16} + return file_proto_maglev_proto_rawDescGZIP(), []int{19} } func (x *HealthCheckInfo) GetName() string { @@ -967,7 +1165,7 @@ type BackendInfo struct { func (x *BackendInfo) Reset() { *x = BackendInfo{} - mi := &file_proto_maglev_proto_msgTypes[17] + mi := &file_proto_maglev_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -979,7 +1177,7 @@ func (x *BackendInfo) String() string { func (*BackendInfo) ProtoMessage() {} func (x *BackendInfo) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[17] + mi := &file_proto_maglev_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -992,7 +1190,7 @@ func (x *BackendInfo) ProtoReflect() protoreflect.Message { // Deprecated: Use BackendInfo.ProtoReflect.Descriptor instead. func (*BackendInfo) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{17} + return file_proto_maglev_proto_rawDescGZIP(), []int{20} } func (x *BackendInfo) GetName() string { @@ -1048,7 +1246,7 @@ type TransitionRecord struct { func (x *TransitionRecord) Reset() { *x = TransitionRecord{} - mi := &file_proto_maglev_proto_msgTypes[18] + mi := &file_proto_maglev_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1060,7 +1258,7 @@ func (x *TransitionRecord) String() string { func (*TransitionRecord) ProtoMessage() {} func (x *TransitionRecord) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[18] + mi := &file_proto_maglev_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1073,7 +1271,7 @@ func (x *TransitionRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use TransitionRecord.ProtoReflect.Descriptor 
instead. func (*TransitionRecord) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{18} + return file_proto_maglev_proto_rawDescGZIP(), []int{21} } func (x *TransitionRecord) GetFrom() string { @@ -1097,6 +1295,129 @@ func (x *TransitionRecord) GetAtUnixNs() int64 { return 0 } +// LogAttr is a single key/value attribute from a structured log record. +type LogAttr struct { + state protoimpl.MessageState `protogen:"open.v1"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogAttr) Reset() { + *x = LogAttr{} + mi := &file_proto_maglev_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogAttr) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogAttr) ProtoMessage() {} + +func (x *LogAttr) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[22] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogAttr.ProtoReflect.Descriptor instead. +func (*LogAttr) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{22} +} + +func (x *LogAttr) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *LogAttr) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +// LogEvent carries a single structured log record. 
+type LogEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + AtUnixNs int64 `protobuf:"varint,1,opt,name=at_unix_ns,json=atUnixNs,proto3" json:"at_unix_ns,omitempty"` + Level string `protobuf:"bytes,2,opt,name=level,proto3" json:"level,omitempty"` + Msg string `protobuf:"bytes,3,opt,name=msg,proto3" json:"msg,omitempty"` + Attrs []*LogAttr `protobuf:"bytes,4,rep,name=attrs,proto3" json:"attrs,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LogEvent) Reset() { + *x = LogEvent{} + mi := &file_proto_maglev_proto_msgTypes[23] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LogEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEvent) ProtoMessage() {} + +func (x *LogEvent) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[23] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEvent.ProtoReflect.Descriptor instead. +func (*LogEvent) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{23} +} + +func (x *LogEvent) GetAtUnixNs() int64 { + if x != nil { + return x.AtUnixNs + } + return 0 +} + +func (x *LogEvent) GetLevel() string { + if x != nil { + return x.Level + } + return "" +} + +func (x *LogEvent) GetMsg() string { + if x != nil { + return x.Msg + } + return "" +} + +func (x *LogEvent) GetAttrs() []*LogAttr { + if x != nil { + return x.Attrs + } + return nil +} + +// BackendEvent is emitted on every backend state transition. 
type BackendEvent struct { state protoimpl.MessageState `protogen:"open.v1"` BackendName string `protobuf:"bytes,1,opt,name=backend_name,json=backendName,proto3" json:"backend_name,omitempty"` @@ -1107,7 +1428,7 @@ type BackendEvent struct { func (x *BackendEvent) Reset() { *x = BackendEvent{} - mi := &file_proto_maglev_proto_msgTypes[19] + mi := &file_proto_maglev_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1119,7 +1440,7 @@ func (x *BackendEvent) String() string { func (*BackendEvent) ProtoMessage() {} func (x *BackendEvent) ProtoReflect() protoreflect.Message { - mi := &file_proto_maglev_proto_msgTypes[19] + mi := &file_proto_maglev_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1132,7 +1453,7 @@ func (x *BackendEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use BackendEvent.ProtoReflect.Descriptor instead. func (*BackendEvent) Descriptor() ([]byte, []int) { - return file_proto_maglev_proto_rawDescGZIP(), []int{19} + return file_proto_maglev_proto_rawDescGZIP(), []int{24} } func (x *BackendEvent) GetBackendName() string { @@ -1149,6 +1470,142 @@ func (x *BackendEvent) GetTransition() *TransitionRecord { return nil } +// FrontendEvent is reserved for future frontend-level events. 
+type FrontendEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FrontendEvent) Reset() { + *x = FrontendEvent{} + mi := &file_proto_maglev_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FrontendEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FrontendEvent) ProtoMessage() {} + +func (x *FrontendEvent) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[25] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FrontendEvent.ProtoReflect.Descriptor instead. +func (*FrontendEvent) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{25} +} + +// Event is the envelope returned by WatchEvents. +type Event struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Event: + // + // *Event_Log + // *Event_Backend + // *Event_Frontend + Event isEvent_Event `protobuf_oneof:"event"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Event) Reset() { + *x = Event{} + mi := &file_proto_maglev_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_proto_maglev_proto_msgTypes[26] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. 
+func (*Event) Descriptor() ([]byte, []int) { + return file_proto_maglev_proto_rawDescGZIP(), []int{26} +} + +func (x *Event) GetEvent() isEvent_Event { + if x != nil { + return x.Event + } + return nil +} + +func (x *Event) GetLog() *LogEvent { + if x != nil { + if x, ok := x.Event.(*Event_Log); ok { + return x.Log + } + } + return nil +} + +func (x *Event) GetBackend() *BackendEvent { + if x != nil { + if x, ok := x.Event.(*Event_Backend); ok { + return x.Backend + } + } + return nil +} + +func (x *Event) GetFrontend() *FrontendEvent { + if x != nil { + if x, ok := x.Event.(*Event_Frontend); ok { + return x.Frontend + } + } + return nil +} + +type isEvent_Event interface { + isEvent_Event() +} + +type Event_Log struct { + Log *LogEvent `protobuf:"bytes,1,opt,name=log,proto3,oneof"` +} + +type Event_Backend struct { + Backend *BackendEvent `protobuf:"bytes,2,opt,name=backend,proto3,oneof"` +} + +type Event_Frontend struct { + Frontend *FrontendEvent `protobuf:"bytes,3,opt,name=frontend,proto3,oneof"` +} + +func (*Event_Log) isEvent_Event() {} + +func (*Event_Backend) isEvent_Event() {} + +func (*Event_Frontend) isEvent_Event() {} + var File_proto_maglev_proto protoreflect.FileDescriptor const file_proto_maglev_proto_rawDesc = "" + @@ -1159,13 +1616,32 @@ const file_proto_maglev_proto_rawDesc = "" + "\x04name\x18\x01 \x01(\tR\x04name\"\x15\n" + "\x13ListBackendsRequest\"'\n" + "\x11GetBackendRequest\x12\x12\n" + - "\x04name\x18\x01 \x01(\tR\x04name\"(\n" + - "\x12PauseResumeRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\"$\n" + + "\x0eBackendRequest\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\"\x19\n" + "\x17ListHealthChecksRequest\"+\n" + "\x15GetHealthCheckRequest\x12\x12\n" + - "\x04name\x18\x01 \x01(\tR\x04name\"\x0e\n" + - "\fWatchRequest\">\n" + + "\x04name\x18\x01 \x01(\tR\x04name\"\x14\n" + + "\x12CheckConfigRequest\"m\n" + + "\x13CheckConfigResponse\x12\x0e\n" + + "\x02ok\x18\x01 \x01(\bR\x02ok\x12\x1f\n" + + "\vparse_error\x18\x02 
\x01(\tR\n" + + "parseError\x12%\n" + + "\x0esemantic_error\x18\x03 \x01(\tR\rsemanticError\"t\n" + + "\x10SetWeightRequest\x12\x1a\n" + + "\bfrontend\x18\x01 \x01(\tR\bfrontend\x12\x12\n" + + "\x04pool\x18\x02 \x01(\tR\x04pool\x12\x18\n" + + "\abackend\x18\x03 \x01(\tR\abackend\x12\x16\n" + + "\x06weight\x18\x04 \x01(\x05R\x06weight\"\xa3\x01\n" + + "\fWatchRequest\x12\x15\n" + + "\x03log\x18\x01 \x01(\bH\x00R\x03log\x88\x01\x01\x12\x1b\n" + + "\tlog_level\x18\x02 \x01(\tR\blogLevel\x12\x1d\n" + + "\abackend\x18\x03 \x01(\bH\x01R\abackend\x88\x01\x01\x12\x1f\n" + + "\bfrontend\x18\x04 \x01(\bH\x02R\bfrontend\x88\x01\x01B\x06\n" + + "\x04_logB\n" + + "\n" + + "\b_backendB\v\n" + + "\t_frontend\">\n" + "\x15ListFrontendsResponse\x12%\n" + "\x0efrontend_names\x18\x01 \x03(\tR\rfrontendNames\"=\n" + "\x0fPoolBackendInfo\x12\x12\n" + @@ -1227,23 +1703,42 @@ const file_proto_maglev_proto_rawDesc = "" + "\x04from\x18\x01 \x01(\tR\x04from\x12\x0e\n" + "\x02to\x18\x02 \x01(\tR\x02to\x12\x1c\n" + "\n" + - "at_unix_ns\x18\x03 \x01(\x03R\batUnixNs\"k\n" + + "at_unix_ns\x18\x03 \x01(\x03R\batUnixNs\"1\n" + + "\aLogAttr\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value\"w\n" + + "\bLogEvent\x12\x1c\n" + + "\n" + + "at_unix_ns\x18\x01 \x01(\x03R\batUnixNs\x12\x14\n" + + "\x05level\x18\x02 \x01(\tR\x05level\x12\x10\n" + + "\x03msg\x18\x03 \x01(\tR\x03msg\x12%\n" + + "\x05attrs\x18\x04 \x03(\v2\x0f.maglev.LogAttrR\x05attrs\"k\n" + "\fBackendEvent\x12!\n" + "\fbackend_name\x18\x01 \x01(\tR\vbackendName\x128\n" + "\n" + "transition\x18\x02 \x01(\v2\x18.maglev.TransitionRecordR\n" + - "transition2\x88\x05\n" + + "transition\"\x0f\n" + + "\rFrontendEvent\"\x9d\x01\n" + + "\x05Event\x12$\n" + + "\x03log\x18\x01 \x01(\v2\x10.maglev.LogEventH\x00R\x03log\x120\n" + + "\abackend\x18\x02 \x01(\v2\x14.maglev.BackendEventH\x00R\abackend\x123\n" + + "\bfrontend\x18\x03 \x01(\v2\x15.maglev.FrontendEventH\x00R\bfrontendB\a\n" + + 
"\x05event2\x87\a\n" + "\x06Maglev\x12L\n" + "\rListFrontends\x12\x1c.maglev.ListFrontendsRequest\x1a\x1d.maglev.ListFrontendsResponse\x12?\n" + "\vGetFrontend\x12\x1a.maglev.GetFrontendRequest\x1a\x14.maglev.FrontendInfo\x12I\n" + "\fListBackends\x12\x1b.maglev.ListBackendsRequest\x1a\x1c.maglev.ListBackendsResponse\x12<\n" + "\n" + - "GetBackend\x12\x19.maglev.GetBackendRequest\x1a\x13.maglev.BackendInfo\x12?\n" + - "\fPauseBackend\x12\x1a.maglev.PauseResumeRequest\x1a\x13.maglev.BackendInfo\x12@\n" + - "\rResumeBackend\x12\x1a.maglev.PauseResumeRequest\x1a\x13.maglev.BackendInfo\x12U\n" + + "GetBackend\x12\x19.maglev.GetBackendRequest\x1a\x13.maglev.BackendInfo\x12;\n" + + "\fPauseBackend\x12\x16.maglev.BackendRequest\x1a\x13.maglev.BackendInfo\x12<\n" + + "\rResumeBackend\x12\x16.maglev.BackendRequest\x1a\x13.maglev.BackendInfo\x12<\n" + + "\rEnableBackend\x12\x16.maglev.BackendRequest\x1a\x13.maglev.BackendInfo\x12=\n" + + "\x0eDisableBackend\x12\x16.maglev.BackendRequest\x1a\x13.maglev.BackendInfo\x12U\n" + "\x10ListHealthChecks\x12\x1f.maglev.ListHealthChecksRequest\x1a .maglev.ListHealthChecksResponse\x12H\n" + - "\x0eGetHealthCheck\x12\x1d.maglev.GetHealthCheckRequest\x1a\x17.maglev.HealthCheckInfo\x12B\n" + - "\x12WatchBackendEvents\x12\x14.maglev.WatchRequest\x1a\x14.maglev.BackendEvent0\x01B.Z,git.ipng.ch/ipng/vpp-maglev/internal/grpcapib\x06proto3" + "\x0eGetHealthCheck\x12\x1d.maglev.GetHealthCheckRequest\x1a\x17.maglev.HealthCheckInfo\x12N\n" + + "\x1cSetFrontendPoolBackendWeight\x12\x18.maglev.SetWeightRequest\x1a\x14.maglev.FrontendInfo\x124\n" + + "\vWatchEvents\x12\x14.maglev.WatchRequest\x1a\r.maglev.Event0\x01\x12F\n" + + "\vCheckConfig\x12\x1a.maglev.CheckConfigRequest\x1a\x1b.maglev.CheckConfigResponseB.Z,git.ipng.ch/ipng/vpp-maglev/internal/grpcapib\x06proto3" var ( file_proto_maglev_proto_rawDescOnce sync.Once @@ -1257,59 +1752,78 @@ func file_proto_maglev_proto_rawDescGZIP() []byte { return file_proto_maglev_proto_rawDescData } -var 
file_proto_maglev_proto_msgTypes = make([]protoimpl.MessageInfo, 20) +var file_proto_maglev_proto_msgTypes = make([]protoimpl.MessageInfo, 27) var file_proto_maglev_proto_goTypes = []any{ (*ListFrontendsRequest)(nil), // 0: maglev.ListFrontendsRequest (*GetFrontendRequest)(nil), // 1: maglev.GetFrontendRequest (*ListBackendsRequest)(nil), // 2: maglev.ListBackendsRequest (*GetBackendRequest)(nil), // 3: maglev.GetBackendRequest - (*PauseResumeRequest)(nil), // 4: maglev.PauseResumeRequest + (*BackendRequest)(nil), // 4: maglev.BackendRequest (*ListHealthChecksRequest)(nil), // 5: maglev.ListHealthChecksRequest (*GetHealthCheckRequest)(nil), // 6: maglev.GetHealthCheckRequest - (*WatchRequest)(nil), // 7: maglev.WatchRequest - (*ListFrontendsResponse)(nil), // 8: maglev.ListFrontendsResponse - (*PoolBackendInfo)(nil), // 9: maglev.PoolBackendInfo - (*PoolInfo)(nil), // 10: maglev.PoolInfo - (*FrontendInfo)(nil), // 11: maglev.FrontendInfo - (*ListBackendsResponse)(nil), // 12: maglev.ListBackendsResponse - (*ListHealthChecksResponse)(nil), // 13: maglev.ListHealthChecksResponse - (*HTTPCheckParams)(nil), // 14: maglev.HTTPCheckParams - (*TCPCheckParams)(nil), // 15: maglev.TCPCheckParams - (*HealthCheckInfo)(nil), // 16: maglev.HealthCheckInfo - (*BackendInfo)(nil), // 17: maglev.BackendInfo - (*TransitionRecord)(nil), // 18: maglev.TransitionRecord - (*BackendEvent)(nil), // 19: maglev.BackendEvent + (*CheckConfigRequest)(nil), // 7: maglev.CheckConfigRequest + (*CheckConfigResponse)(nil), // 8: maglev.CheckConfigResponse + (*SetWeightRequest)(nil), // 9: maglev.SetWeightRequest + (*WatchRequest)(nil), // 10: maglev.WatchRequest + (*ListFrontendsResponse)(nil), // 11: maglev.ListFrontendsResponse + (*PoolBackendInfo)(nil), // 12: maglev.PoolBackendInfo + (*PoolInfo)(nil), // 13: maglev.PoolInfo + (*FrontendInfo)(nil), // 14: maglev.FrontendInfo + (*ListBackendsResponse)(nil), // 15: maglev.ListBackendsResponse + (*ListHealthChecksResponse)(nil), // 16: 
maglev.ListHealthChecksResponse + (*HTTPCheckParams)(nil), // 17: maglev.HTTPCheckParams + (*TCPCheckParams)(nil), // 18: maglev.TCPCheckParams + (*HealthCheckInfo)(nil), // 19: maglev.HealthCheckInfo + (*BackendInfo)(nil), // 20: maglev.BackendInfo + (*TransitionRecord)(nil), // 21: maglev.TransitionRecord + (*LogAttr)(nil), // 22: maglev.LogAttr + (*LogEvent)(nil), // 23: maglev.LogEvent + (*BackendEvent)(nil), // 24: maglev.BackendEvent + (*FrontendEvent)(nil), // 25: maglev.FrontendEvent + (*Event)(nil), // 26: maglev.Event } var file_proto_maglev_proto_depIdxs = []int32{ - 9, // 0: maglev.PoolInfo.backends:type_name -> maglev.PoolBackendInfo - 10, // 1: maglev.FrontendInfo.pools:type_name -> maglev.PoolInfo - 14, // 2: maglev.HealthCheckInfo.http:type_name -> maglev.HTTPCheckParams - 15, // 3: maglev.HealthCheckInfo.tcp:type_name -> maglev.TCPCheckParams - 18, // 4: maglev.BackendInfo.transitions:type_name -> maglev.TransitionRecord - 18, // 5: maglev.BackendEvent.transition:type_name -> maglev.TransitionRecord - 0, // 6: maglev.Maglev.ListFrontends:input_type -> maglev.ListFrontendsRequest - 1, // 7: maglev.Maglev.GetFrontend:input_type -> maglev.GetFrontendRequest - 2, // 8: maglev.Maglev.ListBackends:input_type -> maglev.ListBackendsRequest - 3, // 9: maglev.Maglev.GetBackend:input_type -> maglev.GetBackendRequest - 4, // 10: maglev.Maglev.PauseBackend:input_type -> maglev.PauseResumeRequest - 4, // 11: maglev.Maglev.ResumeBackend:input_type -> maglev.PauseResumeRequest - 5, // 12: maglev.Maglev.ListHealthChecks:input_type -> maglev.ListHealthChecksRequest - 6, // 13: maglev.Maglev.GetHealthCheck:input_type -> maglev.GetHealthCheckRequest - 7, // 14: maglev.Maglev.WatchBackendEvents:input_type -> maglev.WatchRequest - 8, // 15: maglev.Maglev.ListFrontends:output_type -> maglev.ListFrontendsResponse - 11, // 16: maglev.Maglev.GetFrontend:output_type -> maglev.FrontendInfo - 12, // 17: maglev.Maglev.ListBackends:output_type -> maglev.ListBackendsResponse - 
17, // 18: maglev.Maglev.GetBackend:output_type -> maglev.BackendInfo - 17, // 19: maglev.Maglev.PauseBackend:output_type -> maglev.BackendInfo - 17, // 20: maglev.Maglev.ResumeBackend:output_type -> maglev.BackendInfo - 13, // 21: maglev.Maglev.ListHealthChecks:output_type -> maglev.ListHealthChecksResponse - 16, // 22: maglev.Maglev.GetHealthCheck:output_type -> maglev.HealthCheckInfo - 19, // 23: maglev.Maglev.WatchBackendEvents:output_type -> maglev.BackendEvent - 15, // [15:24] is the sub-list for method output_type - 6, // [6:15] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 12, // 0: maglev.PoolInfo.backends:type_name -> maglev.PoolBackendInfo + 13, // 1: maglev.FrontendInfo.pools:type_name -> maglev.PoolInfo + 17, // 2: maglev.HealthCheckInfo.http:type_name -> maglev.HTTPCheckParams + 18, // 3: maglev.HealthCheckInfo.tcp:type_name -> maglev.TCPCheckParams + 21, // 4: maglev.BackendInfo.transitions:type_name -> maglev.TransitionRecord + 22, // 5: maglev.LogEvent.attrs:type_name -> maglev.LogAttr + 21, // 6: maglev.BackendEvent.transition:type_name -> maglev.TransitionRecord + 23, // 7: maglev.Event.log:type_name -> maglev.LogEvent + 24, // 8: maglev.Event.backend:type_name -> maglev.BackendEvent + 25, // 9: maglev.Event.frontend:type_name -> maglev.FrontendEvent + 0, // 10: maglev.Maglev.ListFrontends:input_type -> maglev.ListFrontendsRequest + 1, // 11: maglev.Maglev.GetFrontend:input_type -> maglev.GetFrontendRequest + 2, // 12: maglev.Maglev.ListBackends:input_type -> maglev.ListBackendsRequest + 3, // 13: maglev.Maglev.GetBackend:input_type -> maglev.GetBackendRequest + 4, // 14: maglev.Maglev.PauseBackend:input_type -> maglev.BackendRequest + 4, // 15: maglev.Maglev.ResumeBackend:input_type -> maglev.BackendRequest + 4, // 16: maglev.Maglev.EnableBackend:input_type -> maglev.BackendRequest + 4, // 17: 
maglev.Maglev.DisableBackend:input_type -> maglev.BackendRequest + 5, // 18: maglev.Maglev.ListHealthChecks:input_type -> maglev.ListHealthChecksRequest + 6, // 19: maglev.Maglev.GetHealthCheck:input_type -> maglev.GetHealthCheckRequest + 9, // 20: maglev.Maglev.SetFrontendPoolBackendWeight:input_type -> maglev.SetWeightRequest + 10, // 21: maglev.Maglev.WatchEvents:input_type -> maglev.WatchRequest + 7, // 22: maglev.Maglev.CheckConfig:input_type -> maglev.CheckConfigRequest + 11, // 23: maglev.Maglev.ListFrontends:output_type -> maglev.ListFrontendsResponse + 14, // 24: maglev.Maglev.GetFrontend:output_type -> maglev.FrontendInfo + 15, // 25: maglev.Maglev.ListBackends:output_type -> maglev.ListBackendsResponse + 20, // 26: maglev.Maglev.GetBackend:output_type -> maglev.BackendInfo + 20, // 27: maglev.Maglev.PauseBackend:output_type -> maglev.BackendInfo + 20, // 28: maglev.Maglev.ResumeBackend:output_type -> maglev.BackendInfo + 20, // 29: maglev.Maglev.EnableBackend:output_type -> maglev.BackendInfo + 20, // 30: maglev.Maglev.DisableBackend:output_type -> maglev.BackendInfo + 16, // 31: maglev.Maglev.ListHealthChecks:output_type -> maglev.ListHealthChecksResponse + 19, // 32: maglev.Maglev.GetHealthCheck:output_type -> maglev.HealthCheckInfo + 14, // 33: maglev.Maglev.SetFrontendPoolBackendWeight:output_type -> maglev.FrontendInfo + 26, // 34: maglev.Maglev.WatchEvents:output_type -> maglev.Event + 8, // 35: maglev.Maglev.CheckConfig:output_type -> maglev.CheckConfigResponse + 23, // [23:36] is the sub-list for method output_type + 10, // [10:23] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_proto_maglev_proto_init() } @@ -1317,13 +1831,19 @@ func file_proto_maglev_proto_init() { if File_proto_maglev_proto != nil { return } + file_proto_maglev_proto_msgTypes[10].OneofWrappers = []any{} + 
file_proto_maglev_proto_msgTypes[26].OneofWrappers = []any{ + (*Event_Log)(nil), + (*Event_Backend)(nil), + (*Event_Frontend)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_maglev_proto_rawDesc), len(file_proto_maglev_proto_rawDesc)), NumEnums: 0, - NumMessages: 20, + NumMessages: 27, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/grpcapi/maglev_grpc.pb.go b/internal/grpcapi/maglev_grpc.pb.go index 1794ab0..6ef365b 100644 --- a/internal/grpcapi/maglev_grpc.pb.go +++ b/internal/grpcapi/maglev_grpc.pb.go @@ -19,15 +19,19 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Maglev_ListFrontends_FullMethodName = "/maglev.Maglev/ListFrontends" - Maglev_GetFrontend_FullMethodName = "/maglev.Maglev/GetFrontend" - Maglev_ListBackends_FullMethodName = "/maglev.Maglev/ListBackends" - Maglev_GetBackend_FullMethodName = "/maglev.Maglev/GetBackend" - Maglev_PauseBackend_FullMethodName = "/maglev.Maglev/PauseBackend" - Maglev_ResumeBackend_FullMethodName = "/maglev.Maglev/ResumeBackend" - Maglev_ListHealthChecks_FullMethodName = "/maglev.Maglev/ListHealthChecks" - Maglev_GetHealthCheck_FullMethodName = "/maglev.Maglev/GetHealthCheck" - Maglev_WatchBackendEvents_FullMethodName = "/maglev.Maglev/WatchBackendEvents" + Maglev_ListFrontends_FullMethodName = "/maglev.Maglev/ListFrontends" + Maglev_GetFrontend_FullMethodName = "/maglev.Maglev/GetFrontend" + Maglev_ListBackends_FullMethodName = "/maglev.Maglev/ListBackends" + Maglev_GetBackend_FullMethodName = "/maglev.Maglev/GetBackend" + Maglev_PauseBackend_FullMethodName = "/maglev.Maglev/PauseBackend" + Maglev_ResumeBackend_FullMethodName = "/maglev.Maglev/ResumeBackend" + Maglev_EnableBackend_FullMethodName = "/maglev.Maglev/EnableBackend" + Maglev_DisableBackend_FullMethodName = "/maglev.Maglev/DisableBackend" + Maglev_ListHealthChecks_FullMethodName = 
"/maglev.Maglev/ListHealthChecks" + Maglev_GetHealthCheck_FullMethodName = "/maglev.Maglev/GetHealthCheck" + Maglev_SetFrontendPoolBackendWeight_FullMethodName = "/maglev.Maglev/SetFrontendPoolBackendWeight" + Maglev_WatchEvents_FullMethodName = "/maglev.Maglev/WatchEvents" + Maglev_CheckConfig_FullMethodName = "/maglev.Maglev/CheckConfig" ) // MaglevClient is the client API for Maglev service. @@ -40,11 +44,15 @@ type MaglevClient interface { GetFrontend(ctx context.Context, in *GetFrontendRequest, opts ...grpc.CallOption) (*FrontendInfo, error) ListBackends(ctx context.Context, in *ListBackendsRequest, opts ...grpc.CallOption) (*ListBackendsResponse, error) GetBackend(ctx context.Context, in *GetBackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) - PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) - ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) + PauseBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) + ResumeBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) + EnableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) + DisableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) ListHealthChecks(ctx context.Context, in *ListHealthChecksRequest, opts ...grpc.CallOption) (*ListHealthChecksResponse, error) GetHealthCheck(ctx context.Context, in *GetHealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckInfo, error) - WatchBackendEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BackendEvent], error) + SetFrontendPoolBackendWeight(ctx context.Context, in *SetWeightRequest, opts ...grpc.CallOption) (*FrontendInfo, error) + WatchEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) 
(grpc.ServerStreamingClient[Event], error) + CheckConfig(ctx context.Context, in *CheckConfigRequest, opts ...grpc.CallOption) (*CheckConfigResponse, error) } type maglevClient struct { @@ -95,7 +103,7 @@ func (c *maglevClient) GetBackend(ctx context.Context, in *GetBackendRequest, op return out, nil } -func (c *maglevClient) PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) { +func (c *maglevClient) PauseBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(BackendInfo) err := c.cc.Invoke(ctx, Maglev_PauseBackend_FullMethodName, in, out, cOpts...) @@ -105,7 +113,7 @@ func (c *maglevClient) PauseBackend(ctx context.Context, in *PauseResumeRequest, return out, nil } -func (c *maglevClient) ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) { +func (c *maglevClient) ResumeBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(BackendInfo) err := c.cc.Invoke(ctx, Maglev_ResumeBackend_FullMethodName, in, out, cOpts...) @@ -115,6 +123,26 @@ func (c *maglevClient) ResumeBackend(ctx context.Context, in *PauseResumeRequest return out, nil } +func (c *maglevClient) EnableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(BackendInfo) + err := c.cc.Invoke(ctx, Maglev_EnableBackend_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *maglevClient) DisableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
+ out := new(BackendInfo) + err := c.cc.Invoke(ctx, Maglev_DisableBackend_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *maglevClient) ListHealthChecks(ctx context.Context, in *ListHealthChecksRequest, opts ...grpc.CallOption) (*ListHealthChecksResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(ListHealthChecksResponse) @@ -135,13 +163,23 @@ func (c *maglevClient) GetHealthCheck(ctx context.Context, in *GetHealthCheckReq return out, nil } -func (c *maglevClient) WatchBackendEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BackendEvent], error) { +func (c *maglevClient) SetFrontendPoolBackendWeight(ctx context.Context, in *SetWeightRequest, opts ...grpc.CallOption) (*FrontendInfo, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &Maglev_ServiceDesc.Streams[0], Maglev_WatchBackendEvents_FullMethodName, cOpts...) + out := new(FrontendInfo) + err := c.cc.Invoke(ctx, Maglev_SetFrontendPoolBackendWeight_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } - x := &grpc.GenericClientStream[WatchRequest, BackendEvent]{ClientStream: stream} + return out, nil +} + +func (c *maglevClient) WatchEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Event], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &Maglev_ServiceDesc.Streams[0], Maglev_WatchEvents_FullMethodName, cOpts...) 
+ if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[WatchRequest, Event]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -152,7 +190,17 @@ func (c *maglevClient) WatchBackendEvents(ctx context.Context, in *WatchRequest, } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type Maglev_WatchBackendEventsClient = grpc.ServerStreamingClient[BackendEvent] +type Maglev_WatchEventsClient = grpc.ServerStreamingClient[Event] + +func (c *maglevClient) CheckConfig(ctx context.Context, in *CheckConfigRequest, opts ...grpc.CallOption) (*CheckConfigResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(CheckConfigResponse) + err := c.cc.Invoke(ctx, Maglev_CheckConfig_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} // MaglevServer is the server API for Maglev service. 
// All implementations must embed UnimplementedMaglevServer @@ -164,11 +212,15 @@ type MaglevServer interface { GetFrontend(context.Context, *GetFrontendRequest) (*FrontendInfo, error) ListBackends(context.Context, *ListBackendsRequest) (*ListBackendsResponse, error) GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error) - PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) - ResumeBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) + PauseBackend(context.Context, *BackendRequest) (*BackendInfo, error) + ResumeBackend(context.Context, *BackendRequest) (*BackendInfo, error) + EnableBackend(context.Context, *BackendRequest) (*BackendInfo, error) + DisableBackend(context.Context, *BackendRequest) (*BackendInfo, error) ListHealthChecks(context.Context, *ListHealthChecksRequest) (*ListHealthChecksResponse, error) GetHealthCheck(context.Context, *GetHealthCheckRequest) (*HealthCheckInfo, error) - WatchBackendEvents(*WatchRequest, grpc.ServerStreamingServer[BackendEvent]) error + SetFrontendPoolBackendWeight(context.Context, *SetWeightRequest) (*FrontendInfo, error) + WatchEvents(*WatchRequest, grpc.ServerStreamingServer[Event]) error + CheckConfig(context.Context, *CheckConfigRequest) (*CheckConfigResponse, error) mustEmbedUnimplementedMaglevServer() } @@ -191,20 +243,32 @@ func (UnimplementedMaglevServer) ListBackends(context.Context, *ListBackendsRequ func (UnimplementedMaglevServer) GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error) { return nil, status.Error(codes.Unimplemented, "method GetBackend not implemented") } -func (UnimplementedMaglevServer) PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) { +func (UnimplementedMaglevServer) PauseBackend(context.Context, *BackendRequest) (*BackendInfo, error) { return nil, status.Error(codes.Unimplemented, "method PauseBackend not implemented") } -func (UnimplementedMaglevServer) ResumeBackend(context.Context, *PauseResumeRequest) 
(*BackendInfo, error) { +func (UnimplementedMaglevServer) ResumeBackend(context.Context, *BackendRequest) (*BackendInfo, error) { return nil, status.Error(codes.Unimplemented, "method ResumeBackend not implemented") } +func (UnimplementedMaglevServer) EnableBackend(context.Context, *BackendRequest) (*BackendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method EnableBackend not implemented") +} +func (UnimplementedMaglevServer) DisableBackend(context.Context, *BackendRequest) (*BackendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method DisableBackend not implemented") +} func (UnimplementedMaglevServer) ListHealthChecks(context.Context, *ListHealthChecksRequest) (*ListHealthChecksResponse, error) { return nil, status.Error(codes.Unimplemented, "method ListHealthChecks not implemented") } func (UnimplementedMaglevServer) GetHealthCheck(context.Context, *GetHealthCheckRequest) (*HealthCheckInfo, error) { return nil, status.Error(codes.Unimplemented, "method GetHealthCheck not implemented") } -func (UnimplementedMaglevServer) WatchBackendEvents(*WatchRequest, grpc.ServerStreamingServer[BackendEvent]) error { - return status.Error(codes.Unimplemented, "method WatchBackendEvents not implemented") +func (UnimplementedMaglevServer) SetFrontendPoolBackendWeight(context.Context, *SetWeightRequest) (*FrontendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method SetFrontendPoolBackendWeight not implemented") +} +func (UnimplementedMaglevServer) WatchEvents(*WatchRequest, grpc.ServerStreamingServer[Event]) error { + return status.Error(codes.Unimplemented, "method WatchEvents not implemented") +} +func (UnimplementedMaglevServer) CheckConfig(context.Context, *CheckConfigRequest) (*CheckConfigResponse, error) { + return nil, status.Error(codes.Unimplemented, "method CheckConfig not implemented") } func (UnimplementedMaglevServer) mustEmbedUnimplementedMaglevServer() {} func (UnimplementedMaglevServer) testEmbeddedByValue() {} 
@@ -300,7 +364,7 @@ func _Maglev_GetBackend_Handler(srv interface{}, ctx context.Context, dec func(i } func _Maglev_PauseBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PauseResumeRequest) + in := new(BackendRequest) if err := dec(in); err != nil { return nil, err } @@ -312,13 +376,13 @@ func _Maglev_PauseBackend_Handler(srv interface{}, ctx context.Context, dec func FullMethod: Maglev_PauseBackend_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MaglevServer).PauseBackend(ctx, req.(*PauseResumeRequest)) + return srv.(MaglevServer).PauseBackend(ctx, req.(*BackendRequest)) } return interceptor(ctx, in, info, handler) } func _Maglev_ResumeBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PauseResumeRequest) + in := new(BackendRequest) if err := dec(in); err != nil { return nil, err } @@ -330,7 +394,43 @@ func _Maglev_ResumeBackend_Handler(srv interface{}, ctx context.Context, dec fun FullMethod: Maglev_ResumeBackend_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(MaglevServer).ResumeBackend(ctx, req.(*PauseResumeRequest)) + return srv.(MaglevServer).ResumeBackend(ctx, req.(*BackendRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Maglev_EnableBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BackendRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MaglevServer).EnableBackend(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Maglev_EnableBackend_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) 
(interface{}, error) { + return srv.(MaglevServer).EnableBackend(ctx, req.(*BackendRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Maglev_DisableBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BackendRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MaglevServer).DisableBackend(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Maglev_DisableBackend_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MaglevServer).DisableBackend(ctx, req.(*BackendRequest)) } return interceptor(ctx, in, info, handler) } @@ -371,16 +471,52 @@ func _Maglev_GetHealthCheck_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } -func _Maglev_WatchBackendEvents_Handler(srv interface{}, stream grpc.ServerStream) error { +func _Maglev_SetFrontendPoolBackendWeight_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SetWeightRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MaglevServer).SetFrontendPoolBackendWeight(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Maglev_SetFrontendPoolBackendWeight_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MaglevServer).SetFrontendPoolBackendWeight(ctx, req.(*SetWeightRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Maglev_WatchEvents_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(WatchRequest) if err := stream.RecvMsg(m); err != nil { return err } - return srv.(MaglevServer).WatchBackendEvents(m, &grpc.GenericServerStream[WatchRequest, 
BackendEvent]{ServerStream: stream}) + return srv.(MaglevServer).WatchEvents(m, &grpc.GenericServerStream[WatchRequest, Event]{ServerStream: stream}) } // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. -type Maglev_WatchBackendEventsServer = grpc.ServerStreamingServer[BackendEvent] +type Maglev_WatchEventsServer = grpc.ServerStreamingServer[Event] + +func _Maglev_CheckConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CheckConfigRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MaglevServer).CheckConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Maglev_CheckConfig_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MaglevServer).CheckConfig(ctx, req.(*CheckConfigRequest)) + } + return interceptor(ctx, in, info, handler) +} // Maglev_ServiceDesc is the grpc.ServiceDesc for Maglev service. 
// It's only intended for direct use with grpc.RegisterService, @@ -413,6 +549,14 @@ var Maglev_ServiceDesc = grpc.ServiceDesc{ MethodName: "ResumeBackend", Handler: _Maglev_ResumeBackend_Handler, }, + { + MethodName: "EnableBackend", + Handler: _Maglev_EnableBackend_Handler, + }, + { + MethodName: "DisableBackend", + Handler: _Maglev_DisableBackend_Handler, + }, { MethodName: "ListHealthChecks", Handler: _Maglev_ListHealthChecks_Handler, @@ -421,11 +565,19 @@ var Maglev_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetHealthCheck", Handler: _Maglev_GetHealthCheck_Handler, }, + { + MethodName: "SetFrontendPoolBackendWeight", + Handler: _Maglev_SetFrontendPoolBackendWeight_Handler, + }, + { + MethodName: "CheckConfig", + Handler: _Maglev_CheckConfig_Handler, + }, }, Streams: []grpc.StreamDesc{ { - StreamName: "WatchBackendEvents", - Handler: _Maglev_WatchBackendEvents_Handler, + StreamName: "WatchEvents", + Handler: _Maglev_WatchEvents_Handler, ServerStreams: true, }, }, diff --git a/internal/grpcapi/server.go b/internal/grpcapi/server.go index 3036012..7fa20fc 100644 --- a/internal/grpcapi/server.go +++ b/internal/grpcapi/server.go @@ -4,6 +4,7 @@ package grpcapi import ( "context" + "log/slog" "net" "google.golang.org/grpc/codes" @@ -17,15 +18,20 @@ import ( // Server implements the MaglevServer gRPC interface. type Server struct { UnimplementedMaglevServer - ctx context.Context - checker *checker.Checker + ctx context.Context + checker *checker.Checker + logs *LogBroadcaster + configPath string } -// NewServer creates a Server backed by the given Checker. The provided context -// controls the lifetime of streaming RPCs: cancelling it closes all active -// WatchBackendEvents streams so that grpc.Server.GracefulStop can complete. -func NewServer(ctx context.Context, c *checker.Checker) *Server { - return &Server{ctx: ctx, checker: c} +// NewServer creates a Server backed by the given Checker. 
logs may be nil, in +// which case log events are never sent to WatchEvents streams. configPath is +// used by CheckConfig to reload and validate the configuration file on demand. +// The provided context controls the lifetime of streaming RPCs: cancelling it +// closes all active WatchEvents streams so that grpc.Server.GracefulStop can +// complete. +func NewServer(ctx context.Context, c *checker.Checker, logs *LogBroadcaster, configPath string) *Server { + return &Server{ctx: ctx, checker: c, logs: logs, configPath: configPath} } // ListFrontends returns the names of all configured frontends. @@ -57,7 +63,7 @@ func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*Backend } // PauseBackend pauses health checking for a backend by name. -func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { +func (s *Server) PauseBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) { b, ok := s.checker.PauseBackend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) @@ -66,7 +72,7 @@ func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*Back } // ResumeBackend resumes health checking for a backend by name. -func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { +func (s *Server) ResumeBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) { b, ok := s.checker.ResumeBackend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) @@ -74,6 +80,36 @@ func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*Bac return backendToProto(b), nil } +// EnableBackend re-enables a previously disabled backend. 
+func (s *Server) EnableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) { + b, ok := s.checker.EnableBackend(req.Name) + if !ok { + return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) + } + return backendToProto(b), nil +} + +// DisableBackend disables a backend, stopping its probe goroutine. +func (s *Server) DisableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) { + b, ok := s.checker.DisableBackend(req.Name) + if !ok { + return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) + } + return backendToProto(b), nil +} + +// SetFrontendPoolBackendWeight updates the weight of a backend in a pool. +func (s *Server) SetFrontendPoolBackendWeight(_ context.Context, req *SetWeightRequest) (*FrontendInfo, error) { + if req.Weight < 0 || req.Weight > 100 { + return nil, status.Errorf(codes.InvalidArgument, "weight %d out of range [0, 100]", req.Weight) + } + fe, err := s.checker.SetFrontendPoolBackendWeight(req.Frontend, req.Pool, req.Backend, int(req.Weight)) + if err != nil { + return nil, status.Errorf(codes.NotFound, "%v", err) + } + return frontendToProto(req.Frontend, fe), nil +} + // ListHealthChecks returns the names of all configured health checks. func (s *Server) ListHealthChecks(_ context.Context, _ *ListHealthChecksRequest) (*ListHealthChecksResponse, error) { return &ListHealthChecksResponse{Names: s.checker.ListHealthChecks()}, nil @@ -88,30 +124,55 @@ func (s *Server) GetHealthCheck(_ context.Context, req *GetHealthCheckRequest) ( return healthCheckToProto(req.Name, hc), nil } -// WatchBackendEvents streams the current state of all backends on connect, then -// streams live state transitions until the client disconnects. -func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendEventsServer) error { - // Send current state of all backends as synthetic events. 
- for _, name := range s.checker.ListBackends() { - snap, ok := s.checker.GetBackend(name) - if !ok { - continue - } - ev := &BackendEvent{ - BackendName: name, - Transition: &TransitionRecord{ - From: snap.Health.State.String(), - To: snap.Health.State.String(), - AtUnixNs: 0, - }, - } - if err := stream.Send(ev); err != nil { - return err +// WatchEvents streams events to the client. On connect, the current state of +// all backends is sent as synthetic BackendEvents. Afterwards, live events are +// forwarded based on the filter flags in req. An unset (nil) flag defaults to +// true (subscribe). An empty log_level defaults to "info". +func (s *Server) WatchEvents(req *WatchRequest, stream Maglev_WatchEventsServer) error { + wantLog := req.Log == nil || *req.Log + wantBackend := req.Backend == nil || *req.Backend + wantFrontend := req.Frontend == nil || *req.Frontend + _ = wantFrontend // no frontend events emitted yet + + logLevel := slog.LevelInfo + if req.LogLevel != "" { + if err := logLevel.UnmarshalText([]byte(req.LogLevel)); err != nil { + return status.Errorf(codes.InvalidArgument, "invalid log_level %q: must be debug, info, warn, or error", req.LogLevel) } } - ch, unsub := s.checker.Subscribe() - defer unsub() + // Subscribe to log events (nil channel blocks forever when not wanted). + var logCh <-chan *LogEvent + if wantLog && s.logs != nil { + var unsub func() + logCh, unsub = s.logs.Subscribe(logLevel) + defer unsub() + } + + // Subscribe to backend events; send initial state snapshot first. 
+ var backendCh <-chan checker.Event + if wantBackend { + for _, name := range s.checker.ListBackends() { + snap, ok := s.checker.GetBackend(name) + if !ok { + continue + } + ev := &Event{Event: &Event_Backend{Backend: &BackendEvent{ + BackendName: name, + Transition: &TransitionRecord{ + From: snap.Health.State.String(), + To: snap.Health.State.String(), + AtUnixNs: 0, + }, + }}} + if err := stream.Send(ev); err != nil { + return err + } + } + var unsub func() + backendCh, unsub = s.checker.Subscribe() + defer unsub() + } for { select { @@ -119,21 +180,38 @@ func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendE return status.Error(codes.Unavailable, "server shutting down") case <-stream.Context().Done(): return nil - case e, ok := <-ch: + case le, ok := <-logCh: if !ok { return nil } - ev := &BackendEvent{ + if err := stream.Send(&Event{Event: &Event_Log{Log: le}}); err != nil { + return err + } + case e, ok := <-backendCh: + if !ok { + return nil + } + if err := stream.Send(&Event{Event: &Event_Backend{Backend: &BackendEvent{ BackendName: e.BackendName, Transition: transitionToProto(e.Transition), - } - if err := stream.Send(ev); err != nil { + }}}); err != nil { return err } } } } +// CheckConfig reads and validates the configuration file, returning a +// structured result that distinguishes YAML parse errors from semantic errors. 
+func (s *Server) CheckConfig(_ context.Context, _ *CheckConfigRequest) (*CheckConfigResponse, error) { + _, result := config.Check(s.configPath) + return &CheckConfigResponse{ + Ok: result.OK(), + ParseError: result.ParseError, + SemanticError: result.SemanticError, + }, nil +} + // ---- conversion helpers ---------------------------------------------------- func frontendToProto(name string, fe config.Frontend) *FrontendInfo { diff --git a/internal/grpcapi/server_test.go b/internal/grpcapi/server_test.go index 51375f0..4ca0007 100644 --- a/internal/grpcapi/server_test.go +++ b/internal/grpcapi/server_test.go @@ -62,7 +62,7 @@ func startTestServer(t *testing.T, ctx context.Context, c *checker.Checker) (Mag t.Fatalf("listen: %v", err) } srv := grpc.NewServer() - RegisterMaglevServer(srv, NewServer(ctx, c)) + RegisterMaglevServer(srv, NewServer(ctx, c, nil, "")) go srv.Serve(lis) //nolint:errcheck conn, err := grpc.NewClient(lis.Addr().String(), @@ -198,7 +198,7 @@ func TestPauseResumeBackend(t *testing.T) { client, cleanup := startTestServer(t, ctx, c) defer cleanup() - info, err := client.PauseBackend(ctx, &PauseResumeRequest{Name: "be0"}) + info, err := client.PauseBackend(ctx, &BackendRequest{Name: "be0"}) if err != nil { t.Fatalf("PauseBackend: %v", err) } @@ -206,7 +206,7 @@ func TestPauseResumeBackend(t *testing.T) { t.Errorf("after pause: got %q, want paused", info.State) } - info, err = client.ResumeBackend(ctx, &PauseResumeRequest{Name: "be0"}) + info, err = client.ResumeBackend(ctx, &BackendRequest{Name: "be0"}) if err != nil { t.Fatalf("ResumeBackend: %v", err) } @@ -215,6 +215,78 @@ func TestPauseResumeBackend(t *testing.T) { } } +func TestSetFrontendPoolBackendWeight(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, ctx, c) + defer cleanup() + + info, err := client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{ + Frontend: "web", + Pool: 
"primary", + Backend: "be0", + Weight: 42, + }) + if err != nil { + t.Fatalf("SetFrontendPoolBackendWeight: %v", err) + } + if len(info.Pools) == 0 || len(info.Pools[0].Backends) == 0 { + t.Fatal("response missing pools/backends") + } + if info.Pools[0].Backends[0].Weight != 42 { + t.Errorf("weight: got %d, want 42", info.Pools[0].Backends[0].Weight) + } + + // Invalid weight. + _, err = client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{ + Frontend: "web", Pool: "primary", Backend: "be0", Weight: 101, + }) + if err == nil { + t.Error("expected error for weight 101") + } + + // Unknown frontend. + _, err = client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{ + Frontend: "nope", Pool: "primary", Backend: "be0", Weight: 50, + }) + if err == nil { + t.Error("expected error for unknown frontend") + } +} + +func TestEnableDisableBackend(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, ctx, c) + defer cleanup() + + info, err := client.DisableBackend(ctx, &BackendRequest{Name: "be0"}) + if err != nil { + t.Fatalf("DisableBackend: %v", err) + } + if info.State != "removed" { + t.Errorf("after disable: got %q, want removed", info.State) + } + if info.Enabled { + t.Error("after disable: Enabled should be false") + } + + info, err = client.EnableBackend(ctx, &BackendRequest{Name: "be0"}) + if err != nil { + t.Fatalf("EnableBackend: %v", err) + } + if info.State != "unknown" { + t.Errorf("after enable: got %q, want unknown", info.State) + } + if !info.Enabled { + t.Error("after enable: Enabled should be true") + } +} + func TestListHealthChecks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -266,7 +338,7 @@ func TestGetHealthCheckNotFound(t *testing.T) { } } -func TestWatchBackendEventsServerShutdown(t *testing.T) { +func TestWatchEventsServerShutdown(t *testing.T) { ctx, cancel := 
context.WithCancel(context.Background()) defer cancel() @@ -277,11 +349,11 @@ func TestWatchBackendEventsServerShutdown(t *testing.T) { client, cleanup := startTestServer(t, srvCtx, c) defer cleanup() - stream, err := client.WatchBackendEvents(ctx, &WatchRequest{}) + stream, err := client.WatchEvents(ctx, &WatchRequest{}) if err != nil { - t.Fatalf("WatchBackendEvents: %v", err) + t.Fatalf("WatchEvents: %v", err) } - // Drain the initial synthetic event. + // Drain the initial synthetic backend event. if _, err := stream.Recv(); err != nil { t.Fatalf("initial Recv: %v", err) } @@ -294,7 +366,7 @@ func TestWatchBackendEventsServerShutdown(t *testing.T) { } } -func TestWatchBackendEvents(t *testing.T) { +func TestWatchEventsBackend(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -302,17 +374,72 @@ func TestWatchBackendEvents(t *testing.T) { client, cleanup := startTestServer(t, ctx, c) defer cleanup() - stream, err := client.WatchBackendEvents(ctx, &WatchRequest{}) + stream, err := client.WatchEvents(ctx, &WatchRequest{}) if err != nil { - t.Fatalf("WatchBackendEvents: %v", err) + t.Fatalf("WatchEvents: %v", err) } - // Should receive the current state for be0 immediately. + // Should receive the current state for be0 immediately as a BackendEvent. 
ev, err := stream.Recv() if err != nil { t.Fatalf("Recv: %v", err) } - if ev.BackendName != "be0" { - t.Errorf("initial event: backend=%q, want be0", ev.BackendName) + be, ok := ev.Event.(*Event_Backend) + if !ok { + t.Fatalf("expected BackendEvent, got %T", ev.Event) + } + if be.Backend.BackendName != "be0" { + t.Errorf("initial event: backend=%q, want be0", be.Backend.BackendName) + } +} + +func TestWatchEventsLogOnly(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, ctx, c) + defer cleanup() + + f := false + stream, err := client.WatchEvents(ctx, &WatchRequest{Backend: &f, Frontend: &f}) + if err != nil { + t.Fatalf("WatchEvents: %v", err) + } + + // No initial snapshot should arrive (backend disabled). Verify by checking + // that the stream has no immediately-readable event. + recvCh := make(chan *Event, 1) + go func() { + ev, _ := stream.Recv() + recvCh <- ev + }() + select { + case ev := <-recvCh: + if _, isLog := ev.Event.(*Event_Log); !isLog { + t.Errorf("expected only LogEvents, got %T", ev.Event) + } + case <-time.After(50 * time.Millisecond): + // expected: no backend snapshot arrived + } +} + +func TestWatchEventsInvalidLogLevel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, ctx, c) + defer cleanup() + + // For streaming RPCs the server error arrives on the first Recv, not on the + // initial call. 
+ stream, err := client.WatchEvents(ctx, &WatchRequest{LogLevel: "verbose"}) + if err != nil { + t.Fatalf("WatchEvents: %v", err) + } + _, err = stream.Recv() + if err == nil { + t.Fatal("expected error for invalid log_level") } } diff --git a/proto/maglev.proto b/proto/maglev.proto index 3ae43a5..26ff378 100644 --- a/proto/maglev.proto +++ b/proto/maglev.proto @@ -10,11 +10,15 @@ service Maglev { rpc GetFrontend(GetFrontendRequest) returns (FrontendInfo); rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse); rpc GetBackend(GetBackendRequest) returns (BackendInfo); - rpc PauseBackend(PauseResumeRequest) returns (BackendInfo); - rpc ResumeBackend(PauseResumeRequest) returns (BackendInfo); + rpc PauseBackend(BackendRequest) returns (BackendInfo); + rpc ResumeBackend(BackendRequest) returns (BackendInfo); + rpc EnableBackend(BackendRequest) returns (BackendInfo); + rpc DisableBackend(BackendRequest) returns (BackendInfo); rpc ListHealthChecks(ListHealthChecksRequest) returns (ListHealthChecksResponse); rpc GetHealthCheck(GetHealthCheckRequest) returns (HealthCheckInfo); - rpc WatchBackendEvents(WatchRequest) returns (stream BackendEvent); + rpc SetFrontendPoolBackendWeight(SetWeightRequest) returns (FrontendInfo); + rpc WatchEvents(WatchRequest) returns (stream Event); + rpc CheckConfig(CheckConfigRequest) returns (CheckConfigResponse); } // ---- requests --------------------------------------------------------------- @@ -31,7 +35,7 @@ message GetBackendRequest { string name = 1; } -message PauseResumeRequest { +message BackendRequest { string name = 1; } @@ -41,7 +45,29 @@ message GetHealthCheckRequest { string name = 1; } -message WatchRequest {} +message CheckConfigRequest {} + +message CheckConfigResponse { + bool ok = 1; + string parse_error = 2; // set when YAML cannot be read or parsed + string semantic_error = 3; // set when YAML is valid but semantically incorrect +} + +message SetWeightRequest { + string frontend = 1; + string pool = 2; + 
string backend = 3; + int32 weight = 4; // 0-100 +} + +// WatchRequest controls which event types are streamed. All fields default to +// true (i.e. an empty request subscribes to everything at info level). +message WatchRequest { + optional bool log = 1; // include log events (default: true) + string log_level = 2; // minimum log level: debug|info|warn|error (default: info) + optional bool backend = 3; // include backend transition events (default: true) + optional bool frontend = 4; // include frontend events (default: true) +} // ---- responses -------------------------------------------------------------- @@ -123,7 +149,36 @@ message TransitionRecord { int64 at_unix_ns = 3; } +// ---- event stream ----------------------------------------------------------- + +// LogAttr is a single key/value attribute from a structured log record. +message LogAttr { + string key = 1; + string value = 2; +} + +// LogEvent carries a single structured log record. +message LogEvent { + int64 at_unix_ns = 1; + string level = 2; + string msg = 3; + repeated LogAttr attrs = 4; +} + +// BackendEvent is emitted on every backend state transition. message BackendEvent { string backend_name = 1; TransitionRecord transition = 2; } + +// FrontendEvent is reserved for future frontend-level events. +message FrontendEvent {} + +// Event is the envelope returned by WatchEvents. +message Event { + oneof event { + LogEvent log = 1; + BackendEvent backend = 2; + FrontendEvent frontend = 3; + } +}