Add WatchEvents, enable/disable/weight RPCs, and config check

gRPC / proto
- Rename WatchBackendEvents → WatchEvents; return a stream of Event
  oneof (LogEvent, BackendEvent, FrontendEvent) with optional filter
  flags (log, log_level, backend, frontend)
- Add EnableBackend, DisableBackend, SetFrontendPoolBackendWeight RPCs
- Rename PauseResumeRequest → BackendRequest
- Add CheckConfig RPC returning ok/parse_error/semantic_error
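
A sketch of the resulting service surface (message and field names are inferred from this change description and may not match proto/maglev.proto exactly):

```protobuf
service Maglev {
  rpc WatchEvents(WatchRequest) returns (stream Event);
  rpc EnableBackend(BackendRequest) returns (BackendInfo);
  rpc DisableBackend(BackendRequest) returns (BackendInfo);
  rpc SetFrontendPoolBackendWeight(SetWeightRequest) returns (FrontendInfo);
  rpc CheckConfig(CheckConfigRequest) returns (CheckConfigResponse);
}

message WatchRequest {
  optional bool log = 1;       // include log events
  string log_level = 2;        // minimum level for log events
  optional bool backend = 3;   // include backend transition events
  optional bool frontend = 4;  // include frontend events
}

message Event {
  oneof event {
    LogEvent log = 1;
    BackendEvent backend = 2;
    FrontendEvent frontend = 3;
  }
}
```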

maglevd
- Route slog through a LogBroadcaster (slog.Handler) so WatchEvents
  subscribers can receive structured log records independently of the
  daemon's own --log-level
- Add --reflection flag (default true) to toggle gRPC server reflection
- Add --check flag: validates config file and exits 0/1/2
- SIGHUP: use config.Check before applying reload; log parse vs semantic
  error separately; refuse reload on any error
- Rename default config path /etc/maglev → /etc/vpp-maglev

maglevc
- Add 'watch events [num <n>] [log [level <level>]] [backend] [frontend]'
  command; prints compact protojson, stops on any keypress or Ctrl-C;
  uses cbreak mode (not raw) so output post-processing is preserved
- Add 'set backend <name> enable|disable'
- Add 'set frontend <name> pool <pool> backend <name> weight <0-100>'
- Add 'config check' command

Debian packaging
- Rename service unit to vpp-maglevd.service
- Rename conffiles to /etc/default/vpp-maglev and /etc/vpp-maglev/
- Create maglevd system user/group in postinst; add to vpp group if present
- Add postrm; add adduser to Depends
2026-04-11 16:42:11 +02:00
parent d612086a5f
commit 58391f5463
26 changed files with 1969 additions and 400 deletions


@@ -2,55 +2,38 @@
Health checker and gRPC control plane for VPP Maglev load balancing.
-## Build
+## Build and Install
```sh
make # builds build/<arch>/maglevd and build/<arch>/maglevc
make test # runs all tests
make proto # regenerates gRPC stubs from proto/maglev.proto
make lint # runs golangci-lint
make pkg-deb # builds Debian packages for arm64 and amd64
```
Requires Go 1.25+ and (for `make proto`) `protoc` with `protoc-gen-go` and
`protoc-gen-go-grpc`.
## Debian package
```sh
make pkg-deb
```
Produces `vpp-maglev_<version>_amd64.deb` and `vpp-maglev_<version>_arm64.deb`
-in the project root by cross-compiling with `GOOS=linux GOARCH=<arch>`.
+in the `build/` directory by cross-compiling with `GOOS=linux GOARCH=<arch>`.
Requires `dpkg-deb` (available on any Debian/Ubuntu host).
The package installs:
| Path | Content |
|---|---|
| `/usr/sbin/maglevd` | Health-checker daemon |
| `/usr/bin/maglevc` | Interactive CLI client |
| `/lib/systemd/system/maglevd.service` | systemd unit |
| `/etc/default/maglev` | Environment file for the unit (conffile) |
| `/etc/maglev/maglev.yaml` | Example configuration file (conffile) |
| `/usr/share/man/man8/maglevd.8.gz` | Man page |
| `/usr/share/man/man1/maglevc.1.gz` | Man page |
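
To verify the staged paths in a built package (assuming `make pkg-deb` has produced the .deb in `build/`):

```sh
dpkg-deb -c build/vpp-maglev_*_amd64.deb   # list installed paths
dpkg-deb -I build/vpp-maglev_*_amd64.deb   # show control file and dependencies
```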
## Running
After installing, the unit is enabled but not started automatically:
```sh
-# edit /etc/maglev/maglev.yaml, then:
-systemctl start maglevd
+# edit /etc/vpp-maglev/maglev.yaml, then:
+systemctl enable --now vpp-maglevd
```
-## Run
+Or run the server and client by hand:
```sh
-maglevd --config /etc/maglev/maglev.yaml --grpc-addr :9090
+maglevd --config /etc/vpp-maglev/maglev.yaml --grpc-addr :9090
maglevd --version # print version and exit
maglevc --server localhost:9090 # interactive shell
maglevc show backend nginx0-ams # one-shot
maglevc show frontends # one-shot
maglevc -color=false show backends # one-shot, no ANSI color
maglevc set backend nginx0-ams pause
```
@@ -58,35 +41,7 @@ maglevc set backend nginx0-ams pause
Send `SIGHUP` to `maglevd` to reload config without restarting.
`maglevd` requires `CAP_NET_RAW` for ICMP health checks.
## Minimal config
```yaml
maglev:
  healthchecks:
    http:
      type: http
      port: 80
      params:
        path: /healthz
      interval: 2s
      timeout: 3s
  backends:
    web0: {address: 192.0.2.10, healthcheck: http}
    web1: {address: 192.0.2.11, healthcheck: http}
  frontends:
    web:
      address: 192.0.2.1
      protocol: tcp
      port: 80
      pools:
        - name: primary
          backends:
            web0: {}
            web1: {}
```
Check out the minimal configuration file in [debian/maglev.yaml](debian/maglev.yaml).
See [docs/user-guide.md](docs/user-guide.md) for flags, signals, and `maglevc` usage.
See [docs/config-guide.md](docs/config-guide.md) for the full configuration reference.
See [docs/healthchecks.md](docs/healthchecks.md) for health state machine details.
@@ -95,5 +50,5 @@ See [docs/healthchecks.md](docs/healthchecks.md) for health state machine detail
```sh
docker build -t maglevd .
-docker run --cap-add NET_RAW -v /etc/maglev:/etc/maglev maglevd
+docker run --cap-add NET_RAW -v /etc/vpp-maglev:/etc/vpp-maglev maglevd
```


@@ -6,6 +6,7 @@ import (
"context"
"fmt"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
@@ -83,8 +84,8 @@ func buildTree() *Node {
// set backend <name> pause|resume|disable|enable
setPause := &Node{Word: "pause", Help: "pause health checking", Run: runPauseBackend}
setResume := &Node{Word: "resume", Help: "resume health checking", Run: runResumeBackend}
-setDisabled := &Node{Word: "disabled", Help: "disable backend (not implemented)", Run: runNotImplemented}
-setEnabled := &Node{Word: "enabled", Help: "enable backend (not implemented)", Run: runNotImplemented}
+setDisabled := &Node{Word: "disable", Help: "disable backend (stop probing, remove from rotation)", Run: runDisableBackend}
+setEnabled := &Node{Word: "enable", Help: "enable backend (resume probing)", Run: runEnableBackend}
setBackendName := &Node{
Word: "<name>",
Help: "backend name",
@@ -96,9 +97,75 @@ func buildTree() *Node {
Help: "modify a backend",
Children: []*Node{setBackendName},
}
-set.Children = []*Node{setBackend}
// set frontend <name> pool <pool> backend <name> weight <0-100>
setWeightValue := &Node{
Word: "<weight>",
Help: "weight 0-100",
Run: runSetFrontendPoolBackendWeight,
}
setFrontendPoolBackendWeight := &Node{Word: "weight", Help: "set backend weight in pool", Children: []*Node{setWeightValue}}
setFrontendPoolBackendName := &Node{
Word: "<backend>",
Help: "backend name",
Dynamic: dynBackends,
Children: []*Node{setFrontendPoolBackendWeight},
}
setFrontendPoolBackend := &Node{Word: "backend", Help: "select a backend", Children: []*Node{setFrontendPoolBackendName}}
setFrontendPoolName := &Node{
Word: "<pool>",
Help: "pool name",
Children: []*Node{setFrontendPoolBackend},
}
setFrontendPool := &Node{Word: "pool", Help: "select a pool", Children: []*Node{setFrontendPoolName}}
setFrontendName := &Node{
Word: "<name>",
Help: "frontend name",
Dynamic: dynFrontends,
Children: []*Node{setFrontendPool},
}
setFrontend := &Node{
Word: "frontend",
Help: "modify a frontend",
Children: []*Node{setFrontendName},
}
-root.Children = []*Node{show, set, quit, exit}
+set.Children = []*Node{setBackend, setFrontend}
// watch events [num <n>] [log [level <level>]] [backend] [frontend]
//
// All tokens after 'events' are captured as args via a self-referencing slot
// node. This lets runWatchEvents parse the optional flags manually while still
// providing tab-completion through the dynamic enumerator.
var watchEventsOptSlot *Node
watchEventsOptSlot = &Node{
Word: "<opt>",
Help: "watch option",
Dynamic: dynWatchEventOpts,
Run: runWatchEvents,
}
watchEventsOptSlot.Children = []*Node{watchEventsOptSlot}
watchEvents := &Node{
Word: "events",
Help: "stream events (press any key or Ctrl-C to stop)",
Run: runWatchEvents,
Children: []*Node{watchEventsOptSlot},
}
watch := &Node{
Word: "watch",
Help: "watch live event streams",
Children: []*Node{watchEvents},
}
// config check
configCheck := &Node{Word: "check", Help: "check configuration file", Run: runConfigCheck}
configNode := &Node{
Word: "config",
Help: "configuration commands",
Children: []*Node{configCheck},
}
+root.Children = []*Node{show, set, watch, configNode, quit, exit}
return root
}
@@ -332,7 +399,7 @@ func runPauseBackend(ctx context.Context, client grpcapi.MaglevClient, args []st
}
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
-info, err := client.PauseBackend(ctx, &grpcapi.PauseResumeRequest{Name: args[0]})
+info, err := client.PauseBackend(ctx, &grpcapi.BackendRequest{Name: args[0]})
if err != nil {
return err
}
@@ -346,7 +413,7 @@ func runResumeBackend(ctx context.Context, client grpcapi.MaglevClient, args []s
}
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
-info, err := client.ResumeBackend(ctx, &grpcapi.PauseResumeRequest{Name: args[0]})
+info, err := client.ResumeBackend(ctx, &grpcapi.BackendRequest{Name: args[0]})
if err != nil {
return err
}
@@ -354,9 +421,84 @@ func runResumeBackend(ctx context.Context, client grpcapi.MaglevClient, args []s
return nil
}
-func runNotImplemented(_ context.Context, _ grpcapi.MaglevClient, _ []string) error {
-fmt.Println("not implemented yet")
func runSetFrontendPoolBackendWeight(ctx context.Context, client grpcapi.MaglevClient, args []string) error {
if len(args) != 4 {
return fmt.Errorf("usage: set frontend <name> pool <pool> backend <name> weight <0-100>")
}
frontendName, poolName, backendName, weightStr := args[0], args[1], args[2], args[3]
weight, err := strconv.Atoi(weightStr)
if err != nil || weight < 0 || weight > 100 {
return fmt.Errorf("weight: expected integer 0-100, got %q", weightStr)
}
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
info, err := client.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{
Frontend: frontendName,
Pool: poolName,
Backend: backendName,
Weight: int32(weight),
})
if err != nil {
return err
}
// Print the updated pool so the user can confirm the new weight.
for _, pool := range info.Pools {
if pool.Name != poolName {
continue
}
for _, pb := range pool.Backends {
if pb.Name == backendName {
fmt.Printf("%s pool %s backend %s: weight set to %d\n", info.Name, pool.Name, pb.Name, pb.Weight)
return nil
}
}
}
return nil
}
func runEnableBackend(ctx context.Context, client grpcapi.MaglevClient, args []string) error {
if len(args) == 0 {
return fmt.Errorf("usage: set backend <name> enable")
}
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
info, err := client.EnableBackend(ctx, &grpcapi.BackendRequest{Name: args[0]})
if err != nil {
return err
}
fmt.Printf("%s: enabled, state is '%s'\n", info.Name, info.State)
return nil
}
func runDisableBackend(ctx context.Context, client grpcapi.MaglevClient, args []string) error {
if len(args) == 0 {
return fmt.Errorf("usage: set backend <name> disable")
}
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
info, err := client.DisableBackend(ctx, &grpcapi.BackendRequest{Name: args[0]})
if err != nil {
return err
}
fmt.Printf("%s: disabled, state is '%s'\n", info.Name, info.State)
return nil
}
func runConfigCheck(ctx context.Context, client grpcapi.MaglevClient, _ []string) error {
ctx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()
resp, err := client.CheckConfig(ctx, &grpcapi.CheckConfigRequest{})
if err != nil {
return err
}
if resp.Ok {
fmt.Println("config ok")
return nil
}
if resp.ParseError != "" {
return fmt.Errorf("parse error: %s", resp.ParseError)
}
return fmt.Errorf("semantic error: %s", resp.SemanticError)
}
// formatDuration formats a duration as Xd Xh Xm Xs without milliseconds.

cmd/maglevc/watch.go (new file)

@@ -0,0 +1,174 @@
// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"
)
func dynWatchEventOpts(_ context.Context, _ grpcapi.MaglevClient) []string {
return []string{"num", "log", "backend", "frontend"}
}
// runWatchEvents implements 'watch events [num <n>] [log [level <level>]] [backend] [frontend]'.
// All tokens after 'events' are captured as args by the circular slot node in the tree.
// If none of log/backend/frontend are mentioned, all three default to true.
func runWatchEvents(ctx context.Context, client grpcapi.MaglevClient, args []string) error {
var maxEvents int // 0 = unlimited
var wantLog, wantBackend, wantFrontend bool
logLevel := ""
anyExplicit := false
for i := 0; i < len(args); {
switch args[i] {
case "num":
if i+1 >= len(args) {
return fmt.Errorf("num requires a count argument")
}
n, err := strconv.Atoi(args[i+1])
if err != nil || n < 1 {
return fmt.Errorf("num: invalid count %q", args[i+1])
}
maxEvents = n
i += 2
case "log":
wantLog = true
anyExplicit = true
if i+1 < len(args) && args[i+1] == "level" {
if i+2 >= len(args) {
return fmt.Errorf("log level requires a level argument")
}
logLevel = args[i+2]
i += 3
} else {
i++
}
case "backend":
wantBackend = true
anyExplicit = true
i++
case "frontend":
wantFrontend = true
anyExplicit = true
i++
default:
return fmt.Errorf("unknown watch option %q; expected: num, log, backend, frontend", args[i])
}
}
if !anyExplicit {
wantLog, wantBackend, wantFrontend = true, true, true
}
boolp := func(b bool) *bool { v := b; return &v }
req := &grpcapi.WatchRequest{
Log: boolp(wantLog),
LogLevel: logLevel,
Backend: boolp(wantBackend),
Frontend: boolp(wantFrontend),
}
// Cancel the stream on keypress or signal.
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
go watchStopOnKeypress(watchCtx, cancel)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigCh)
go func() {
select {
case <-sigCh:
cancel()
case <-watchCtx.Done():
}
}()
stream, err := client.WatchEvents(watchCtx, req)
if err != nil {
return err
}
marshaler := protojson.MarshalOptions{}
count := 0
for {
ev, err := stream.Recv()
if err != nil {
if watchCtx.Err() != nil {
return nil // stopped by keypress or signal
}
if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
return nil
}
return err
}
data, err := marshaler.Marshal(ev)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
fmt.Printf("%s\n", data)
count++
if maxEvents > 0 && count >= maxEvents {
return nil
}
}
}
// watchStopOnKeypress puts stdin into cbreak mode (when it is a terminal) and
// calls cancel when any byte arrives. Cbreak mode disables canonical (line)
// input so a single keypress is sufficient, while preserving output
// post-processing (OPOST/ONLCR) so that fmt.Printf("\n") still produces the
// expected carriage-return+newline on screen. Falls back gracefully when stdin
// is not a tty. The goroutine exits when ctx is cancelled.
func watchStopOnKeypress(ctx context.Context, cancel context.CancelFunc) {
fd := int(os.Stdin.Fd())
if old, err := stdinCbreak(fd); err == nil {
defer unix.IoctlSetTermios(fd, unix.TCSETSF, old) //nolint:errcheck
}
readDone := make(chan struct{})
go func() {
defer close(readDone)
buf := make([]byte, 1)
os.Stdin.Read(buf) //nolint:errcheck
}()
select {
case <-readDone:
cancel()
case <-ctx.Done():
}
}
// stdinCbreak sets the terminal referred to by fd into cbreak mode: canonical
// input and echo are disabled (so single keystrokes are immediately available)
// but output post-processing is left untouched (so \n still maps to \r\n).
// Returns the previous termios so the caller can restore it, or an error if fd
// is not a terminal.
func stdinCbreak(fd int) (*unix.Termios, error) {
old, err := unix.IoctlGetTermios(fd, unix.TCGETS)
if err != nil {
return nil, err // not a terminal
}
t := *old
t.Lflag &^= unix.ICANON | unix.ECHO | unix.ECHOE | unix.ECHOK | unix.ECHONL
t.Cc[unix.VMIN] = 1
t.Cc[unix.VTIME] = 0
if err := unix.IoctlSetTermios(fd, unix.TCSETS, &t); err != nil {
return nil, err
}
return old, nil
}


@@ -13,6 +13,7 @@ import (
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
buildinfo "git.ipng.ch/ipng/vpp-maglev/cmd"
"git.ipng.ch/ipng/vpp-maglev/internal/checker"
@@ -30,7 +31,9 @@ func main() {
func run() error {
// ---- flags / env --------------------------------------------------------
printVersion := flag.Bool("version", false, "print version and exit")
-configPath := stringFlag("config", "/etc/maglev/frontend.yaml", "MAGLEV_CONFIG", "path to frontend.yaml")
+checkOnly := flag.Bool("check", false, "check config file and exit (0=ok, 1=parse error, 2=semantic error)")
+enableReflection := flag.Bool("reflection", true, "enable gRPC server reflection (for grpcurl)")
+configPath := stringFlag("config", "/etc/vpp-maglev/maglev.yaml", "MAGLEV_CONFIG", "path to maglev.yaml")
grpcAddr := stringFlag("grpc-addr", ":9090", "MAGLEV_GRPC_ADDR", "gRPC listen address")
logLevel := stringFlag("log-level", "info", "MAGLEV_LOG_LEVEL", "log level (debug|info|warn|error)")
flag.Parse()
@@ -41,12 +44,28 @@ func run() error {
return nil
}
if *checkOnly {
_, result := config.Check(*configPath)
if result.OK() {
fmt.Printf("config ok: %s\n", *configPath)
return nil
}
if result.ParseError != "" {
fmt.Fprintf(os.Stderr, "parse error: %s\n", result.ParseError)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "semantic error: %s\n", result.SemanticError)
os.Exit(2)
}
// ---- logging ------------------------------------------------------------
var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
return fmt.Errorf("invalid log level %q: %w", *logLevel, err)
}
-slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})))
+jsonHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})
+logBroadcaster := grpcapi.NewLogBroadcaster(jsonHandler)
+slog.SetDefault(slog.New(logBroadcaster))
slog.Info("starting", "version", buildinfo.Version(), "commit", buildinfo.Commit(), "date", buildinfo.Date())
// ---- config -------------------------------------------------------------
@@ -74,8 +93,11 @@ func run() error {
return fmt.Errorf("listen %s: %w", *grpcAddr, err)
}
srv := grpc.NewServer()
-grpcapi.RegisterMaglevServer(srv, grpcapi.NewServer(ctx, chkr))
-slog.Info("grpc-listening", "addr", *grpcAddr)
+grpcapi.RegisterMaglevServer(srv, grpcapi.NewServer(ctx, chkr, logBroadcaster, *configPath))
+if *enableReflection {
+reflection.Register(srv)
+}
+slog.Info("grpc-listening", "addr", *grpcAddr, "reflection", *enableReflection)
go func() {
if err := srv.Serve(lis); err != nil {
@@ -91,9 +113,13 @@ func run() error {
switch sig {
case syscall.SIGHUP:
slog.Info("config-reload-start")
-newCfg, err := config.Load(*configPath)
-if err != nil {
-slog.Error("config-reload-error", "err", err)
+newCfg, result := config.Check(*configPath)
+if !result.OK() {
+if result.ParseError != "" {
+slog.Error("config-check-failed", "type", "parse", "err", result.ParseError)
+} else {
+slog.Error("config-check-failed", "type", "semantic", "err", result.SemanticError)
+}
continue
}
if err := chkr.Reload(ctx, newCfg); err != nil {

debian/build-deb.sh

@@ -22,7 +22,7 @@ install -d "$STAGING/usr/share/man/man1"
install -d "$STAGING/usr/share/man/man8"
install -d "$STAGING/lib/systemd/system"
install -d "$STAGING/etc/default"
-install -d "$STAGING/etc/maglev"
+install -d "$STAGING/etc/vpp-maglev"
install -d "$STAGING/DEBIAN"
# Binaries
@@ -34,22 +34,23 @@ gzip -9 -c "$REPO_ROOT/docs/maglevd.8" > "$STAGING/usr/share/man/man8/maglevd.8.
gzip -9 -c "$REPO_ROOT/docs/maglevc.1" > "$STAGING/usr/share/man/man1/maglevc.1.gz"
# Systemd unit
-install -m 644 "$REPO_ROOT/debian/maglevd.service" "$STAGING/lib/systemd/system/maglevd.service"
+install -m 644 "$REPO_ROOT/debian/vpp-maglevd.service" "$STAGING/lib/systemd/system/vpp-maglevd.service"
-# /etc/default/maglev (conffile — dpkg won't overwrite on upgrade)
-install -m 644 "$REPO_ROOT/debian/default.maglev" "$STAGING/etc/default/maglev"
+# /etc/default/vpp-maglev (conffile — dpkg won't overwrite on upgrade)
+install -m 644 "$REPO_ROOT/debian/default.vpp-maglev" "$STAGING/etc/default/vpp-maglev"
-# /etc/maglev/maglev.yaml (conffile)
-install -m 644 "$REPO_ROOT/debian/maglev.yaml" "$STAGING/etc/maglev/maglev.yaml"
+# /etc/vpp-maglev/maglev.yaml (conffile)
+install -m 644 "$REPO_ROOT/debian/maglev.yaml" "$STAGING/etc/vpp-maglev/maglev.yaml"
# DEBIAN/control (version field uses full_version including commit)
sed "s/@VERSION@/${FULL_VERSION}/;s/@ARCH@/${ARCH}/" \
"$REPO_ROOT/debian/control.in" > "$STAGING/DEBIAN/control"
-# DEBIAN/conffiles, postinst, prerm
+# DEBIAN/conffiles, postinst, prerm, postrm
install -m 644 "$REPO_ROOT/debian/conffiles" "$STAGING/DEBIAN/conffiles"
install -m 755 "$REPO_ROOT/debian/postinst" "$STAGING/DEBIAN/postinst"
install -m 755 "$REPO_ROOT/debian/prerm" "$STAGING/DEBIAN/prerm"
install -m 755 "$REPO_ROOT/debian/postrm" "$STAGING/DEBIAN/postrm"
# Emit package into build/
mkdir -p "$REPO_ROOT/build"

debian/conffiles

@@ -1,2 +1,2 @@
-/etc/default/maglev
-/etc/maglev/maglev.yaml
+/etc/default/vpp-maglev
+/etc/vpp-maglev/maglev.yaml

debian/control.in

@@ -4,7 +4,7 @@ Architecture: @ARCH@
Maintainer: Pim van Pelt <pim@ipng.ch>
Section: net
Priority: optional
-Depends: systemd
+Depends: systemd, adduser
Description: Maglev health-checker daemon and CLI client
maglevd monitors backends (HTTP, TCP, ICMP) with a rise/fall counter
model and exposes their aggregated state over a gRPC API. Configuration


@@ -1,9 +1,9 @@
# Default settings for maglevd.
-# This file is sourced by /lib/systemd/system/maglevd.service.
-# After editing, run: systemctl restart maglevd
+# This file is sourced by /lib/systemd/system/vpp-maglevd.service.
+# After editing, run: systemctl restart vpp-maglevd
# Path to the YAML configuration file.
-MAGLEV_CONFIG=/etc/maglev/maglev.yaml
+MAGLEV_CONFIG=/etc/vpp-maglev/maglev.yaml
# gRPC listen address (default: :9090)
#MAGLEV_GRPC_ADDR=:9090

debian/postinst

@@ -2,7 +2,21 @@
set -e
case "$1" in
configure)
# Create system user and group if they don't exist.
if ! getent group maglevd > /dev/null 2>&1; then
addgroup --system --quiet maglevd
fi
if ! getent passwd maglevd > /dev/null 2>&1; then
adduser --system --no-create-home --shell /usr/sbin/nologin \
--ingroup maglevd --quiet maglevd
fi
# Add maglevd to vpp group if it exists (needed for VPP API socket access).
if getent group vpp > /dev/null 2>&1; then
adduser --quiet maglevd vpp || true
fi
systemctl daemon-reload || true
-systemctl enable maglevd.service || true
+systemctl enable vpp-maglevd.service || true
;;
esac

debian/postrm (new file)

@@ -0,0 +1,12 @@
#!/bin/sh
set -e
case "$1" in
purge)
if getent passwd maglevd > /dev/null 2>&1; then
deluser --quiet --system maglevd || true
fi
if getent group maglevd > /dev/null 2>&1; then
delgroup --quiet --system maglevd || true
fi
;;
esac

debian/prerm

@@ -2,7 +2,7 @@
set -e
case "$1" in
remove|purge)
-systemctl stop maglevd.service || true
-systemctl disable maglevd.service || true
+systemctl stop vpp-maglevd.service || true
+systemctl disable vpp-maglevd.service || true
;;
esac


@@ -5,11 +5,18 @@ After=network-online.target
Wants=network-online.target
[Service]
-EnvironmentFile=/etc/default/maglev
+User=maglevd
+Group=maglevd
+EnvironmentFile=/etc/default/vpp-maglev
ExecStart=/usr/sbin/maglevd --config ${MAGLEV_CONFIG}
Restart=on-failure
RestartSec=5s
Type=simple
# Grant only CAP_NET_RAW (needed for ICMP probes) and drop everything else.
AmbientCapabilities=CAP_NET_RAW
CapabilityBoundingSet=CAP_NET_RAW
NoNewPrivileges=yes
[Install]
WantedBy=multi-user.target
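
Once the unit is running, the restricted capability set can be spot-checked (illustrative commands; `getpcaps` comes from the libcap tools, and the unit name follows the rename above):

```sh
systemctl start vpp-maglevd
getpcaps "$(pidof maglevd)"    # expect only cap_net_raw
```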


@@ -11,7 +11,7 @@ in two stages:
ensuring that every backend referenced by a frontend exists, that address families are
consistent within a frontend, and that IP source addresses are the correct family.
-If you want to get started quickly, take a look at the [example config](#example).
+If you want to get started quickly, take a look at the [example config](../debian/maglev.yaml).
## Basic structure
@@ -277,109 +277,7 @@ frontends:
---
## Example
For a detailed description of the health state machine, probe intervals, and all transition events,
see [healthchecks.md](healthchecks.md). For a user guide on how to use the maglev daemon and client,
see [user-guide.md](user-guide.md).
A complete configuration tying all sections together:
```yaml
maglev:
  healthchecker:
    transition-history: 5
    netns: dataplane
  healthchecks:
    nginx:
      type: http
      port: 80
      params:
        path: /healthz
        host: nginx.example.com
        response-code: "200"
      interval: 2s
      fast-interval: 500ms
      down-interval: 30s
      timeout: 3s
      rise: 2
      fall: 3
    dovecot:
      type: tcp
      port: 993
      params:
        ssl: true
        server-name: imaps.example.com
      interval: 5s
      fast-interval: 1s
      down-interval: 30s
      timeout: 3s
      rise: 2
      fall: 3
    ping6:
      type: icmp
      probe-ipv6-src: 2001:db8:probe::1
      interval: 2s
      timeout: 1s
  backends:
    nginx0-ams:
      address: 198.51.100.10
      healthcheck: nginx
    nginx0-lon:
      address: 198.51.100.11
      healthcheck: nginx
    nginx0-fra:
      address: 198.51.100.12
      healthcheck: nginx
    maildrop0-ams:
      address: 2001:db8:1::10
      healthcheck: dovecot
    maildrop0-lon:
      address: 2001:db8:1::11
      healthcheck: dovecot
  frontends:
    nginx-http:
      description: "HTTP VIP with fallback"
      address: 198.51.100.1
      protocol: tcp
      port: 80
      pools:
        - name: primary
          backends:
            nginx0-ams: { weight: 10 }
            nginx0-lon: {}
        - name: fallback
          backends:
            nginx0-fra: {}
    nginx-https:
      description: "HTTPS VIP — same backends, different port"
      address: 198.51.100.1
      protocol: tcp
      port: 443
      pools:
        - name: primary
          backends:
            nginx0-ams: { weight: 10 }
            nginx0-lon: {}
        - name: fallback
          backends:
            nginx0-fra: {}
    maildrop-imaps:
      description: "IMAPS VIP"
      address: 2001:db8::1
      protocol: tcp
      port: 993
      pools:
        - name: primary
          backends:
            maildrop0-ams: {}
            maildrop0-lon: {}
```
---
For a detailed description of the health state machine, probe intervals, and all
transition events, see [healthchecks.md](healthchecks.md).


@@ -6,6 +6,7 @@ maglevd \- Maglev health\-checker daemon
[\fB\-config\fR \fIfile\fR]
[\fB\-grpc\-addr\fR \fIaddr\fR]
[\fB\-log\-level\fR \fIlevel\fR]
[\fB\-reflection\fR[=\fIbool\fR]]
[\fB\-version\fR]
.SH DESCRIPTION
.B maglevd
@@ -32,7 +33,7 @@ parentheses); the flag takes precedence.
.TP
.BI \-config " file"
Path to the YAML configuration file.
-.RI "(default: " /etc/maglev/maglev.conf "; env: " MAGLEV_CONFIG )
+.RI "(default: " /etc/vpp-maglev/maglev.yaml "; env: " MAGLEV_CONFIG )
.TP
.BI \-grpc\-addr " addr"
TCP address on which the gRPC server listens.
@@ -47,6 +48,16 @@ or
.BR error .
.RI "(default: " info "; env: " MAGLEV_LOG_LEVEL )
.TP
.B \-reflection
Enable gRPC server reflection so that clients such as
.BR grpcurl (1)
can introspect the API without access to the
.I .proto
file.
Enabled by default; pass
.B \-reflection=false
to disable.
.TP
.B \-version
Print version, commit hash, and build date, then exit.
.SH SIGNALS
@@ -60,10 +71,10 @@ backend workers are left running.
Gracefully shut down: drain active gRPC streams, then exit.
.SH FILES
.TP
-.I /etc/maglev/maglev.conf
+.I /etc/vpp-maglev/maglev.yaml
Default configuration file (YAML).
.TP
-.I /etc/default/maglev
+.I /etc/default/vpp-maglev
Environment file sourced by the systemd unit before starting
.BR maglevd .
.SH CONFIGURATION
@@ -77,7 +88,7 @@ and
.BR frontends .
.PP
See the example at
-.I /etc/maglev/maglev.conf
+.I /etc/vpp-maglev/maglev.yaml
and the full reference in the project documentation.
.SH SEE ALSO
.BR maglevc (1)


@@ -10,9 +10,11 @@ inspection and control.
| Flag | Environment variable | Default | Description |
|---|---|---|---|
-| `--config` | `MAGLEV_CONFIG` | `/etc/maglev/maglev.yaml` | Path to the YAML configuration file. |
+| `--config` | `MAGLEV_CONFIG` | `/etc/vpp-maglev/maglev.yaml` | Path to the YAML configuration file. |
| `--grpc-addr` | `MAGLEV_GRPC_ADDR` | `:9090` | TCP address on which the gRPC server listens. |
| `--log-level` | `MAGLEV_LOG_LEVEL` | `info` | Log verbosity: `debug`, `info`, `warn`, or `error`. |
| `--check` | — | — | Read and validate the config file, then exit. Exits 0 if the config is valid, 1 on YAML parse error, 2 on semantic error. |
| `--reflection` | — | `true` | Enable gRPC server reflection. Allows `grpcurl` to introspect the API without the `.proto` file. Set to `false` to disable. |
| `--version` | — | — | Print version, commit hash, and build date, then exit. |
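
As illustrations of the two new flags (paths and port are the documented defaults; adjust to your environment):

```sh
# validate the config from CI or before deploying; exit code 0/1/2 as documented above
maglevd --check --config /etc/vpp-maglev/maglev.yaml

# with --reflection left at its default, grpcurl can introspect the API
grpcurl -plaintext localhost:9090 list
```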
Flags take precedence over environment variables. Both are optional; defaults
@@ -22,7 +24,7 @@ are used for anything not set.
| Signal | Effect |
|---|---|
-| `SIGHUP` | Reload the configuration file. New backends are started, removed backends are stopped, backends whose health-check config is unchanged continue probing without interruption. |
+| `SIGHUP` | Reload the configuration file. The file is checked before applying; if there is a parse or semantic error the reload is aborted and the error is logged (the daemon continues running with its current config). New backends are started, removed backends are stopped, backends whose health-check config is unchanged continue probing without interruption. |
| `SIGTERM` / `SIGINT` | Graceful shutdown. Active gRPC streams are closed, the server drains, then the process exits. |
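
For example, a conservative reload sequence might validate the file first and only then signal the daemon (illustrative; assumes the packaged unit name):

```sh
maglevd --check --config /etc/vpp-maglev/maglev.yaml \
  && systemctl kill -s HUP vpp-maglevd
```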
### Capabilities
@@ -83,6 +85,36 @@ show healthcheck <name> Show full health-check configuration.
set backend <name> pause Suspend health checking for a backend, freezing its state.
set backend <name> resume Resume health checking; backend re-enters unknown state
and is probed immediately.
set frontend <name> pool <pool> backend <name> weight <0-100>
Set the weight of a backend within a pool. Weight 0 keeps
the backend in the pool but assigns it no traffic.
Takes effect immediately without reloading configuration.
set backend <name> disable Stop probing entirely and remove the backend from rotation.
The backend remains visible (state: removed) and can be
re-enabled without reloading configuration.
set backend <name> enable Re-enable a disabled backend. A fresh probe goroutine is
started and the backend re-enters unknown state.
watch events Stream all events (log, backend transitions, frontend)
[num <n>] Stop after receiving n events.
[log [level <level>]] Include log events. level is debug|info|warn|error
(default: info). Omitting log/backend/frontend enables all.
[backend] Include backend transition events.
[frontend] Include frontend events (reserved for future use).
Each event is printed as compact JSON on its own line.
Press any key or Ctrl-C to stop. Examples:
watch events
watch events num 20
watch events log level debug
watch events backend num 100
watch events log level debug backend
config check Ask maglevd to read and validate its current config file.
Prints "config ok" on success, or the error (parse or
semantic) returned by the daemon.
quit / exit Leave the interactive shell.
```
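
As a one-shot illustration of the new commands (backend and frontend names are taken from the example configuration; adjust to your own config):

```sh
maglevc set frontend nginx-http pool primary backend nginx0-ams weight 0   # drain traffic
maglevc set backend nginx0-ams disable                                     # stop probing, remove from rotation
maglevc set backend nginx0-ams enable                                      # re-enable; re-enters unknown state
maglevc config check
```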

go.mod

@@ -3,16 +3,16 @@ module git.ipng.ch/ipng/vpp-maglev
go 1.25.0
require (
github.com/chzyer/readline v1.5.1
github.com/vishvananda/netns v0.0.5
golang.org/x/net v0.52.0
golang.org/x/sys v0.43.0
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/chzyer/readline v1.5.1 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
)

go.sum

@@ -1,8 +1,10 @@
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.2.1 h1:XHDu3E6q+gdHgsdTPH6ImJMIp436vR6MPtH8gP05QzM=
github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ=
github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/chzyer/test v1.0.0 h1:p3BQDXSxOhOG0P9z6/hGnII4LGiEPOYBhs8asl/fC04=
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -31,8 +33,8 @@ go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4Etq
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=


@@ -4,6 +4,7 @@ package checker
import (
"context"
"fmt"
"log/slog"
"net"
"sort"
@@ -42,6 +43,7 @@ type worker struct {
// Each backend is probed exactly once, regardless of how many frontends
// reference it.
type Checker struct {
runCtx context.Context // set in Run; used by EnableBackend to start new goroutines
cfg *config.Config
mu sync.RWMutex
workers map[string]*worker // keyed by backend name
@@ -67,6 +69,7 @@ func (c *Checker) Run(ctx context.Context) error {
go c.fanOut(ctx)
c.mu.Lock()
c.runCtx = ctx // safe: held under mu before any EnableBackend call can read it
names := activeBackendNames(c.cfg)
maxHistory := c.cfg.HealthChecker.TransitionHistory
for i, name := range names {
@@ -167,6 +170,36 @@ func (c *Checker) GetFrontend(name string) (config.Frontend, bool) {
return v, ok
}
// SetFrontendPoolBackendWeight updates the weight of a backend within a named
// pool of a frontend. Returns the updated FrontendInfo and a descriptive error
// if the frontend, pool, or backend is not found or the weight is out of range.
func (c *Checker) SetFrontendPoolBackendWeight(frontendName, poolName, backendName string, weight int) (config.Frontend, error) {
if weight < 0 || weight > 100 {
return config.Frontend{}, fmt.Errorf("weight %d out of range [0, 100]", weight)
}
c.mu.Lock()
defer c.mu.Unlock()
fe, ok := c.cfg.Frontends[frontendName]
if !ok {
return config.Frontend{}, fmt.Errorf("frontend %q not found", frontendName)
}
for i, pool := range fe.Pools {
if pool.Name != poolName {
continue
}
pb, ok := pool.Backends[backendName]
if !ok {
return config.Frontend{}, fmt.Errorf("backend %q not found in pool %q", backendName, poolName)
}
pb.Weight = weight
fe.Pools[i].Backends[backendName] = pb
c.cfg.Frontends[frontendName] = fe
slog.Info("frontend-pool-weight", "frontend", frontendName, "pool", poolName, "backend", backendName, "weight", weight)
return fe, nil
}
return config.Frontend{}, fmt.Errorf("pool %q not found in frontend %q", poolName, frontendName)
}
// ListHealthChecks returns the names of all configured health checks, sorted.
func (c *Checker) ListHealthChecks() []string {
c.mu.RLock()
@@ -278,6 +311,59 @@ func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) {
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// DisableBackend stops health checking for a backend and removes it from active
// rotation. The worker entry is kept in the map so the backend remains visible
// via GetBackend and can be re-enabled with EnableBackend.
func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, false
}
if !w.entry.Enabled {
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
t := w.backend.Remove(maxHistory)
slog.Info("backend-disable", "backend", name)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
w.cancel()
w.entry.Enabled = false
if b, ok := c.cfg.Backends[name]; ok {
b.Enabled = false
c.cfg.Backends[name] = b
}
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// EnableBackend re-enables a previously disabled backend. A fresh probe
// goroutine is started and the backend re-enters StateUnknown.
func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, false
}
if w.entry.Enabled {
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
entry := w.entry
entry.Enabled = true
if b, ok := c.cfg.Backends[name]; ok {
b.Enabled = true
c.cfg.Backends[name] = b
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
hc := c.cfg.HealthChecks[entry.HealthCheck]
slog.Info("backend-enable", "backend", name)
c.startWorker(c.runCtx, name, entry, hc, 0, 1, maxHistory)
nw := c.workers[name]
c.emitForBackend(name, nw.backend.Address, nw.backend.Transitions[0], c.cfg.Frontends)
return BackendSnapshot{Health: nw.backend, Config: nw.entry}, true
}
// ---- internal --------------------------------------------------------------
// startWorker creates a Backend and launches a probe goroutine.
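The DisableBackend/EnableBackend pair above keeps the worker entry registered while cancelling only the probe goroutine, so a disabled backend stays visible and can be revived. A standalone sketch of that lifecycle, using only the standard library; the names (`registry`, `demo`) and the bare `enabled` flag are illustrative, not taken from the checker package:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// registry sketches the checker's worker map: disable cancels the probe
// goroutine but keeps the entry; enable starts a fresh goroutine.
type registry struct {
	mu      sync.Mutex
	runCtx  context.Context // analogous to Checker.runCtx, set once at Run
	workers map[string]*worker
}

type worker struct {
	enabled bool
	cancel  context.CancelFunc
	done    chan struct{}
}

// start launches a stand-in probe goroutine. Caller must hold r.mu.
func (r *registry) start(name string) {
	ctx, cancel := context.WithCancel(r.runCtx)
	w := &worker{enabled: true, cancel: cancel, done: make(chan struct{})}
	r.workers[name] = w
	go func() { // stand-in for the probe loop
		<-ctx.Done()
		close(w.done)
	}()
}

func (r *registry) disable(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	w, ok := r.workers[name]
	if !ok || !w.enabled {
		return ok // idempotent, like the real DisableBackend
	}
	w.cancel()        // stop probing...
	w.enabled = false // ...but keep the entry so the backend stays visible
	return true
}

func (r *registry) enable(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	w, ok := r.workers[name]
	if !ok {
		return false
	}
	if !w.enabled {
		<-w.done      // old probe goroutine has exited
		r.start(name) // fresh goroutine; health state resets to "unknown"
	}
	return true
}

func demo() string {
	r := &registry{runCtx: context.Background(), workers: map[string]*worker{}}
	r.mu.Lock()
	r.start("be0")
	r.mu.Unlock()
	return fmt.Sprintf("%v %v %v", r.disable("be0"), r.enable("be0"), r.disable("nope"))
}

func main() { fmt.Println(demo()) }
```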


@@ -246,6 +246,98 @@ func TestSubscribe(t *testing.T) {
}
}
func TestSetFrontendPoolBackendWeight(t *testing.T) {
cfg := makeTestConfig(time.Hour, 3, 2)
c := New(cfg)
// Valid weight change.
fe, err := c.SetFrontendPoolBackendWeight("web", "primary", "be0", 42)
if err != nil {
t.Fatalf("SetFrontendPoolBackendWeight: %v", err)
}
if fe.Pools[0].Backends["be0"].Weight != 42 {
t.Errorf("weight: got %d, want 42", fe.Pools[0].Backends["be0"].Weight)
}
// Persisted in live config.
got, _ := c.GetFrontend("web")
if got.Pools[0].Backends["be0"].Weight != 42 {
t.Errorf("config weight: got %d, want 42", got.Pools[0].Backends["be0"].Weight)
}
// Out-of-range weight.
if _, err := c.SetFrontendPoolBackendWeight("web", "primary", "be0", 101); err == nil {
t.Error("expected error for weight 101")
}
// Unknown frontend.
if _, err := c.SetFrontendPoolBackendWeight("nope", "primary", "be0", 50); err == nil {
t.Error("expected error for unknown frontend")
}
// Unknown pool.
if _, err := c.SetFrontendPoolBackendWeight("web", "nope", "be0", 50); err == nil {
t.Error("expected error for unknown pool")
}
// Unknown backend.
if _, err := c.SetFrontendPoolBackendWeight("web", "primary", "nope", 50); err == nil {
t.Error("expected error for unknown backend in pool")
}
}
func TestEnableDisable(t *testing.T) {
cfg := makeTestConfig(time.Hour, 3, 2)
c := New(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.fanOut(ctx)
// Seed a worker as EnableBackend/DisableBackend require one in c.workers.
wCtx, wCancel := context.WithCancel(ctx)
c.mu.Lock()
c.runCtx = ctx
c.workers["be0"] = &worker{
backend: health.New("be0", net.ParseIP("10.0.0.2"), 2, 3),
hc: cfg.HealthChecks["icmp"],
entry: cfg.Backends["be0"],
cancel: wCancel,
}
c.mu.Unlock()
_ = wCtx
b, ok := c.DisableBackend("be0")
if !ok {
t.Fatal("DisableBackend: not found")
}
if b.Health.State != health.StateRemoved {
t.Errorf("after disable: state=%s, want removed", b.Health.State)
}
if b.Config.Enabled {
t.Error("after disable: Enabled should be false")
}
// Backend should still be visible after disable.
snap, ok := c.GetBackend("be0")
if !ok {
t.Fatal("GetBackend after disable: not found")
}
if snap.Config.Enabled {
t.Error("GetBackend after disable: Enabled should be false")
}
b, ok = c.EnableBackend("be0")
if !ok {
t.Fatal("EnableBackend: not found")
}
if b.Health.State != health.StateUnknown {
t.Errorf("after enable: state=%s, want unknown", b.Health.State)
}
if !b.Config.Enabled {
t.Error("after enable: Enabled should be true")
}
}
func TestPauseResume(t *testing.T) {
cfg := makeTestConfig(time.Hour, 3, 2)
c := New(cfg)


@@ -156,21 +156,58 @@ type rawFrontend struct {
Pools []rawPool `yaml:"pools"`
}
// ---- Load ------------------------------------------------------------------
// ---- Check / Load ----------------------------------------------------------
// CheckResult holds the outcome of a config file validation. Exactly one of
// ParseError and SemanticError is non-empty when the config is invalid; both
// are empty on success.
type CheckResult struct {
ParseError string // YAML could not be read or parsed
SemanticError string // YAML parsed but semantic validation failed
}
// OK reports whether the config is valid.
func (r CheckResult) OK() bool {
return r.ParseError == "" && r.SemanticError == ""
}
// Check reads and validates the config file at path, returning the parsed
// Config (nil on failure) and a CheckResult that distinguishes YAML parse
// errors from semantic validation errors.
func Check(path string) (*Config, CheckResult) {
data, err := os.ReadFile(path)
if err != nil {
return nil, CheckResult{ParseError: fmt.Sprintf("read %q: %v", path, err)}
}
var raw rawConfig
if err := yaml.Unmarshal(data, &raw); err != nil {
return nil, CheckResult{ParseError: fmt.Sprintf("parse yaml: %v", err)}
}
cfg, err := convert(&raw.Maglev)
if err != nil {
return nil, CheckResult{SemanticError: err.Error()}
}
return cfg, CheckResult{}
}
// Load reads and validates the config file at path.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("read config %q: %w", path, err)
cfg, result := Check(path)
if !result.OK() {
if result.ParseError != "" {
return nil, fmt.Errorf("%s", result.ParseError)
}
return parse(data)
return nil, fmt.Errorf("%s", result.SemanticError)
}
return cfg, nil
}
// parse unmarshals raw YAML bytes and converts them into a validated Config.
// Used by tests; production code goes through Check or Load.
func parse(data []byte) (*Config, error) {
var raw rawConfig
if err := yaml.Unmarshal(data, &raw); err != nil {
return nil, fmt.Errorf("parse yaml: %w", err)
return nil, fmt.Errorf("parse yaml: %v", err)
}
return convert(&raw.Maglev)
}
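Per the commit message, maglevd's `--check` flag maps this result to exit codes 0/1/2. A minimal sketch of that mapping, mirroring `CheckResult`; assigning 1 to parse errors and 2 to semantic errors is an assumed convention for illustration, not confirmed daemon behavior:

```go
package main

import "fmt"

// CheckResult mirrors config.CheckResult: at most one field is non-empty.
type CheckResult struct {
	ParseError    string
	SemanticError string
}

func (r CheckResult) OK() bool { return r.ParseError == "" && r.SemanticError == "" }

// exitCode maps a validation result to a --check style exit status.
// The 1-vs-2 split is an assumption; the commit only documents "exits 0/1/2".
func exitCode(r CheckResult) int {
	switch {
	case r.OK():
		return 0
	case r.ParseError != "":
		return 1
	default:
		return 2
	}
}

func main() {
	fmt.Println(exitCode(CheckResult{}))                                      // valid config
	fmt.Println(exitCode(CheckResult{ParseError: "parse yaml: line 3"}))      // malformed YAML
	fmt.Println(exitCode(CheckResult{SemanticError: "unknown health check"})) // parsed but invalid
}
```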


@@ -0,0 +1,148 @@
// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package grpcapi
import (
"context"
"log/slog"
"sync"
)
// logSub is a single WatchEvents subscriber interested in log events.
type logSub struct {
minLevel slog.Level
ch chan *LogEvent
}
// broadcasterState holds the shared subscription registry. It is referenced
// by pointer so that copies returned from WithAttrs/WithGroup share the same set.
type broadcasterState struct {
mu sync.Mutex
nextID int
subs map[int]*logSub
}
func (s *broadcasterState) subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
s.mu.Lock()
id := s.nextID
s.nextID++
sub := &logSub{minLevel: minLevel, ch: make(chan *LogEvent, 256)}
s.subs[id] = sub
s.mu.Unlock()
return sub.ch, func() {
s.mu.Lock()
delete(s.subs, id)
close(sub.ch)
s.mu.Unlock()
}
}
// hasSubscriberAt reports whether any subscriber wants records at level or above.
func (s *broadcasterState) hasSubscriberAt(level slog.Level) bool {
s.mu.Lock()
defer s.mu.Unlock()
for _, sub := range s.subs {
if level >= sub.minLevel {
return true
}
}
return false
}
func (s *broadcasterState) fanOut(level slog.Level, ev *LogEvent) {
s.mu.Lock()
for _, sub := range s.subs {
if level >= sub.minLevel {
select {
case sub.ch <- ev:
default:
// slow subscriber — drop rather than block
}
}
}
s.mu.Unlock()
}
// LogBroadcaster implements slog.Handler. It forwards every record to an
// inner handler (e.g. the JSON stdout handler) and simultaneously fans out
// structured LogEvent messages to all gRPC WatchEvents subscribers.
type LogBroadcaster struct {
inner slog.Handler
preAttrs []*LogAttr // pre-resolved attrs from WithAttrs calls
groupPfx string // key prefix accumulated by WithGroup calls
shared *broadcasterState
}
// NewLogBroadcaster wraps inner and returns a LogBroadcaster ready for use
// as the process slog default handler.
func NewLogBroadcaster(inner slog.Handler) *LogBroadcaster {
return &LogBroadcaster{
inner: inner,
shared: &broadcasterState{subs: make(map[int]*logSub)},
}
}
// Subscribe registers a subscriber that receives LogEvents at or above
// minLevel. The returned channel is closed when the cancel func is called.
func (b *LogBroadcaster) Subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
return b.shared.subscribe(minLevel)
}
// Enabled implements slog.Handler. It returns true when either the inner
// handler wants the record OR at least one gRPC subscriber has a minLevel at
// or below level. This allows a WatchEvents client requesting debug log events
// to receive them even when maglevd's own -log-level is set higher (e.g. error).
func (b *LogBroadcaster) Enabled(ctx context.Context, level slog.Level) bool {
return b.inner.Enabled(ctx, level) || b.shared.hasSubscriberAt(level)
}
// Handle implements slog.Handler. It forwards the record to the inner handler
// only when the inner handler is enabled for that level (avoiding duplicate or
// unwanted stdout output), then fans it out to all interested gRPC subscribers.
func (b *LogBroadcaster) Handle(ctx context.Context, r slog.Record) error {
var err error
if b.inner.Enabled(ctx, r.Level) {
err = b.inner.Handle(ctx, r)
}
attrs := make([]*LogAttr, 0, len(b.preAttrs)+r.NumAttrs())
attrs = append(attrs, b.preAttrs...)
r.Attrs(func(a slog.Attr) bool {
attrs = append(attrs, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()})
return true
})
ev := &LogEvent{
AtUnixNs: r.Time.UnixNano(),
Level: r.Level.String(),
Msg: r.Message,
Attrs: attrs,
}
b.shared.fanOut(r.Level, ev)
return err
}
// WithAttrs implements slog.Handler.
func (b *LogBroadcaster) WithAttrs(attrs []slog.Attr) slog.Handler {
pre := make([]*LogAttr, len(b.preAttrs), len(b.preAttrs)+len(attrs))
copy(pre, b.preAttrs)
for _, a := range attrs {
pre = append(pre, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()})
}
return &LogBroadcaster{
inner: b.inner.WithAttrs(attrs),
preAttrs: pre,
groupPfx: b.groupPfx,
shared: b.shared,
}
}
// WithGroup implements slog.Handler.
func (b *LogBroadcaster) WithGroup(name string) slog.Handler {
return &LogBroadcaster{
inner: b.inner.WithGroup(name),
preAttrs: b.preAttrs,
groupPfx: b.groupPfx + name + ".",
shared: b.shared,
}
}
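Stripped of attr and group handling, the broadcaster pattern above can be sketched standalone with only the standard library. The names here (`fanOutHandler`, `runDemo`) are illustrative; what carries over is the `Enabled` override and the non-blocking fan-out send:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"sync"
)

// fanOutHandler is a deliberately simplified sketch of LogBroadcaster:
// forward records to an inner handler when it wants them, and independently
// fan a rendered line out to subscriber channels with a non-blocking send,
// so a slow subscriber drops messages instead of stalling the logger.
type fanOutHandler struct {
	inner slog.Handler
	mu    sync.Mutex
	subs  []chan string
}

func (h *fanOutHandler) Subscribe() <-chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan string, 16)
	h.subs = append(h.subs, ch)
	return ch
}

// Enabled is the key trick: a record is in scope when the inner handler wants
// it OR any subscriber exists, so a watcher can receive levels the daemon's
// own log level would suppress.
func (h *fanOutHandler) Enabled(ctx context.Context, l slog.Level) bool {
	h.mu.Lock()
	n := len(h.subs)
	h.mu.Unlock()
	return h.inner.Enabled(ctx, l) || n > 0
}

func (h *fanOutHandler) Handle(ctx context.Context, r slog.Record) error {
	var err error
	if h.inner.Enabled(ctx, r.Level) { // gate inner output to avoid unwanted lines
		err = h.inner.Handle(ctx, r)
	}
	line := fmt.Sprintf("%s %s", r.Level, r.Message)
	h.mu.Lock()
	for _, ch := range h.subs {
		select {
		case ch <- line:
		default: // slow subscriber: drop rather than block
		}
	}
	h.mu.Unlock()
	return err
}

func (h *fanOutHandler) WithAttrs([]slog.Attr) slog.Handler { return h } // attrs dropped in this sketch
func (h *fanOutHandler) WithGroup(string) slog.Handler      { return h }

func runDemo() string {
	// Inner handler only accepts ERROR and up, like maglevd at --log-level error.
	inner := slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})
	h := &fanOutHandler{inner: inner}
	ch := h.Subscribe()
	slog.New(h).Debug("probe sent") // suppressed by inner, still reaches the subscriber
	return <-ch
}

func main() { fmt.Println(runDemo()) }
```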

File diff suppressed because it is too large


@@ -25,9 +25,13 @@ const (
Maglev_GetBackend_FullMethodName = "/maglev.Maglev/GetBackend"
Maglev_PauseBackend_FullMethodName = "/maglev.Maglev/PauseBackend"
Maglev_ResumeBackend_FullMethodName = "/maglev.Maglev/ResumeBackend"
Maglev_EnableBackend_FullMethodName = "/maglev.Maglev/EnableBackend"
Maglev_DisableBackend_FullMethodName = "/maglev.Maglev/DisableBackend"
Maglev_ListHealthChecks_FullMethodName = "/maglev.Maglev/ListHealthChecks"
Maglev_GetHealthCheck_FullMethodName = "/maglev.Maglev/GetHealthCheck"
Maglev_WatchBackendEvents_FullMethodName = "/maglev.Maglev/WatchBackendEvents"
Maglev_SetFrontendPoolBackendWeight_FullMethodName = "/maglev.Maglev/SetFrontendPoolBackendWeight"
Maglev_WatchEvents_FullMethodName = "/maglev.Maglev/WatchEvents"
Maglev_CheckConfig_FullMethodName = "/maglev.Maglev/CheckConfig"
)
// MaglevClient is the client API for Maglev service.
@@ -40,11 +44,15 @@ type MaglevClient interface {
GetFrontend(ctx context.Context, in *GetFrontendRequest, opts ...grpc.CallOption) (*FrontendInfo, error)
ListBackends(ctx context.Context, in *ListBackendsRequest, opts ...grpc.CallOption) (*ListBackendsResponse, error)
GetBackend(ctx context.Context, in *GetBackendRequest, opts ...grpc.CallOption) (*BackendInfo, error)
PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error)
ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error)
PauseBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error)
ResumeBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error)
EnableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error)
DisableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error)
ListHealthChecks(ctx context.Context, in *ListHealthChecksRequest, opts ...grpc.CallOption) (*ListHealthChecksResponse, error)
GetHealthCheck(ctx context.Context, in *GetHealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckInfo, error)
WatchBackendEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BackendEvent], error)
SetFrontendPoolBackendWeight(ctx context.Context, in *SetWeightRequest, opts ...grpc.CallOption) (*FrontendInfo, error)
WatchEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Event], error)
CheckConfig(ctx context.Context, in *CheckConfigRequest, opts ...grpc.CallOption) (*CheckConfigResponse, error)
}
type maglevClient struct {
@@ -95,7 +103,7 @@ func (c *maglevClient) GetBackend(ctx context.Context, in *GetBackendRequest, op
return out, nil
}
func (c *maglevClient) PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
func (c *maglevClient) PauseBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(BackendInfo)
err := c.cc.Invoke(ctx, Maglev_PauseBackend_FullMethodName, in, out, cOpts...)
@@ -105,7 +113,7 @@ func (c *maglevClient) PauseBackend(ctx context.Context, in *PauseResumeRequest,
return out, nil
}
func (c *maglevClient) ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
func (c *maglevClient) ResumeBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(BackendInfo)
err := c.cc.Invoke(ctx, Maglev_ResumeBackend_FullMethodName, in, out, cOpts...)
@@ -115,6 +123,26 @@ func (c *maglevClient) ResumeBackend(ctx context.Context, in *PauseResumeRequest
return out, nil
}
func (c *maglevClient) EnableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(BackendInfo)
err := c.cc.Invoke(ctx, Maglev_EnableBackend_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *maglevClient) DisableBackend(ctx context.Context, in *BackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(BackendInfo)
err := c.cc.Invoke(ctx, Maglev_DisableBackend_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *maglevClient) ListHealthChecks(ctx context.Context, in *ListHealthChecksRequest, opts ...grpc.CallOption) (*ListHealthChecksResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListHealthChecksResponse)
@@ -135,13 +163,23 @@ func (c *maglevClient) GetHealthCheck(ctx context.Context, in *GetHealthCheckReq
return out, nil
}
func (c *maglevClient) WatchBackendEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BackendEvent], error) {
func (c *maglevClient) SetFrontendPoolBackendWeight(ctx context.Context, in *SetWeightRequest, opts ...grpc.CallOption) (*FrontendInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &Maglev_ServiceDesc.Streams[0], Maglev_WatchBackendEvents_FullMethodName, cOpts...)
out := new(FrontendInfo)
err := c.cc.Invoke(ctx, Maglev_SetFrontendPoolBackendWeight_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[WatchRequest, BackendEvent]{ClientStream: stream}
return out, nil
}
func (c *maglevClient) WatchEvents(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Event], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &Maglev_ServiceDesc.Streams[0], Maglev_WatchEvents_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[WatchRequest, Event]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
@@ -152,7 +190,17 @@ func (c *maglevClient) WatchBackendEvents(ctx context.Context, in *WatchRequest,
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Maglev_WatchBackendEventsClient = grpc.ServerStreamingClient[BackendEvent]
type Maglev_WatchEventsClient = grpc.ServerStreamingClient[Event]
func (c *maglevClient) CheckConfig(ctx context.Context, in *CheckConfigRequest, opts ...grpc.CallOption) (*CheckConfigResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CheckConfigResponse)
err := c.cc.Invoke(ctx, Maglev_CheckConfig_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// MaglevServer is the server API for Maglev service.
// All implementations must embed UnimplementedMaglevServer
@@ -164,11 +212,15 @@ type MaglevServer interface {
GetFrontend(context.Context, *GetFrontendRequest) (*FrontendInfo, error)
ListBackends(context.Context, *ListBackendsRequest) (*ListBackendsResponse, error)
GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error)
PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error)
ResumeBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error)
PauseBackend(context.Context, *BackendRequest) (*BackendInfo, error)
ResumeBackend(context.Context, *BackendRequest) (*BackendInfo, error)
EnableBackend(context.Context, *BackendRequest) (*BackendInfo, error)
DisableBackend(context.Context, *BackendRequest) (*BackendInfo, error)
ListHealthChecks(context.Context, *ListHealthChecksRequest) (*ListHealthChecksResponse, error)
GetHealthCheck(context.Context, *GetHealthCheckRequest) (*HealthCheckInfo, error)
WatchBackendEvents(*WatchRequest, grpc.ServerStreamingServer[BackendEvent]) error
SetFrontendPoolBackendWeight(context.Context, *SetWeightRequest) (*FrontendInfo, error)
WatchEvents(*WatchRequest, grpc.ServerStreamingServer[Event]) error
CheckConfig(context.Context, *CheckConfigRequest) (*CheckConfigResponse, error)
mustEmbedUnimplementedMaglevServer()
}
@@ -191,20 +243,32 @@ func (UnimplementedMaglevServer) ListBackends(context.Context, *ListBackendsRequ
func (UnimplementedMaglevServer) GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method GetBackend not implemented")
}
func (UnimplementedMaglevServer) PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) {
func (UnimplementedMaglevServer) PauseBackend(context.Context, *BackendRequest) (*BackendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method PauseBackend not implemented")
}
func (UnimplementedMaglevServer) ResumeBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) {
func (UnimplementedMaglevServer) ResumeBackend(context.Context, *BackendRequest) (*BackendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method ResumeBackend not implemented")
}
func (UnimplementedMaglevServer) EnableBackend(context.Context, *BackendRequest) (*BackendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method EnableBackend not implemented")
}
func (UnimplementedMaglevServer) DisableBackend(context.Context, *BackendRequest) (*BackendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method DisableBackend not implemented")
}
func (UnimplementedMaglevServer) ListHealthChecks(context.Context, *ListHealthChecksRequest) (*ListHealthChecksResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ListHealthChecks not implemented")
}
func (UnimplementedMaglevServer) GetHealthCheck(context.Context, *GetHealthCheckRequest) (*HealthCheckInfo, error) {
return nil, status.Error(codes.Unimplemented, "method GetHealthCheck not implemented")
}
func (UnimplementedMaglevServer) WatchBackendEvents(*WatchRequest, grpc.ServerStreamingServer[BackendEvent]) error {
return status.Error(codes.Unimplemented, "method WatchBackendEvents not implemented")
func (UnimplementedMaglevServer) SetFrontendPoolBackendWeight(context.Context, *SetWeightRequest) (*FrontendInfo, error) {
return nil, status.Error(codes.Unimplemented, "method SetFrontendPoolBackendWeight not implemented")
}
func (UnimplementedMaglevServer) WatchEvents(*WatchRequest, grpc.ServerStreamingServer[Event]) error {
return status.Error(codes.Unimplemented, "method WatchEvents not implemented")
}
func (UnimplementedMaglevServer) CheckConfig(context.Context, *CheckConfigRequest) (*CheckConfigResponse, error) {
return nil, status.Error(codes.Unimplemented, "method CheckConfig not implemented")
}
func (UnimplementedMaglevServer) mustEmbedUnimplementedMaglevServer() {}
func (UnimplementedMaglevServer) testEmbeddedByValue() {}
@@ -300,7 +364,7 @@ func _Maglev_GetBackend_Handler(srv interface{}, ctx context.Context, dec func(i
}
func _Maglev_PauseBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PauseResumeRequest)
in := new(BackendRequest)
if err := dec(in); err != nil {
return nil, err
}
@@ -312,13 +376,13 @@ func _Maglev_PauseBackend_Handler(srv interface{}, ctx context.Context, dec func
FullMethod: Maglev_PauseBackend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).PauseBackend(ctx, req.(*PauseResumeRequest))
return srv.(MaglevServer).PauseBackend(ctx, req.(*BackendRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Maglev_ResumeBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PauseResumeRequest)
in := new(BackendRequest)
if err := dec(in); err != nil {
return nil, err
}
@@ -330,7 +394,43 @@ func _Maglev_ResumeBackend_Handler(srv interface{}, ctx context.Context, dec fun
FullMethod: Maglev_ResumeBackend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).ResumeBackend(ctx, req.(*PauseResumeRequest))
return srv.(MaglevServer).ResumeBackend(ctx, req.(*BackendRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Maglev_EnableBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BackendRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MaglevServer).EnableBackend(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Maglev_EnableBackend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).EnableBackend(ctx, req.(*BackendRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Maglev_DisableBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BackendRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MaglevServer).DisableBackend(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Maglev_DisableBackend_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).DisableBackend(ctx, req.(*BackendRequest))
}
return interceptor(ctx, in, info, handler)
}
@@ -371,16 +471,52 @@ func _Maglev_GetHealthCheck_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _Maglev_WatchBackendEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
func _Maglev_SetFrontendPoolBackendWeight_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetWeightRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MaglevServer).SetFrontendPoolBackendWeight(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Maglev_SetFrontendPoolBackendWeight_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).SetFrontendPoolBackendWeight(ctx, req.(*SetWeightRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Maglev_WatchEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(MaglevServer).WatchBackendEvents(m, &grpc.GenericServerStream[WatchRequest, BackendEvent]{ServerStream: stream})
return srv.(MaglevServer).WatchEvents(m, &grpc.GenericServerStream[WatchRequest, Event]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type Maglev_WatchBackendEventsServer = grpc.ServerStreamingServer[BackendEvent]
type Maglev_WatchEventsServer = grpc.ServerStreamingServer[Event]
func _Maglev_CheckConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CheckConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MaglevServer).CheckConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Maglev_CheckConfig_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MaglevServer).CheckConfig(ctx, req.(*CheckConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
// Maglev_ServiceDesc is the grpc.ServiceDesc for Maglev service.
// It's only intended for direct use with grpc.RegisterService,
@@ -413,6 +549,14 @@ var Maglev_ServiceDesc = grpc.ServiceDesc{
MethodName: "ResumeBackend",
Handler: _Maglev_ResumeBackend_Handler,
},
{
MethodName: "EnableBackend",
Handler: _Maglev_EnableBackend_Handler,
},
{
MethodName: "DisableBackend",
Handler: _Maglev_DisableBackend_Handler,
},
{
MethodName: "ListHealthChecks",
Handler: _Maglev_ListHealthChecks_Handler,
@@ -421,11 +565,19 @@ var Maglev_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetHealthCheck",
Handler: _Maglev_GetHealthCheck_Handler,
},
{
MethodName: "SetFrontendPoolBackendWeight",
Handler: _Maglev_SetFrontendPoolBackendWeight_Handler,
},
{
MethodName: "CheckConfig",
Handler: _Maglev_CheckConfig_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "WatchBackendEvents",
Handler: _Maglev_WatchBackendEvents_Handler,
StreamName: "WatchEvents",
Handler: _Maglev_WatchEvents_Handler,
ServerStreams: true,
},
},


@@ -4,6 +4,7 @@ package grpcapi
import (
"context"
"log/slog"
"net"
"google.golang.org/grpc/codes"
@@ -19,13 +20,18 @@ type Server struct {
UnimplementedMaglevServer
ctx context.Context
checker *checker.Checker
logs *LogBroadcaster
configPath string
}
// NewServer creates a Server backed by the given Checker. The provided context
// controls the lifetime of streaming RPCs: cancelling it closes all active
// WatchBackendEvents streams so that grpc.Server.GracefulStop can complete.
func NewServer(ctx context.Context, c *checker.Checker) *Server {
return &Server{ctx: ctx, checker: c}
// NewServer creates a Server backed by the given Checker. logs may be nil, in
// which case log events are never sent to WatchEvents streams. configPath is
// used by CheckConfig to reload and validate the configuration file on demand.
// The provided context controls the lifetime of streaming RPCs: cancelling it
// closes all active WatchEvents streams so that grpc.Server.GracefulStop can
// complete.
func NewServer(ctx context.Context, c *checker.Checker, logs *LogBroadcaster, configPath string) *Server {
return &Server{ctx: ctx, checker: c, logs: logs, configPath: configPath}
}
// ListFrontends returns the names of all configured frontends.
@@ -57,7 +63,7 @@ func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*Backend
}
// PauseBackend pauses health checking for a backend by name.
func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) {
func (s *Server) PauseBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
b, ok := s.checker.PauseBackend(req.Name)
if !ok {
return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
@@ -66,7 +72,7 @@ func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*Back
}
// ResumeBackend resumes health checking for a backend by name.
func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) {
func (s *Server) ResumeBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
b, ok := s.checker.ResumeBackend(req.Name)
if !ok {
return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
@@ -74,6 +80,36 @@ func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*Bac
return backendToProto(b), nil
}
// EnableBackend re-enables a previously disabled backend.
func (s *Server) EnableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
b, ok := s.checker.EnableBackend(req.Name)
if !ok {
return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
}
return backendToProto(b), nil
}
// DisableBackend disables a backend, stopping its probe goroutine.
func (s *Server) DisableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
b, ok := s.checker.DisableBackend(req.Name)
if !ok {
return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
}
return backendToProto(b), nil
}
// SetFrontendPoolBackendWeight updates the weight of a backend in a pool.
func (s *Server) SetFrontendPoolBackendWeight(_ context.Context, req *SetWeightRequest) (*FrontendInfo, error) {
if req.Weight < 0 || req.Weight > 100 {
return nil, status.Errorf(codes.InvalidArgument, "weight %d out of range [0, 100]", req.Weight)
}
fe, err := s.checker.SetFrontendPoolBackendWeight(req.Frontend, req.Pool, req.Backend, int(req.Weight))
if err != nil {
return nil, status.Errorf(codes.NotFound, "%v", err)
}
return frontendToProto(req.Frontend, fe), nil
}
// ListHealthChecks returns the names of all configured health checks.
func (s *Server) ListHealthChecks(_ context.Context, _ *ListHealthChecksRequest) (*ListHealthChecksResponse, error) {
return &ListHealthChecksResponse{Names: s.checker.ListHealthChecks()}, nil
@@ -88,30 +124,55 @@ func (s *Server) GetHealthCheck(_ context.Context, req *GetHealthCheckRequest) (
return healthCheckToProto(req.Name, hc), nil
}
// WatchBackendEvents streams the current state of all backends on connect, then
// streams live state transitions until the client disconnects.
func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendEventsServer) error {
// Send current state of all backends as synthetic events.
// WatchEvents streams events to the client. On connect, the current state of
// all backends is sent as synthetic BackendEvents. Afterwards, live events are
// forwarded based on the filter flags in req. An unset (nil) flag defaults to
// true (subscribe). An empty log_level defaults to "info".
func (s *Server) WatchEvents(req *WatchRequest, stream Maglev_WatchEventsServer) error {
wantLog := req.Log == nil || *req.Log
wantBackend := req.Backend == nil || *req.Backend
wantFrontend := req.Frontend == nil || *req.Frontend
_ = wantFrontend // no frontend events emitted yet
logLevel := slog.LevelInfo
if req.LogLevel != "" {
if err := logLevel.UnmarshalText([]byte(req.LogLevel)); err != nil {
return status.Errorf(codes.InvalidArgument, "invalid log_level %q: must be debug, info, warn, or error", req.LogLevel)
}
}
// Subscribe to log events (a nil channel blocks forever, silently disabling its select case when logs are not wanted).
var logCh <-chan *LogEvent
if wantLog && s.logs != nil {
var unsub func()
logCh, unsub = s.logs.Subscribe(logLevel)
defer unsub()
}
// Subscribe to backend events; send initial state snapshot first.
var backendCh <-chan checker.Event
if wantBackend {
for _, name := range s.checker.ListBackends() {
snap, ok := s.checker.GetBackend(name)
if !ok {
continue
}
ev := &BackendEvent{
ev := &Event{Event: &Event_Backend{Backend: &BackendEvent{
BackendName: name,
Transition: &TransitionRecord{
From: snap.Health.State.String(),
To: snap.Health.State.String(),
AtUnixNs: 0,
},
}
}}}
if err := stream.Send(ev); err != nil {
return err
}
}
ch, unsub := s.checker.Subscribe()
var unsub func()
backendCh, unsub = s.checker.Subscribe()
defer unsub()
}
for {
select {
@@ -119,21 +180,38 @@ func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendE
return status.Error(codes.Unavailable, "server shutting down")
case <-stream.Context().Done():
return nil
case e, ok := <-ch:
case le, ok := <-logCh:
if !ok {
return nil
}
ev := &BackendEvent{
if err := stream.Send(&Event{Event: &Event_Log{Log: le}}); err != nil {
return err
}
case e, ok := <-backendCh:
if !ok {
return nil
}
if err := stream.Send(&Event{Event: &Event_Backend{Backend: &BackendEvent{
BackendName: e.BackendName,
Transition: transitionToProto(e.Transition),
}
if err := stream.Send(ev); err != nil {
}}}); err != nil {
return err
}
}
}
}
// CheckConfig reads and validates the configuration file, returning a
// structured result that distinguishes YAML parse errors from semantic errors.
func (s *Server) CheckConfig(_ context.Context, _ *CheckConfigRequest) (*CheckConfigResponse, error) {
_, result := config.Check(s.configPath)
return &CheckConfigResponse{
Ok: result.OK(),
ParseError: result.ParseError,
SemanticError: result.SemanticError,
}, nil
}
// ---- conversion helpers ----------------------------------------------------
func frontendToProto(name string, fe config.Frontend) *FrontendInfo {


@@ -62,7 +62,7 @@ func startTestServer(t *testing.T, ctx context.Context, c *checker.Checker) (Mag
t.Fatalf("listen: %v", err)
}
srv := grpc.NewServer()
RegisterMaglevServer(srv, NewServer(ctx, c))
RegisterMaglevServer(srv, NewServer(ctx, c, nil, ""))
go srv.Serve(lis) //nolint:errcheck
conn, err := grpc.NewClient(lis.Addr().String(),
@@ -198,7 +198,7 @@ func TestPauseResumeBackend(t *testing.T) {
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
info, err := client.PauseBackend(ctx, &PauseResumeRequest{Name: "be0"})
info, err := client.PauseBackend(ctx, &BackendRequest{Name: "be0"})
if err != nil {
t.Fatalf("PauseBackend: %v", err)
}
@@ -206,7 +206,7 @@ func TestPauseResumeBackend(t *testing.T) {
t.Errorf("after pause: got %q, want paused", info.State)
}
info, err = client.ResumeBackend(ctx, &PauseResumeRequest{Name: "be0"})
info, err = client.ResumeBackend(ctx, &BackendRequest{Name: "be0"})
if err != nil {
t.Fatalf("ResumeBackend: %v", err)
}
@@ -215,6 +215,78 @@ func TestPauseResumeBackend(t *testing.T) {
}
}
func TestSetFrontendPoolBackendWeight(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeTestChecker(ctx)
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
info, err := client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{
Frontend: "web",
Pool: "primary",
Backend: "be0",
Weight: 42,
})
if err != nil {
t.Fatalf("SetFrontendPoolBackendWeight: %v", err)
}
if len(info.Pools) == 0 || len(info.Pools[0].Backends) == 0 {
t.Fatal("response missing pools/backends")
}
if info.Pools[0].Backends[0].Weight != 42 {
t.Errorf("weight: got %d, want 42", info.Pools[0].Backends[0].Weight)
}
// Invalid weight.
_, err = client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{
Frontend: "web", Pool: "primary", Backend: "be0", Weight: 101,
})
if err == nil {
t.Error("expected error for weight 101")
}
// Unknown frontend.
_, err = client.SetFrontendPoolBackendWeight(ctx, &SetWeightRequest{
Frontend: "nope", Pool: "primary", Backend: "be0", Weight: 50,
})
if err == nil {
t.Error("expected error for unknown frontend")
}
}
func TestEnableDisableBackend(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeTestChecker(ctx)
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
info, err := client.DisableBackend(ctx, &BackendRequest{Name: "be0"})
if err != nil {
t.Fatalf("DisableBackend: %v", err)
}
if info.State != "removed" {
t.Errorf("after disable: got %q, want removed", info.State)
}
if info.Enabled {
t.Error("after disable: Enabled should be false")
}
info, err = client.EnableBackend(ctx, &BackendRequest{Name: "be0"})
if err != nil {
t.Fatalf("EnableBackend: %v", err)
}
if info.State != "unknown" {
t.Errorf("after enable: got %q, want unknown", info.State)
}
if !info.Enabled {
t.Error("after enable: Enabled should be true")
}
}
func TestListHealthChecks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -266,7 +338,7 @@ func TestGetHealthCheckNotFound(t *testing.T) {
}
}
func TestWatchBackendEventsServerShutdown(t *testing.T) {
func TestWatchEventsServerShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -277,11 +349,11 @@ func TestWatchBackendEventsServerShutdown(t *testing.T) {
client, cleanup := startTestServer(t, srvCtx, c)
defer cleanup()
stream, err := client.WatchBackendEvents(ctx, &WatchRequest{})
stream, err := client.WatchEvents(ctx, &WatchRequest{})
if err != nil {
t.Fatalf("WatchBackendEvents: %v", err)
t.Fatalf("WatchEvents: %v", err)
}
// Drain the initial synthetic event.
// Drain the initial synthetic backend event.
if _, err := stream.Recv(); err != nil {
t.Fatalf("initial Recv: %v", err)
}
@@ -294,7 +366,7 @@ func TestWatchBackendEventsServerShutdown(t *testing.T) {
}
}
func TestWatchBackendEvents(t *testing.T) {
func TestWatchEventsBackend(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -302,17 +374,72 @@ func TestWatchBackendEvents(t *testing.T) {
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
stream, err := client.WatchBackendEvents(ctx, &WatchRequest{})
stream, err := client.WatchEvents(ctx, &WatchRequest{})
if err != nil {
t.Fatalf("WatchBackendEvents: %v", err)
t.Fatalf("WatchEvents: %v", err)
}
// Should receive the current state for be0 immediately.
// Should receive the current state for be0 immediately as a BackendEvent.
ev, err := stream.Recv()
if err != nil {
t.Fatalf("Recv: %v", err)
}
if ev.BackendName != "be0" {
t.Errorf("initial event: backend=%q, want be0", ev.BackendName)
be, ok := ev.Event.(*Event_Backend)
if !ok {
t.Fatalf("expected BackendEvent, got %T", ev.Event)
}
if be.Backend.BackendName != "be0" {
t.Errorf("initial event: backend=%q, want be0", be.Backend.BackendName)
}
}
func TestWatchEventsLogOnly(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeTestChecker(ctx)
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
f := false
stream, err := client.WatchEvents(ctx, &WatchRequest{Backend: &f, Frontend: &f})
if err != nil {
t.Fatalf("WatchEvents: %v", err)
}
// No initial snapshot should arrive (backend disabled). Verify by checking
// that the stream has no immediately-readable event.
recvCh := make(chan *Event, 1)
go func() {
ev, _ := stream.Recv()
recvCh <- ev
}()
select {
case ev := <-recvCh:
if _, isLog := ev.Event.(*Event_Log); !isLog {
t.Errorf("expected only LogEvents, got %T", ev.Event)
}
case <-time.After(50 * time.Millisecond):
// expected: no backend snapshot arrived
}
}
func TestWatchEventsInvalidLogLevel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeTestChecker(ctx)
client, cleanup := startTestServer(t, ctx, c)
defer cleanup()
// For streaming RPCs the server error arrives on the first Recv, not on the
// initial call.
stream, err := client.WatchEvents(ctx, &WatchRequest{LogLevel: "verbose"})
if err != nil {
t.Fatalf("WatchEvents: %v", err)
}
_, err = stream.Recv()
if err == nil {
t.Fatal("expected error for invalid log_level")
}
}


@@ -10,11 +10,15 @@ service Maglev {
rpc GetFrontend(GetFrontendRequest) returns (FrontendInfo);
rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse);
rpc GetBackend(GetBackendRequest) returns (BackendInfo);
rpc PauseBackend(PauseResumeRequest) returns (BackendInfo);
rpc ResumeBackend(PauseResumeRequest) returns (BackendInfo);
rpc PauseBackend(BackendRequest) returns (BackendInfo);
rpc ResumeBackend(BackendRequest) returns (BackendInfo);
rpc EnableBackend(BackendRequest) returns (BackendInfo);
rpc DisableBackend(BackendRequest) returns (BackendInfo);
rpc ListHealthChecks(ListHealthChecksRequest) returns (ListHealthChecksResponse);
rpc GetHealthCheck(GetHealthCheckRequest) returns (HealthCheckInfo);
rpc WatchBackendEvents(WatchRequest) returns (stream BackendEvent);
rpc SetFrontendPoolBackendWeight(SetWeightRequest) returns (FrontendInfo);
rpc WatchEvents(WatchRequest) returns (stream Event);
rpc CheckConfig(CheckConfigRequest) returns (CheckConfigResponse);
}
// ---- requests ---------------------------------------------------------------
@@ -31,7 +35,7 @@ message GetBackendRequest {
string name = 1;
}
message PauseResumeRequest {
message BackendRequest {
string name = 1;
}
@@ -41,7 +45,29 @@ message GetHealthCheckRequest {
string name = 1;
}
message WatchRequest {}
message CheckConfigRequest {}
message CheckConfigResponse {
bool ok = 1;
string parse_error = 2; // set when YAML cannot be read or parsed
string semantic_error = 3; // set when YAML is valid but semantically incorrect
}
message SetWeightRequest {
string frontend = 1;
string pool = 2;
string backend = 3;
int32 weight = 4; // 0-100
}
// WatchRequest controls which event types are streamed. The boolean fields
// default to true, so an empty request subscribes to everything at info level.
message WatchRequest {
optional bool log = 1; // include log events (default: true)
string log_level = 2; // minimum log level: debug|info|warn|error (default: info)
optional bool backend = 3; // include backend transition events (default: true)
optional bool frontend = 4; // include frontend events (default: true)
}
// ---- responses --------------------------------------------------------------
@@ -123,7 +149,36 @@ message TransitionRecord {
int64 at_unix_ns = 3;
}
// ---- event stream -----------------------------------------------------------
// LogAttr is a single key/value attribute from a structured log record.
message LogAttr {
string key = 1;
string value = 2;
}
// LogEvent carries a single structured log record.
message LogEvent {
int64 at_unix_ns = 1;
string level = 2;
string msg = 3;
repeated LogAttr attrs = 4;
}
// BackendEvent is emitted on every backend state transition.
message BackendEvent {
string backend_name = 1;
TransitionRecord transition = 2;
}
// FrontendEvent is reserved for future frontend-level events.
message FrontendEvent {}
// Event is the envelope returned by WatchEvents.
message Event {
oneof event {
LogEvent log = 1;
BackendEvent backend = 2;
FrontendEvent frontend = 3;
}
}