commit b84b3274b178ec1af0244fd7807efe3bcc6a0748
Author: Pim van Pelt
Date:   Fri Apr 10 17:30:44 2026 +0200

    Initial revision of healthchecker, inspired by HAProxy

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ec6697d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+bin/
+/*.yaml
+docs/implementation/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..a6ce2f2
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,27 @@
+FROM golang:1.25 AS builder
+
+WORKDIR /src
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+RUN make build
+
+# ---- runtime image ----------------------------------------------------------
+FROM debian:bookworm-slim
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    iproute2 \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY --from=builder /src/bin/healthchecker /usr/local/bin/healthchecker
+
+# Required capabilities:
+#   CAP_NET_ADMIN — create/delete GRE tunnel interfaces via netlink
+#   CAP_NET_RAW   — open raw ICMP sockets for health probing
+#
+# Grant these in your container runtime, e.g.:
+#   docker run --cap-add NET_ADMIN --cap-add NET_RAW ...
+# or in Kubernetes via securityContext.capabilities.add
+
+ENTRYPOINT ["/usr/local/bin/healthchecker"]
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..8cad6b9
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,30 @@
+BINARY := healthchecker
+MODULE := git.ipng.ch/ipng/vpp-maglev
+PROTO_DIR := proto
+PROTO_FILE := $(PROTO_DIR)/healthchecker.proto
+GEN_FILES := internal/grpcapi/healthchecker.pb.go internal/grpcapi/healthchecker_grpc.pb.go
+
+.PHONY: all build test proto lint clean
+
+all: build
+
+build: $(GEN_FILES)
+	go build -o bin/$(BINARY) ./cmd/$(BINARY)/
+
+test: $(GEN_FILES)
+	go test ./...
+
+proto: $(GEN_FILES)
+
+$(GEN_FILES): $(PROTO_FILE)
+	protoc \
+		--go_out=. --go_opt=module=$(MODULE) \
+		--go-grpc_out=. --go-grpc_opt=module=$(MODULE) \
+		$(PROTO_FILE)
+
+lint:
+	golangci-lint run ./...
+ +clean: + rm -f bin/$(BINARY) + rm -f $(GEN_FILES) diff --git a/cmd/healthchecker/main.go b/cmd/healthchecker/main.go new file mode 100644 index 0000000..fa211cf --- /dev/null +++ b/cmd/healthchecker/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + "google.golang.org/grpc" + + "git.ipng.ch/ipng/vpp-maglev/internal/checker" + "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/grpcapi" +) + +func main() { + if err := run(); err != nil { + slog.Error("startup-fatal", "err", err) + os.Exit(1) + } +} + +func run() error { + // ---- flags / env -------------------------------------------------------- + configPath := stringFlag("config", "/etc/maglev/frontend.yaml", "MAGLEV_CONFIG", "path to frontend.yaml") + grpcAddr := stringFlag("grpc-addr", ":9090", "MAGLEV_GRPC_ADDR", "gRPC listen address") + logLevel := stringFlag("log-level", "info", "MAGLEV_LOG_LEVEL", "log level (debug|info|warn|error)") + flag.Parse() + + // ---- logging ------------------------------------------------------------ + var level slog.Level + if err := level.UnmarshalText([]byte(*logLevel)); err != nil { + return fmt.Errorf("invalid log level %q: %w", *logLevel, err) + } + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}))) + + // ---- config ------------------------------------------------------------- + cfg, err := config.Load(*configPath) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + slog.Info("config-loaded", "path", *configPath, "vips", len(cfg.VIPs)) + + // ---- checker ------------------------------------------------------------ + chkr := checker.New(cfg) + + ctx, rootCancel := context.WithCancel(context.Background()) + defer rootCancel() + + go func() { + if err := chkr.Run(ctx); err != nil { + slog.Error("checker-exited", "err", err) + } + }() + + // ---- gRPC server -------------------------------------------------------- + lis, err := net.Listen("tcp", *grpcAddr) + if err != nil { + return fmt.Errorf("listen %s: %w", *grpcAddr, err) + } + srv := grpc.NewServer() + grpcapi.RegisterHealthCheckerServer(srv, grpcapi.NewServer(chkr)) + slog.Info("grpc-listening", "addr", *grpcAddr) + + go func() { + if err := srv.Serve(lis); err != nil { + slog.Error("grpc-serve-error", "err", err) + } + }() + + // ---- signal handling ---------------------------------------------------- + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) + + for sig := range sigCh { + switch sig { + case syscall.SIGHUP: + slog.Info("config-reload-start") + newCfg, err := config.Load(*configPath) + if err != nil { + slog.Error("config-reload-error", "err", err) + continue + } + if err := chkr.Reload(ctx, newCfg); err != nil { + slog.Error("checker-reload-error", "err", err) + continue + } + slog.Info("config-reload-done", "vips", len(newCfg.VIPs)) + + case syscall.SIGTERM, syscall.SIGINT: + slog.Info("shutdown", "signal", sig) + rootCancel() + srv.GracefulStop() + return nil + } + } + return nil +} + +// stringFlag declares a flag that falls back to an environment variable. 
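+//
+// Resolution order, lowest to highest precedence: built-in default, then the
+// environment variable, then an explicitly passed flag. For example,
+// MAGLEV_GRPC_ADDR=:9091 overrides the ":9090" default, but -grpc-addr on the
+// command line wins over both.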
+func stringFlag(name, defaultVal, envKey, usage string) *string { + val := defaultVal + if v := os.Getenv(envKey); v != "" { + val = v + } + return flag.String(name, val, fmt.Sprintf("%s (env: %s)", usage, envKey)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1dcaa71 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module git.ipng.ch/ipng/vpp-maglev + +go 1.25.0 + +require ( + github.com/vishvananda/netns v0.0.5 + golang.org/x/net v0.52.0 + google.golang.org/grpc v1.80.0 + google.golang.org/protobuf v1.36.11 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2516efb --- /dev/null +++ b/go.sum @@ -0,0 +1,44 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY= +github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text 
v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/checker/checker.go b/internal/checker/checker.go new file mode 100644 index 0000000..747f7a3 --- /dev/null +++ b/internal/checker/checker.go @@ -0,0 +1,437 @@ +package checker + +import ( + "context" + "log/slog" + "net" + "sync" + "time" + + "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" + "git.ipng.ch/ipng/vpp-maglev/internal/prober" +) + +// Event is emitted on every backend state transition. +type Event struct { + VIPName string + Backend net.IP + Transition health.Transition +} + +type backendKey struct { + VIPName string + Backend string // net.IP.String() +} + +type worker struct { + backend *health.Backend + hc config.HealthCheck + vip config.VIP + cancel context.CancelFunc +} + +// Checker orchestrates health probing for all VIP:backend tuples. +type Checker struct { + cfg *config.Frontend + mu sync.RWMutex + workers map[backendKey]*worker + + subsMu sync.Mutex + nextID int + subs map[int]chan Event + eventCh chan Event +} + +// New creates a Checker. Call Run to start probing. +func New(cfg *config.Frontend) *Checker { + return &Checker{ + cfg: cfg, + workers: make(map[backendKey]*worker), + subs: make(map[int]chan Event), + eventCh: make(chan Event, 256), + } +} + +// Run starts all probe goroutines and blocks until ctx is cancelled. +func (c *Checker) Run(ctx context.Context) error { + go c.fanOut(ctx) + + c.mu.Lock() + total := totalBackends(c.cfg) + pos := 0 + for vipName, vip := range c.cfg.VIPs { + for _, backend := range vip.Backends { + c.startWorker(ctx, vipName, vip, backend, pos, total) + pos++ + } + } + c.mu.Unlock() + + <-ctx.Done() + return nil +} + +// Reload applies a new config without restarting the process. +// New tuples are added, removed tuples are stopped, changed tuples are restarted. +// Existing tuples with unchanged healthcheck config continue uninterrupted. 
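+//
+// For example, replacing backends [10.0.0.2, 10.0.0.3] with [10.0.0.3, 10.0.0.4]
+// stops the worker for 10.0.0.2, leaves the 10.0.0.3 worker running (same
+// healthcheck config), and starts a fresh worker for 10.0.0.4.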
+func (c *Checker) Reload(ctx context.Context, cfg *config.Frontend) error { + c.mu.Lock() + defer c.mu.Unlock() + + type desired struct { + vipName string + vip config.VIP + backend net.IP + } + desiredMap := map[backendKey]desired{} + for vipName, vip := range cfg.VIPs { + for _, backend := range vip.Backends { + key := backendKey{VIPName: vipName, Backend: backend.String()} + desiredMap[key] = desired{vipName: vipName, vip: vip, backend: backend} + } + } + + // Stop workers no longer in config. + for key, w := range c.workers { + if _, ok := desiredMap[key]; !ok { + slog.Info("backend-stop", "vip", key.VIPName, "backend", key.Backend) + w.cancel() + delete(c.workers, key) + } + } + + // Add new or restart changed workers. + total := len(desiredMap) + pos := 0 + for key, d := range desiredMap { + if w, ok := c.workers[key]; ok { + if healthCheckEqual(w.hc, d.vip.HealthCheck) { + pos++ + continue + } + slog.Info("backend-restart", "vip", key.VIPName, "backend", key.Backend) + w.cancel() + w.hc = d.vip.HealthCheck + w.vip = d.vip + wCtx, cancel := context.WithCancel(ctx) + w.cancel = cancel + go c.runProbe(wCtx, key, 0, 1) // no stagger on reload + } else { + slog.Info("backend-start", "vip", d.vipName, "backend", d.backend) + c.startWorker(ctx, d.vipName, d.vip, d.backend, pos, total) + } + pos++ + } + + c.cfg = cfg + return nil +} + +// Subscribe returns a channel that receives Events for every state transition. +// Call the returned cancel function to unsubscribe. +func (c *Checker) Subscribe() (<-chan Event, func()) { + c.subsMu.Lock() + defer c.subsMu.Unlock() + id := c.nextID + c.nextID++ + ch := make(chan Event, 64) + c.subs[id] = ch + return ch, func() { + c.subsMu.Lock() + defer c.subsMu.Unlock() + delete(c.subs, id) + close(ch) + } +} + +// ListVIPs returns the names of all configured VIPs. +func (c *Checker) ListVIPs() []string { + c.mu.RLock() + defer c.mu.RUnlock() + names := make([]string, 0, len(c.cfg.VIPs)) + for name := range c.cfg.VIPs { + names = append(names, name) + } + return names +} + +// GetVIP returns the VIP config for the given name. +func (c *Checker) GetVIP(name string) (config.VIP, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.cfg.VIPs[name] + return v, ok +} + +// ListBackends returns the backend states for all backends of a VIP. +func (c *Checker) ListBackends(vipName string) []*health.Backend { + c.mu.RLock() + defer c.mu.RUnlock() + var out []*health.Backend + for key, w := range c.workers { + if key.VIPName == vipName { + out = append(out, w.backend) + } + } + return out +} + +// GetBackend returns the backend state for a specific VIP:backend tuple. +func (c *Checker) GetBackend(vipName, backendAddr string) (*health.Backend, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + key := backendKey{VIPName: vipName, Backend: backendAddr} + w, ok := c.workers[key] + if !ok { + return nil, false + } + return w.backend, true +} + +// PauseBackend pauses health checking for a specific backend. 
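+// Pausing records a transition into the paused state and emits an Event just
+// like a probe-driven transition. Resume (below) returns the backend to the
+// unknown state, so it must re-qualify as up via consecutive passing probes.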
+func (c *Checker) PauseBackend(vipName, backendAddr string) (*health.Backend, bool) { + c.mu.Lock() + defer c.mu.Unlock() + key := backendKey{VIPName: vipName, Backend: backendAddr} + w, ok := c.workers[key] + if !ok { + return nil, false + } + maxHistory := c.cfg.HealthChecker.TransitionHistory + if w.backend.Pause(maxHistory) { + slog.Info("backend-pause", "vip", vipName, "backend", backendAddr) + c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]}) + } + return w.backend, true +} + +// ResumeBackend resumes health checking for a specific backend. +func (c *Checker) ResumeBackend(vipName, backendAddr string) (*health.Backend, bool) { + c.mu.Lock() + defer c.mu.Unlock() + key := backendKey{VIPName: vipName, Backend: backendAddr} + w, ok := c.workers[key] + if !ok { + return nil, false + } + maxHistory := c.cfg.HealthChecker.TransitionHistory + if w.backend.Resume(maxHistory) { + slog.Info("backend-resume", "vip", vipName, "backend", backendAddr) + c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]}) + } + return w.backend, true +} + +// ---- internal -------------------------------------------------------------- + +// startWorker creates a Backend and launches a probe goroutine. +// pos and total are used to compute the startup stagger delay. +// Must be called with c.mu held. +func (c *Checker) startWorker(ctx context.Context, vipName string, vip config.VIP, backend net.IP, pos, total int) { + key := backendKey{VIPName: vipName, Backend: backend.String()} + wCtx, cancel := context.WithCancel(ctx) + hc := vip.HealthCheck + w := &worker{ + backend: health.New(vipName, backend, hc.Rise, hc.Fall), + hc: hc, + vip: vip, + cancel: cancel, + } + c.workers[key] = w + go c.runProbe(wCtx, key, pos, total) +} + +// runProbe is the per-backend probe loop. +// pos and total drive the initial stagger: delay = interval * pos / total. +func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int) { + // Stagger initial probe to spread startup load. + c.mu.RLock() + w, ok := c.workers[key] + if !ok { + c.mu.RUnlock() + return + } + initialDelay := staggerDelay(w.hc.Interval, pos, total) + c.mu.RUnlock() + + if initialDelay > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(initialDelay): + } + } + + for { + c.mu.RLock() + w, ok := c.workers[key] + if !ok { + c.mu.RUnlock() + return + } + hc := w.hc + vip := w.vip + maxHistory := c.cfg.HealthChecker.TransitionHistory + sleepFor := w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval) + c.mu.RUnlock() + + select { + case <-ctx.Done(): + return + case <-time.After(sleepFor): + } + + // Determine source IP based on target address family. 
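+		// For example, with probe-ipv4-src: 10.0.0.1 and probe-ipv6-src:
+		// 2001:db8:1::1 configured, backend 10.0.0.2 is probed from 10.0.0.1
+		// and backend 2001:db8:2::1 from 2001:db8:1::1. Either source may be
+		// nil when unset; the prober is then assumed to let the kernel pick
+		// a source address (the prober package is not shown in this diff).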
+		backendIP := net.ParseIP(key.Backend)
+		var probeSrc net.IP
+		if backendIP.To4() != nil {
+			probeSrc = c.cfg.ProbeIPv4Src
+		} else {
+			probeSrc = c.cfg.ProbeIPv6Src
+		}
+
+		pcfg := prober.ProbeConfig{
+			Target:           backendIP,
+			Port:             vip.Port,
+			ProbeSrc:         probeSrc,
+			HealthCheckNetns: c.cfg.HealthCheckNetns,
+			Timeout:          hc.Timeout,
+			HTTP:             hc.HTTP,
+			TCP:              hc.TCP,
+		}
+
+		probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout)
+		slog.Debug("probe-start", "vip", key.VIPName, "backend", key.Backend, "type", hc.Type)
+		start := time.Now()
+		result := prober.ForType(hc.Type)(probeCtx, pcfg)
+		cancel()
+		slog.Debug("probe-done",
+			"vip", key.VIPName,
+			"backend", key.Backend,
+			"type", hc.Type,
+			"ok", result.OK,
+			"code", result.Code,
+			"detail", result.Detail,
+			"elapsed", time.Since(start).Round(time.Millisecond).String(),
+		)
+
+		c.mu.Lock()
+		w, exists := c.workers[key]
+		if !exists {
+			c.mu.Unlock()
+			return
+		}
+		if w.backend.Record(result, maxHistory) {
+			t := w.backend.Transitions[0]
+			addr := w.backend.Address
+			slog.Info("backend-transition",
+				"vip", key.VIPName,
+				"backend", key.Backend,
+				"from", t.From.String(),
+				"to", t.To.String(),
+				"code", result.Code,
+				"detail", result.Detail,
+			)
+			c.emit(Event{VIPName: key.VIPName, Backend: addr, Transition: t})
+		}
+		c.mu.Unlock()
+	}
+}
+
+// emit sends an event to the internal fan-out channel (non-blocking).
+// Must be called with c.mu held.
+func (c *Checker) emit(e Event) {
+	select {
+	case c.eventCh <- e:
+	default:
+		slog.Warn("event-drop", "vip", e.VIPName, "backend", e.Backend)
+	}
+}
+
+// fanOut reads from eventCh and distributes to all subscribers.
+func (c *Checker) fanOut(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case e := <-c.eventCh:
+			c.subsMu.Lock()
+			for _, ch := range c.subs {
+				select {
+				case ch <- e:
+				default:
+					// Slow subscriber — drop rather than block.
+				}
+			}
+			c.subsMu.Unlock()
+		}
+	}
+}
+
+// healthCheckEqual returns true if two HealthCheck configs are functionally identical.
+func healthCheckEqual(a, b config.HealthCheck) bool {
+	if a.Type != b.Type ||
+		a.Interval != b.Interval ||
+		a.FastInterval != b.FastInterval ||
+		a.DownInterval != b.DownInterval ||
+		a.Timeout != b.Timeout ||
+		a.Rise != b.Rise ||
+		a.Fall != b.Fall {
+		return false
+	}
+	return httpParamsEqual(a.HTTP, b.HTTP) && tcpParamsEqual(a.TCP, b.TCP)
+}
+
+func httpParamsEqual(a, b *config.HTTPParams) bool {
+	if a == nil && b == nil {
+		return true
+	}
+	if a == nil || b == nil {
+		return false
+	}
+	aRe, bRe := "", ""
+	if a.ResponseRegexp != nil {
+		aRe = a.ResponseRegexp.String()
+	}
+	if b.ResponseRegexp != nil {
+		bRe = b.ResponseRegexp.String()
+	}
+	return a.Path == b.Path &&
+		a.Host == b.Host &&
+		a.ResponseCodeMin == b.ResponseCodeMin &&
+		a.ResponseCodeMax == b.ResponseCodeMax &&
+		aRe == bRe &&
+		a.ServerName == b.ServerName &&
+		a.InsecureSkipVerify == b.InsecureSkipVerify
+}
+
+func tcpParamsEqual(a, b *config.TCPParams) bool {
+	if a == nil && b == nil {
+		return true
+	}
+	if a == nil || b == nil {
+		return false
+	}
+	// Compare banner regexps by their source text (nil-safe), mirroring
+	// httpParamsEqual; without this, a reload that only changes the banner
+	// regexp would not restart the worker.
+	aRe, bRe := "", ""
+	if a.BannerRegexp != nil {
+		aRe = a.BannerRegexp.String()
+	}
+	if b.BannerRegexp != nil {
+		bRe = b.BannerRegexp.String()
+	}
+	return a.SSL == b.SSL &&
+		a.ServerName == b.ServerName &&
+		a.InsecureSkipVerify == b.InsecureSkipVerify &&
+		aRe == bRe
+}
+
+// staggerDelay computes the initial probe delay for a backend at position pos
+// out of total backends: delay = interval * pos / total.
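+//
+// For example, with interval=10s and total=5, the backends fire their first
+// probes at 0s, 2s, 4s, 6s and 8s, spreading startup load across one full
+// probe interval.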
+func staggerDelay(interval time.Duration, pos, total int) time.Duration { + if total <= 1 { + return 0 + } + return time.Duration(int64(interval) * int64(pos) / int64(total)) +} + +// totalBackends counts all backends across all VIPs in a config. +func totalBackends(cfg *config.Frontend) int { + n := 0 + for _, vip := range cfg.VIPs { + n += len(vip.Backends) + } + return n +} diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go new file mode 100644 index 0000000..a20f51a --- /dev/null +++ b/internal/checker/checker_test.go @@ -0,0 +1,248 @@ +package checker + +import ( + "context" + "net" + "testing" + "time" + + "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" +) + +func makeTestConfig(interval time.Duration, fall, rise int) *config.Frontend { + return &config.Frontend{ + HealthCheckNetns: "test", + HealthChecker: config.HealthCheckerConfig{TransitionHistory: 5}, + VIPs: map[string]config.VIP{ + "web": { + Address: net.ParseIP("192.0.2.1"), + Protocol: "tcp", + Port: 80, + Backends: []net.IP{net.ParseIP("10.0.0.2")}, + HealthCheck: config.HealthCheck{ + Type: "icmp", + Interval: interval, + Timeout: time.Second, + Fall: fall, + Rise: rise, + }, + }, + }, + } +} + +func TestHealthCheckEqual(t *testing.T) { + a := config.HealthCheck{ + Type: "http", + Interval: time.Second, + Timeout: 2 * time.Second, + Fall: 3, + Rise: 2, + HTTP: &config.HTTPParams{Path: "/healthz", ResponseCodeMin: 200, ResponseCodeMax: 200}, + } + b := a + if !healthCheckEqual(a, b) { + t.Error("identical configs should be equal") + } + b.Fall = 5 + if healthCheckEqual(a, b) { + t.Error("different Fall should not be equal") + } + b = a + b.FastInterval = 500 * time.Millisecond + if healthCheckEqual(a, b) { + t.Error("different FastInterval should not be equal") + } + b = a + b.HTTP = &config.HTTPParams{Path: "/other", ResponseCodeMin: 200, ResponseCodeMax: 200} + if healthCheckEqual(a, b) { + t.Error("different HTTP.Path should not be equal") + } +} + +func TestStateMachineViaBackend(t *testing.T) { + // Directly test Backend state transitions (rise=2, fall=3) without goroutines. + b := health.New("web", net.ParseIP("10.0.0.2"), 2, 3) + pass := health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"} + fail := health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON"} + + // Unknown → Down on first fail. + if !b.Record(fail, 5) { + t.Error("first fail from Unknown should transition to Down") + } + if b.State != health.StateDown { + t.Errorf("expected down, got %s", b.State) + } + + // rise=2 passes → Up. 
+ if b.Record(pass, 5) { + t.Error("should not transition after 1 pass (rise=2)") + } + if !b.Record(pass, 5) { + t.Error("should transition to Up after 2 passes") + } + if b.State != health.StateUp { + t.Errorf("expected up, got %s", b.State) + } +} + +func TestStaggerDelay(t *testing.T) { + interval := 10 * time.Second + if got := staggerDelay(interval, 0, 10); got != 0 { + t.Errorf("pos=0: got %v, want 0", got) + } + if got := staggerDelay(interval, 5, 10); got != 5*time.Second { + t.Errorf("pos=5/10: got %v, want 5s", got) + } + if got := staggerDelay(interval, 0, 1); got != 0 { + t.Errorf("total=1: got %v, want 0", got) + } +} + +func TestReloadAddsBackend(t *testing.T) { + cfg := makeTestConfig(10*time.Millisecond, 3, 2) + c := New(cfg) + + newCfg := makeTestConfig(10*time.Millisecond, 3, 2) + newCfg.VIPs["web2"] = config.VIP{ + Address: net.ParseIP("192.0.2.2"), + Protocol: "tcp", + Port: 443, + Backends: []net.IP{net.ParseIP("10.0.0.3")}, + HealthCheck: config.HealthCheck{ + Type: "icmp", + Interval: 10 * time.Millisecond, + Timeout: time.Second, + Fall: 3, + Rise: 2, + }, + } + + // Cancelled context: no probe goroutines actually run. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if err := c.Reload(ctx, newCfg); err != nil { + t.Fatalf("Reload: %v", err) + } + + c.mu.RLock() + _, ok := c.workers[backendKey{VIPName: "web2", Backend: "10.0.0.3"}] + c.mu.RUnlock() + if !ok { + t.Error("new backend not added after Reload") + } +} + +func TestReloadRemovesBackend(t *testing.T) { + cfg := makeTestConfig(10*time.Millisecond, 3, 2) + c := New(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Seed a worker manually. + c.mu.Lock() + key := backendKey{VIPName: "web", Backend: "10.0.0.2"} + wCtx, wCancel := context.WithCancel(context.Background()) + c.workers[key] = &worker{ + backend: health.New("web", net.ParseIP("10.0.0.2"), 2, 3), + hc: cfg.VIPs["web"].HealthCheck, + vip: cfg.VIPs["web"], + cancel: wCancel, + } + c.mu.Unlock() + _ = wCtx + + // New config with "web" VIP removed. + newCfg := &config.Frontend{ + HealthCheckNetns: cfg.HealthCheckNetns, + HealthChecker: cfg.HealthChecker, + VIPs: map[string]config.VIP{}, + } + + if err := c.Reload(ctx, newCfg); err != nil { + t.Fatalf("Reload: %v", err) + } + + c.mu.RLock() + _, ok := c.workers[key] + c.mu.RUnlock() + if ok { + t.Error("removed backend still present after Reload") + } +} + +func TestSubscribe(t *testing.T) { + cfg := makeTestConfig(10*time.Millisecond, 1, 1) + c := New(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.fanOut(ctx) + + ch, unsub := c.Subscribe() + defer unsub() + + e := Event{ + VIPName: "web", + Backend: net.ParseIP("10.0.0.2"), + Transition: health.Transition{ + From: health.StateUnknown, + To: health.StateUp, + }, + } + c.mu.Lock() + c.emit(e) + c.mu.Unlock() + + select { + case got := <-ch: + if got.VIPName != "web" { + t.Errorf("event VIPName: got %q, want %q", got.VIPName, "web") + } + if got.Transition.To != health.StateUp { + t.Errorf("event To state: got %s, want up", got.Transition.To) + } + case <-time.After(time.Second): + t.Error("timed out waiting for event") + } +} + +func TestPauseResume(t *testing.T) { + cfg := makeTestConfig(time.Hour, 3, 2) // long interval so probes never fire + c := New(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go c.fanOut(ctx) + + // Seed a worker. 
+ c.mu.Lock() + key := backendKey{VIPName: "web", Backend: "10.0.0.2"} + _, wCancel := context.WithCancel(ctx) + c.workers[key] = &worker{ + backend: health.New("web", net.ParseIP("10.0.0.2"), 2, 3), + hc: cfg.VIPs["web"].HealthCheck, + vip: cfg.VIPs["web"], + cancel: wCancel, + } + c.mu.Unlock() + + b, ok := c.PauseBackend("web", "10.0.0.2") + if !ok { + t.Fatal("PauseBackend: not found") + } + if b.State != health.StatePaused { + t.Errorf("after pause: %s", b.State) + } + + b, ok = c.ResumeBackend("web", "10.0.0.2") + if !ok { + t.Fatal("ResumeBackend: not found") + } + if b.State != health.StateUnknown { + t.Errorf("after resume: %s", b.State) + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..eab2abe --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,368 @@ +package config + +import ( + "fmt" + "net" + "os" + "regexp" + "strconv" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Frontend is the parsed and validated representation of frontend.yaml. +type Frontend struct { + ProbeIPv4Src net.IP + ProbeIPv6Src net.IP + HealthCheckNetns string + HealthChecker HealthCheckerConfig + VIPs map[string]VIP +} + +// HealthCheckerConfig holds global health checker settings. +type HealthCheckerConfig struct { + TransitionHistory int +} + +// VIP is a single virtual IP entry. +type VIP struct { + Description string + Address net.IP + Protocol string // "tcp", "udp", or "" (all traffic) + Port uint16 // 0 means omitted (all ports) + Backends []net.IP + HealthCheck HealthCheck +} + +// HealthCheck describes how to probe backends for a VIP. +type HealthCheck struct { + Type string + HTTP *HTTPParams // non-nil for type http and https + TCP *TCPParams // non-nil for type tcp + Interval time.Duration + FastInterval time.Duration // optional; used while health counter is degraded + DownInterval time.Duration // optional; used while fully down + Timeout time.Duration + Rise int // default 2 + Fall int // default 3 +} + +// HTTPParams holds validated parameters for http/https health checks. +type HTTPParams struct { + Path string + Host string // Host header; defaults to backend IP if empty + ResponseCodeMin int // inclusive lower bound; default 200 + ResponseCodeMax int // inclusive upper bound; default 200 + ResponseRegexp *regexp.Regexp // nil if not configured + ServerName string // TLS SNI; falls back to Host if empty (https only) + InsecureSkipVerify bool // skip TLS certificate verification (https only) +} + +// TCPParams holds validated parameters for tcp health checks. 
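+// Note: BannerRegexp is declared here, but the YAML schema in this revision
+// has no field that populates it, so it is always nil after Load.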
+type TCPParams struct { + SSL bool + ServerName string + InsecureSkipVerify bool + BannerRegexp *regexp.Regexp // nil if not configured; matched against the first line sent by the server +} + +// ---- raw YAML types -------------------------------------------------------- + +type rawConfig struct { + Maglev struct { + Frontend rawFrontend `yaml:"frontend"` + } `yaml:"maglev"` +} + +type rawFrontend struct { + ProbeIPv4Src string `yaml:"probe-ipv4-src"` + ProbeIPv6Src string `yaml:"probe-ipv6-src"` + HealthCheckNetns string `yaml:"healthcheck-netns"` + HealthChecker rawHealthCheckerCfg `yaml:"healthchecker"` + VIPs map[string]rawVIP `yaml:"vips"` +} + +type rawHealthCheckerCfg struct { + TransitionHistory int `yaml:"transition-history"` +} + +type rawVIP struct { + Description string `yaml:"description"` + Address string `yaml:"address"` + Protocol string `yaml:"protocol"` + Port uint16 `yaml:"port"` + Backends []string `yaml:"backends"` + HealthCheck rawHealthCheck `yaml:"healthcheck"` +} + +type rawHealthCheck struct { + Type string `yaml:"type"` + Params rawParams `yaml:"params"` + Interval string `yaml:"interval"` + FastInterval string `yaml:"fast-interval"` + DownInterval string `yaml:"down-interval"` + Timeout string `yaml:"timeout"` + Rise int `yaml:"rise"` + Fall int `yaml:"fall"` +} + +type rawParams struct { + // HTTP / HTTPS + Path string `yaml:"path"` + Host string `yaml:"host"` + ResponseCode string `yaml:"response-code"` + ResponseRegexp string `yaml:"response-regexp"` + ServerName string `yaml:"server-name"` + InsecureSkipVerify bool `yaml:"insecure-skip-verify"` + // TCP + SSL bool `yaml:"ssl"` +} + +// ---- Load ------------------------------------------------------------------ + +// Load reads and validates the config file at path. 
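+//
+// A minimal frontend.yaml accepted by this loader looks like the sketch below
+// (illustrative values; the shape follows the validation rules in convertVIP
+// and convertHealthCheck):
+//
+//	maglev:
+//	  frontend:
+//	    vips:
+//	      web:
+//	        address: 192.0.2.1
+//	        protocol: tcp
+//	        port: 80
+//	        backends: [10.0.0.2]
+//	        healthcheck:
+//	          type: icmp
+//	          interval: 1s
+//	          timeout: 2s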
+func Load(path string) (*Frontend, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read config %q: %w", path, err) + } + return parse(data) +} + +func parse(data []byte) (*Frontend, error) { + var raw rawConfig + if err := yaml.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("parse yaml: %w", err) + } + return convert(&raw.Maglev.Frontend) +} + +func convert(r *rawFrontend) (*Frontend, error) { + f := &Frontend{} + var err error + + if f.ProbeIPv4Src, err = parseOptionalIPFamily(r.ProbeIPv4Src, 4, "probe-ipv4-src"); err != nil { + return nil, err + } + if f.ProbeIPv6Src, err = parseOptionalIPFamily(r.ProbeIPv6Src, 6, "probe-ipv6-src"); err != nil { + return nil, err + } + + f.HealthCheckNetns = r.HealthCheckNetns + + f.HealthChecker.TransitionHistory = r.HealthChecker.TransitionHistory + if f.HealthChecker.TransitionHistory == 0 { + f.HealthChecker.TransitionHistory = 5 + } + if f.HealthChecker.TransitionHistory < 1 { + return nil, fmt.Errorf("healthchecker.transition-history must be >= 1") + } + + f.VIPs = make(map[string]VIP, len(r.VIPs)) + for name, rv := range r.VIPs { + vip, err := convertVIP(name, &rv) + if err != nil { + return nil, fmt.Errorf("vip %q: %w", name, err) + } + f.VIPs[name] = vip + } + + return f, nil +} + +func convertVIP(name string, r *rawVIP) (VIP, error) { + v := VIP{ + Description: r.Description, + Protocol: r.Protocol, + Port: r.Port, + } + + ip := net.ParseIP(r.Address) + if ip == nil { + return VIP{}, fmt.Errorf("invalid address %q", r.Address) + } + v.Address = ip + + switch r.Protocol { + case "", "tcp", "udp": + default: + return VIP{}, fmt.Errorf("protocol must be \"tcp\", \"udp\", or omitted, got %q", r.Protocol) + } + + if r.Port != 0 && r.Protocol == "" { + return VIP{}, fmt.Errorf("port requires protocol to be set") + } + if r.Protocol != "" && r.Port == 0 { + return VIP{}, fmt.Errorf("protocol %q requires port to be set (1-65535)", r.Protocol) + } + + if len(r.Backends) == 0 { + return VIP{}, fmt.Errorf("backends must not be empty") + } + var firstFamily int + for i, bs := range r.Backends { + ip := net.ParseIP(bs) + if ip == nil { + return VIP{}, fmt.Errorf("backend[%d] %q is not a valid IP", i, bs) + } + fam := ipFamily(ip) + if i == 0 { + firstFamily = fam + } else if fam != firstFamily { + return VIP{}, fmt.Errorf("backend[%d] %q has different address family than backend[0]", i, bs) + } + v.Backends = append(v.Backends, ip) + } + + hc, err := convertHealthCheck(&r.HealthCheck) + if err != nil { + return VIP{}, fmt.Errorf("healthcheck: %w", err) + } + v.HealthCheck = hc + + return v, nil +} + +func convertHealthCheck(r *rawHealthCheck) (HealthCheck, error) { + h := HealthCheck{ + Type: r.Type, + } + + switch r.Type { + case "icmp": + // no params + case "tcp": + h.TCP = &TCPParams{ + SSL: r.Params.SSL, + ServerName: r.Params.ServerName, + InsecureSkipVerify: r.Params.InsecureSkipVerify, + } + case "http", "https": + if r.Params.Path == "" { + return HealthCheck{}, fmt.Errorf("type http requires params.path") + } + min, max, err := parseCodeRange(r.Params.ResponseCode, 200) + if err != nil { + return HealthCheck{}, err + } + hp := &HTTPParams{ + Path: r.Params.Path, + Host: r.Params.Host, + ResponseCodeMin: min, + ResponseCodeMax: max, + InsecureSkipVerify: r.Params.InsecureSkipVerify, + } + // TLS SNI: server-name takes precedence, falls back to host. 
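+		// For example, {host: example.com} alone yields ServerName
+		// "example.com", while adding server-name: sni.example.com
+		// overrides it. How ServerName is applied during the TLS
+		// handshake is up to the prober package (not shown here).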
+ hp.ServerName = r.Params.ServerName + if hp.ServerName == "" { + hp.ServerName = r.Params.Host + } + if r.Params.ResponseRegexp != "" { + re, err := regexp.Compile(r.Params.ResponseRegexp) + if err != nil { + return HealthCheck{}, fmt.Errorf("invalid response-regexp %q: %w", r.Params.ResponseRegexp, err) + } + hp.ResponseRegexp = re + } + h.HTTP = hp + default: + return HealthCheck{}, fmt.Errorf("type must be \"icmp\", \"tcp\", \"http\", or \"https\", got %q", r.Type) + } + + var err error + if r.Interval == "" { + return HealthCheck{}, fmt.Errorf("interval is required") + } + if h.Interval, err = time.ParseDuration(r.Interval); err != nil || h.Interval <= 0 { + return HealthCheck{}, fmt.Errorf("interval %q must be a positive duration", r.Interval) + } + + if r.FastInterval != "" { + if h.FastInterval, err = time.ParseDuration(r.FastInterval); err != nil || h.FastInterval <= 0 { + return HealthCheck{}, fmt.Errorf("fast-interval %q must be a positive duration", r.FastInterval) + } + } + + if r.DownInterval != "" { + if h.DownInterval, err = time.ParseDuration(r.DownInterval); err != nil || h.DownInterval <= 0 { + return HealthCheck{}, fmt.Errorf("down-interval %q must be a positive duration", r.DownInterval) + } + } + + if r.Timeout == "" { + return HealthCheck{}, fmt.Errorf("timeout is required") + } + if h.Timeout, err = time.ParseDuration(r.Timeout); err != nil || h.Timeout <= 0 { + return HealthCheck{}, fmt.Errorf("timeout %q must be a positive duration", r.Timeout) + } + + h.Fall = r.Fall + if h.Fall == 0 { + h.Fall = 3 + } + if h.Fall < 1 { + return HealthCheck{}, fmt.Errorf("fall must be >= 1") + } + + h.Rise = r.Rise + if h.Rise == 0 { + h.Rise = 2 + } + if h.Rise < 1 { + return HealthCheck{}, fmt.Errorf("rise must be >= 1") + } + + return h, nil +} + +// ---- helpers --------------------------------------------------------------- + +// parseOptionalIPFamily parses s as an IP of the given family. +// Returns nil (no error) if s is empty. +func parseOptionalIPFamily(s string, family int, field string) (net.IP, error) { + if s == "" { + return nil, nil + } + ip := net.ParseIP(s) + if ip == nil { + return nil, fmt.Errorf("%s %q is not a valid IP address", field, s) + } + if ipFamily(ip) != family { + return nil, fmt.Errorf("%s %q must be an IPv%d address", field, s, family) + } + return ip, nil +} + +// ipFamily returns 4 for IPv4, 6 for IPv6. +func ipFamily(ip net.IP) int { + if ip.To4() != nil { + return 4 + } + return 6 +} + +// parseCodeRange parses a response-code value which may be a single integer +// ("200") or an inclusive range ("200-299"). Returns (min, max, err). 
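+// For example, "204" yields (204, 204), "200-299" yields (200, 299), and an
+// empty string yields (defaultCode, defaultCode).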
+func parseCodeRange(s string, defaultCode int) (min, max int, err error) { + if s == "" { + return defaultCode, defaultCode, nil + } + if idx := strings.IndexByte(s, '-'); idx > 0 { + min, err = strconv.Atoi(s[:idx]) + if err != nil { + return 0, 0, fmt.Errorf("invalid response-code range %q", s) + } + max, err = strconv.Atoi(s[idx+1:]) + if err != nil { + return 0, 0, fmt.Errorf("invalid response-code range %q", s) + } + return min, max, nil + } + min, err = strconv.Atoi(s) + if err != nil { + return 0, 0, fmt.Errorf("invalid response-code %q", s) + } + return min, min, nil +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..fb9c56c --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,408 @@ +package config + +import ( + "testing" + "time" +) + +const validConfig = ` +maglev: + frontend: + probe-ipv4-src: 10.0.0.1 + probe-ipv6-src: 2001:db8:1::1 + healthcheck-netns: dataplane + healthchecker: + transition-history: 5 + vips: + web4: + description: "IPv4 VIP" + address: 192.0.2.1 + protocol: tcp + port: 80 + backends: [2001:db8:2::1, 2001:db8:2::2] + healthcheck: + type: http + params: + path: /healthz + host: example.com + response-code: "200" + interval: 2s + timeout: 3s + rise: 2 + fall: 3 + web6: + description: "IPv6 VIP" + address: 2001:db8::1 + protocol: tcp + port: 443 + backends: [2001:db8:2::1, 2001:db8:2::2] + healthcheck: + type: icmp + interval: 1s + timeout: 3s + fall: 5 +` + +func TestValidConfig(t *testing.T) { + f, err := parse([]byte(validConfig)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if f.ProbeIPv4Src.String() != "10.0.0.1" { + t.Errorf("probe-ipv4-src: got %s, want 10.0.0.1", f.ProbeIPv4Src) + } + if f.ProbeIPv6Src.String() != "2001:db8:1::1" { + t.Errorf("probe-ipv6-src: got %s, want 2001:db8:1::1", f.ProbeIPv6Src) + } + if f.HealthCheckNetns != "dataplane" { + t.Errorf("healthcheck-netns: got %q, want %q", f.HealthCheckNetns, "dataplane") + } + if f.HealthChecker.TransitionHistory != 5 { + t.Errorf("transition-history: got %d, want 5", f.HealthChecker.TransitionHistory) + } + if len(f.VIPs) != 2 { + t.Fatalf("vips: got %d, want 2", len(f.VIPs)) + } + web4 := f.VIPs["web4"] + if web4.HealthCheck.Type != "http" { + t.Errorf("web4 healthcheck type: got %q, want http", web4.HealthCheck.Type) + } + if web4.HealthCheck.Fall != 3 { + t.Errorf("web4 fall: got %d, want 3", web4.HealthCheck.Fall) + } + if web4.HealthCheck.Rise != 2 { + t.Errorf("web4 rise: got %d, want 2", web4.HealthCheck.Rise) + } + if web4.HealthCheck.HTTP == nil { + t.Fatal("web4 HTTP params should not be nil") + } + if web4.HealthCheck.HTTP.Path != "/healthz" { + t.Errorf("web4 params.path: got %q, want /healthz", web4.HealthCheck.HTTP.Path) + } + if web4.HealthCheck.HTTP.Host != "example.com" { + t.Errorf("web4 params.host: got %q, want example.com", web4.HealthCheck.HTTP.Host) + } + if web4.HealthCheck.HTTP.ResponseCodeMin != 200 || web4.HealthCheck.HTTP.ResponseCodeMax != 200 { + t.Errorf("web4 response-code: got %d-%d, want 200-200", + web4.HealthCheck.HTTP.ResponseCodeMin, web4.HealthCheck.HTTP.ResponseCodeMax) + } + web6 := f.VIPs["web6"] + if web6.HealthCheck.Fall != 5 { + t.Errorf("web6 fall: got %d, want 5", web6.HealthCheck.Fall) + } +} + +func TestDefaults(t *testing.T) { + cfg := ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +` + f, err := parse([]byte(cfg)) + if err != nil { + t.Fatalf("unexpected error: 
%v", err) + } + if f.HealthCheckNetns != "" { + t.Errorf("default healthcheck-netns: got %q, want empty (default netns)", f.HealthCheckNetns) + } + if f.HealthChecker.TransitionHistory != 5 { + t.Errorf("default transition-history: got %d, want 5", f.HealthChecker.TransitionHistory) + } + hc := f.VIPs["v"].HealthCheck + if hc.Rise != 2 { + t.Errorf("default rise: got %d, want 2", hc.Rise) + } + if hc.Fall != 3 { + t.Errorf("default fall: got %d, want 3", hc.Fall) + } + if f.ProbeIPv4Src != nil { + t.Errorf("probe-ipv4-src should be nil when omitted, got %s", f.ProbeIPv4Src) + } +} + +func TestOptionalIntervals(t *testing.T) { + cfg := ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 2s + fast-interval: 500ms + down-interval: 30s + timeout: 1s +` + f, err := parse([]byte(cfg)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + hc := f.VIPs["v"].HealthCheck + if hc.Interval != 2*time.Second { + t.Errorf("interval: got %v, want 2s", hc.Interval) + } + if hc.FastInterval != 500*time.Millisecond { + t.Errorf("fast-interval: got %v, want 500ms", hc.FastInterval) + } + if hc.DownInterval != 30*time.Second { + t.Errorf("down-interval: got %v, want 30s", hc.DownInterval) + } +} + +func TestTCPType(t *testing.T) { + cfg := ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + protocol: tcp + port: 80 + backends: [10.0.0.2] + healthcheck: + type: tcp + interval: 1s + timeout: 2s +` + f, err := parse([]byte(cfg)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if f.VIPs["v"].HealthCheck.Type != "tcp" { + t.Errorf("type: got %q, want tcp", f.VIPs["v"].HealthCheck.Type) + } +} + +func TestValidationErrors(t *testing.T) { + // Minimal valid base to build error cases on top of. 
+ base := func(override string) string { + return ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +` + override + } + _ = base + + tests := []struct { + name string + yaml string + errSub string + }{ + { + name: "wrong family probe-ipv4-src", + yaml: ` +maglev: + frontend: + probe-ipv4-src: 2001:db8::1 + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +`, + errSub: "probe-ipv4-src", + }, + { + name: "mixed backend families", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2, 2001:db8::1] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +`, + errSub: "address family", + }, + { + name: "port without protocol", + yaml: validPortWithoutProtocol, + + errSub: "port requires protocol", + }, + { + name: "protocol without port", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + protocol: tcp + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +`, + errSub: "requires port", + }, + { + name: "invalid healthcheck type", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: dns + interval: 1s + timeout: 2s +`, + errSub: "type must be", + }, + { + name: "http missing path", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: http + interval: 1s + timeout: 2s +`, + errSub: "params.path", + }, + { + name: "negative interval", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: -1s + timeout: 2s +`, + errSub: "positive duration", + }, + { + name: "invalid fast-interval", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + fast-interval: -1s + timeout: 2s +`, + errSub: "positive duration", + }, + { + name: "fall zero becomes default", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s + fall: 0 +`, + // fall: 0 is treated as omitted → default 3; no error + errSub: "", + }, + { + name: "empty backends", + yaml: ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + backends: [] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +`, + errSub: "backends must not be empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := parse([]byte(tt.yaml)) + if tt.errSub == "" { + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + return + } + if err == nil { + t.Fatalf("expected error containing %q, got nil", tt.errSub) + } + if !contains(err.Error(), tt.errSub) { + t.Errorf("error %q does not contain %q", err.Error(), tt.errSub) + } + }) + } +} + +const validPortWithoutProtocol = ` +maglev: + frontend: + vips: + v: + address: 192.0.2.1 + port: 80 + backends: [10.0.0.2] + healthcheck: + type: icmp + interval: 1s + timeout: 2s +` + +func contains(s, sub string) bool { + return len(s) >= len(sub) && (s == sub || len(sub) == 0 || + func() bool { + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false + }()) +} diff --git a/internal/grpcapi/healthchecker.pb.go b/internal/grpcapi/healthchecker.pb.go new file mode 100644 index 0000000..a828620 --- /dev/null +++ b/internal/grpcapi/healthchecker.pb.go @@ -0,0 
+1,776 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v3.21.12 +// source: proto/healthchecker.proto + +package grpcapi + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ListVIPsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListVIPsRequest) Reset() { + *x = ListVIPsRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListVIPsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListVIPsRequest) ProtoMessage() {} + +func (x *ListVIPsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListVIPsRequest.ProtoReflect.Descriptor instead. +func (*ListVIPsRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{0} +} + +type GetVIPRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetVIPRequest) Reset() { + *x = GetVIPRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetVIPRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetVIPRequest) ProtoMessage() {} + +func (x *GetVIPRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetVIPRequest.ProtoReflect.Descriptor instead. 
+func (*GetVIPRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{1} +} + +func (x *GetVIPRequest) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +type ListBackendsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListBackendsRequest) Reset() { + *x = ListBackendsRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListBackendsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListBackendsRequest) ProtoMessage() {} + +func (x *ListBackendsRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListBackendsRequest.ProtoReflect.Descriptor instead. +func (*ListBackendsRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{2} +} + +func (x *ListBackendsRequest) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +type GetBackendRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + BackendAddress string `protobuf:"bytes,2,opt,name=backend_address,json=backendAddress,proto3" json:"backend_address,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetBackendRequest) Reset() { + *x = GetBackendRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetBackendRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetBackendRequest) ProtoMessage() {} + +func (x *GetBackendRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetBackendRequest.ProtoReflect.Descriptor instead. 
+func (*GetBackendRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{3} +} + +func (x *GetBackendRequest) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +func (x *GetBackendRequest) GetBackendAddress() string { + if x != nil { + return x.BackendAddress + } + return "" +} + +type PauseResumeRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + BackendAddress string `protobuf:"bytes,2,opt,name=backend_address,json=backendAddress,proto3" json:"backend_address,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PauseResumeRequest) Reset() { + *x = PauseResumeRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PauseResumeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PauseResumeRequest) ProtoMessage() {} + +func (x *PauseResumeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PauseResumeRequest.ProtoReflect.Descriptor instead. +func (*PauseResumeRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{4} +} + +func (x *PauseResumeRequest) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +func (x *PauseResumeRequest) GetBackendAddress() string { + if x != nil { + return x.BackendAddress + } + return "" +} + +type WatchRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WatchRequest) Reset() { + *x = WatchRequest{} + mi := &file_proto_healthchecker_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WatchRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchRequest) ProtoMessage() {} + +func (x *WatchRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead. 
+func (*WatchRequest) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{5} +} + +type ListVIPsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipNames []string `protobuf:"bytes,1,rep,name=vip_names,json=vipNames,proto3" json:"vip_names,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListVIPsResponse) Reset() { + *x = ListVIPsResponse{} + mi := &file_proto_healthchecker_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListVIPsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListVIPsResponse) ProtoMessage() {} + +func (x *ListVIPsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListVIPsResponse.ProtoReflect.Descriptor instead. +func (*ListVIPsResponse) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{6} +} + +func (x *ListVIPsResponse) GetVipNames() []string { + if x != nil { + return x.VipNames + } + return nil +} + +type VIPInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + Protocol string `protobuf:"bytes,3,opt,name=protocol,proto3" json:"protocol,omitempty"` + Port uint32 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` + Backends []string `protobuf:"bytes,5,rep,name=backends,proto3" json:"backends,omitempty"` + Description string `protobuf:"bytes,6,opt,name=description,proto3" json:"description,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *VIPInfo) Reset() { + *x = VIPInfo{} + mi := &file_proto_healthchecker_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *VIPInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*VIPInfo) ProtoMessage() {} + +func (x *VIPInfo) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use VIPInfo.ProtoReflect.Descriptor instead. 
+func (*VIPInfo) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{7} +} + +func (x *VIPInfo) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *VIPInfo) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *VIPInfo) GetProtocol() string { + if x != nil { + return x.Protocol + } + return "" +} + +func (x *VIPInfo) GetPort() uint32 { + if x != nil { + return x.Port + } + return 0 +} + +func (x *VIPInfo) GetBackends() []string { + if x != nil { + return x.Backends + } + return nil +} + +func (x *VIPInfo) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + +type ListBackendsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Backends []*BackendInfo `protobuf:"bytes,1,rep,name=backends,proto3" json:"backends,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListBackendsResponse) Reset() { + *x = ListBackendsResponse{} + mi := &file_proto_healthchecker_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListBackendsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListBackendsResponse) ProtoMessage() {} + +func (x *ListBackendsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListBackendsResponse.ProtoReflect.Descriptor instead. +func (*ListBackendsResponse) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{8} +} + +func (x *ListBackendsResponse) GetBackends() []*BackendInfo { + if x != nil { + return x.Backends + } + return nil +} + +type BackendInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + State string `protobuf:"bytes,3,opt,name=state,proto3" json:"state,omitempty"` + Transitions []*TransitionRecord `protobuf:"bytes,4,rep,name=transitions,proto3" json:"transitions,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BackendInfo) Reset() { + *x = BackendInfo{} + mi := &file_proto_healthchecker_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BackendInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BackendInfo) ProtoMessage() {} + +func (x *BackendInfo) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BackendInfo.ProtoReflect.Descriptor instead. 
+func (*BackendInfo) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{9} +} + +func (x *BackendInfo) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +func (x *BackendInfo) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *BackendInfo) GetState() string { + if x != nil { + return x.State + } + return "" +} + +func (x *BackendInfo) GetTransitions() []*TransitionRecord { + if x != nil { + return x.Transitions + } + return nil +} + +type TransitionRecord struct { + state protoimpl.MessageState `protogen:"open.v1"` + From string `protobuf:"bytes,1,opt,name=from,proto3" json:"from,omitempty"` + To string `protobuf:"bytes,2,opt,name=to,proto3" json:"to,omitempty"` + AtUnixNs int64 `protobuf:"varint,3,opt,name=at_unix_ns,json=atUnixNs,proto3" json:"at_unix_ns,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TransitionRecord) Reset() { + *x = TransitionRecord{} + mi := &file_proto_healthchecker_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TransitionRecord) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TransitionRecord) ProtoMessage() {} + +func (x *TransitionRecord) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TransitionRecord.ProtoReflect.Descriptor instead. +func (*TransitionRecord) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{10} +} + +func (x *TransitionRecord) GetFrom() string { + if x != nil { + return x.From + } + return "" +} + +func (x *TransitionRecord) GetTo() string { + if x != nil { + return x.To + } + return "" +} + +func (x *TransitionRecord) GetAtUnixNs() int64 { + if x != nil { + return x.AtUnixNs + } + return 0 +} + +type TransitionEvent struct { + state protoimpl.MessageState `protogen:"open.v1"` + VipName string `protobuf:"bytes,1,opt,name=vip_name,json=vipName,proto3" json:"vip_name,omitempty"` + BackendAddress string `protobuf:"bytes,2,opt,name=backend_address,json=backendAddress,proto3" json:"backend_address,omitempty"` + Transition *TransitionRecord `protobuf:"bytes,3,opt,name=transition,proto3" json:"transition,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TransitionEvent) Reset() { + *x = TransitionEvent{} + mi := &file_proto_healthchecker_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TransitionEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TransitionEvent) ProtoMessage() {} + +func (x *TransitionEvent) ProtoReflect() protoreflect.Message { + mi := &file_proto_healthchecker_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TransitionEvent.ProtoReflect.Descriptor instead. 
+func (*TransitionEvent) Descriptor() ([]byte, []int) { + return file_proto_healthchecker_proto_rawDescGZIP(), []int{11} +} + +func (x *TransitionEvent) GetVipName() string { + if x != nil { + return x.VipName + } + return "" +} + +func (x *TransitionEvent) GetBackendAddress() string { + if x != nil { + return x.BackendAddress + } + return "" +} + +func (x *TransitionEvent) GetTransition() *TransitionRecord { + if x != nil { + return x.Transition + } + return nil +} + +var File_proto_healthchecker_proto protoreflect.FileDescriptor + +const file_proto_healthchecker_proto_rawDesc = "" + + "\n" + + "\x19proto/healthchecker.proto\x12\rhealthchecker\"\x11\n" + + "\x0fListVIPsRequest\"*\n" + + "\rGetVIPRequest\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\"0\n" + + "\x13ListBackendsRequest\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\"W\n" + + "\x11GetBackendRequest\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\x12'\n" + + "\x0fbackend_address\x18\x02 \x01(\tR\x0ebackendAddress\"X\n" + + "\x12PauseResumeRequest\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\x12'\n" + + "\x0fbackend_address\x18\x02 \x01(\tR\x0ebackendAddress\"\x0e\n" + + "\fWatchRequest\"/\n" + + "\x10ListVIPsResponse\x12\x1b\n" + + "\tvip_names\x18\x01 \x03(\tR\bvipNames\"\xa5\x01\n" + + "\aVIPInfo\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x18\n" + + "\aaddress\x18\x02 \x01(\tR\aaddress\x12\x1a\n" + + "\bprotocol\x18\x03 \x01(\tR\bprotocol\x12\x12\n" + + "\x04port\x18\x04 \x01(\rR\x04port\x12\x1a\n" + + "\bbackends\x18\x05 \x03(\tR\bbackends\x12 \n" + + "\vdescription\x18\x06 \x01(\tR\vdescription\"N\n" + + "\x14ListBackendsResponse\x126\n" + + "\bbackends\x18\x01 \x03(\v2\x1a.healthchecker.BackendInfoR\bbackends\"\x9b\x01\n" + + "\vBackendInfo\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\x12\x18\n" + + "\aaddress\x18\x02 \x01(\tR\aaddress\x12\x14\n" + + "\x05state\x18\x03 \x01(\tR\x05state\x12A\n" + + "\vtransitions\x18\x04 \x03(\v2\x1f.healthchecker.TransitionRecordR\vtransitions\"T\n" + + "\x10TransitionRecord\x12\x12\n" + + "\x04from\x18\x01 \x01(\tR\x04from\x12\x0e\n" + + "\x02to\x18\x02 \x01(\tR\x02to\x12\x1c\n" + + "\n" + + "at_unix_ns\x18\x03 \x01(\x03R\batUnixNs\"\x96\x01\n" + + "\x0fTransitionEvent\x12\x19\n" + + "\bvip_name\x18\x01 \x01(\tR\avipName\x12'\n" + + "\x0fbackend_address\x18\x02 \x01(\tR\x0ebackendAddress\x12?\n" + + "\n" + + "transition\x18\x03 \x01(\v2\x1f.healthchecker.TransitionRecordR\n" + + "transition2\xb3\x04\n" + + "\rHealthChecker\x12K\n" + + "\bListVIPs\x12\x1e.healthchecker.ListVIPsRequest\x1a\x1f.healthchecker.ListVIPsResponse\x12>\n" + + "\x06GetVIP\x12\x1c.healthchecker.GetVIPRequest\x1a\x16.healthchecker.VIPInfo\x12W\n" + + "\fListBackends\x12\".healthchecker.ListBackendsRequest\x1a#.healthchecker.ListBackendsResponse\x12J\n" + + "\n" + + "GetBackend\x12 .healthchecker.GetBackendRequest\x1a\x1a.healthchecker.BackendInfo\x12M\n" + + "\fPauseBackend\x12!.healthchecker.PauseResumeRequest\x1a\x1a.healthchecker.BackendInfo\x12N\n" + + "\rResumeBackend\x12!.healthchecker.PauseResumeRequest\x1a\x1a.healthchecker.BackendInfo\x12Q\n" + + "\x10WatchTransitions\x12\x1b.healthchecker.WatchRequest\x1a\x1e.healthchecker.TransitionEvent0\x01B.Z,git.ipng.ch/ipng/vpp-maglev/internal/grpcapib\x06proto3" + +var ( + file_proto_healthchecker_proto_rawDescOnce sync.Once + file_proto_healthchecker_proto_rawDescData []byte +) + +func file_proto_healthchecker_proto_rawDescGZIP() []byte { + file_proto_healthchecker_proto_rawDescOnce.Do(func() { + 
file_proto_healthchecker_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_healthchecker_proto_rawDesc), len(file_proto_healthchecker_proto_rawDesc))) + }) + return file_proto_healthchecker_proto_rawDescData +} + +var file_proto_healthchecker_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_proto_healthchecker_proto_goTypes = []any{ + (*ListVIPsRequest)(nil), // 0: healthchecker.ListVIPsRequest + (*GetVIPRequest)(nil), // 1: healthchecker.GetVIPRequest + (*ListBackendsRequest)(nil), // 2: healthchecker.ListBackendsRequest + (*GetBackendRequest)(nil), // 3: healthchecker.GetBackendRequest + (*PauseResumeRequest)(nil), // 4: healthchecker.PauseResumeRequest + (*WatchRequest)(nil), // 5: healthchecker.WatchRequest + (*ListVIPsResponse)(nil), // 6: healthchecker.ListVIPsResponse + (*VIPInfo)(nil), // 7: healthchecker.VIPInfo + (*ListBackendsResponse)(nil), // 8: healthchecker.ListBackendsResponse + (*BackendInfo)(nil), // 9: healthchecker.BackendInfo + (*TransitionRecord)(nil), // 10: healthchecker.TransitionRecord + (*TransitionEvent)(nil), // 11: healthchecker.TransitionEvent +} +var file_proto_healthchecker_proto_depIdxs = []int32{ + 9, // 0: healthchecker.ListBackendsResponse.backends:type_name -> healthchecker.BackendInfo + 10, // 1: healthchecker.BackendInfo.transitions:type_name -> healthchecker.TransitionRecord + 10, // 2: healthchecker.TransitionEvent.transition:type_name -> healthchecker.TransitionRecord + 0, // 3: healthchecker.HealthChecker.ListVIPs:input_type -> healthchecker.ListVIPsRequest + 1, // 4: healthchecker.HealthChecker.GetVIP:input_type -> healthchecker.GetVIPRequest + 2, // 5: healthchecker.HealthChecker.ListBackends:input_type -> healthchecker.ListBackendsRequest + 3, // 6: healthchecker.HealthChecker.GetBackend:input_type -> healthchecker.GetBackendRequest + 4, // 7: healthchecker.HealthChecker.PauseBackend:input_type -> healthchecker.PauseResumeRequest + 4, // 8: healthchecker.HealthChecker.ResumeBackend:input_type -> healthchecker.PauseResumeRequest + 5, // 9: healthchecker.HealthChecker.WatchTransitions:input_type -> healthchecker.WatchRequest + 6, // 10: healthchecker.HealthChecker.ListVIPs:output_type -> healthchecker.ListVIPsResponse + 7, // 11: healthchecker.HealthChecker.GetVIP:output_type -> healthchecker.VIPInfo + 8, // 12: healthchecker.HealthChecker.ListBackends:output_type -> healthchecker.ListBackendsResponse + 9, // 13: healthchecker.HealthChecker.GetBackend:output_type -> healthchecker.BackendInfo + 9, // 14: healthchecker.HealthChecker.PauseBackend:output_type -> healthchecker.BackendInfo + 9, // 15: healthchecker.HealthChecker.ResumeBackend:output_type -> healthchecker.BackendInfo + 11, // 16: healthchecker.HealthChecker.WatchTransitions:output_type -> healthchecker.TransitionEvent + 10, // [10:17] is the sub-list for method output_type + 3, // [3:10] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_proto_healthchecker_proto_init() } +func file_proto_healthchecker_proto_init() { + if File_proto_healthchecker_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_healthchecker_proto_rawDesc), len(file_proto_healthchecker_proto_rawDesc)), + NumEnums: 0, + NumMessages: 12, + 
NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_healthchecker_proto_goTypes, + DependencyIndexes: file_proto_healthchecker_proto_depIdxs, + MessageInfos: file_proto_healthchecker_proto_msgTypes, + }.Build() + File_proto_healthchecker_proto = out.File + file_proto_healthchecker_proto_goTypes = nil + file_proto_healthchecker_proto_depIdxs = nil +} diff --git a/internal/grpcapi/healthchecker_grpc.pb.go b/internal/grpcapi/healthchecker_grpc.pb.go new file mode 100644 index 0000000..15f0c98 --- /dev/null +++ b/internal/grpcapi/healthchecker_grpc.pb.go @@ -0,0 +1,357 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.1 +// - protoc v3.21.12 +// source: proto/healthchecker.proto + +package grpcapi + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + HealthChecker_ListVIPs_FullMethodName = "/healthchecker.HealthChecker/ListVIPs" + HealthChecker_GetVIP_FullMethodName = "/healthchecker.HealthChecker/GetVIP" + HealthChecker_ListBackends_FullMethodName = "/healthchecker.HealthChecker/ListBackends" + HealthChecker_GetBackend_FullMethodName = "/healthchecker.HealthChecker/GetBackend" + HealthChecker_PauseBackend_FullMethodName = "/healthchecker.HealthChecker/PauseBackend" + HealthChecker_ResumeBackend_FullMethodName = "/healthchecker.HealthChecker/ResumeBackend" + HealthChecker_WatchTransitions_FullMethodName = "/healthchecker.HealthChecker/WatchTransitions" +) + +// HealthCheckerClient is the client API for HealthChecker service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// HealthChecker exposes the state of backend health for all VIPs. +type HealthCheckerClient interface { + ListVIPs(ctx context.Context, in *ListVIPsRequest, opts ...grpc.CallOption) (*ListVIPsResponse, error) + GetVIP(ctx context.Context, in *GetVIPRequest, opts ...grpc.CallOption) (*VIPInfo, error) + ListBackends(ctx context.Context, in *ListBackendsRequest, opts ...grpc.CallOption) (*ListBackendsResponse, error) + GetBackend(ctx context.Context, in *GetBackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) + PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) + ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) + WatchTransitions(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TransitionEvent], error) +} + +type healthCheckerClient struct { + cc grpc.ClientConnInterface +} + +func NewHealthCheckerClient(cc grpc.ClientConnInterface) HealthCheckerClient { + return &healthCheckerClient{cc} +} + +func (c *healthCheckerClient) ListVIPs(ctx context.Context, in *ListVIPsRequest, opts ...grpc.CallOption) (*ListVIPsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListVIPsResponse) + err := c.cc.Invoke(ctx, HealthChecker_ListVIPs_FullMethodName, in, out, cOpts...) 
+ if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) GetVIP(ctx context.Context, in *GetVIPRequest, opts ...grpc.CallOption) (*VIPInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(VIPInfo) + err := c.cc.Invoke(ctx, HealthChecker_GetVIP_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) ListBackends(ctx context.Context, in *ListBackendsRequest, opts ...grpc.CallOption) (*ListBackendsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListBackendsResponse) + err := c.cc.Invoke(ctx, HealthChecker_ListBackends_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) GetBackend(ctx context.Context, in *GetBackendRequest, opts ...grpc.CallOption) (*BackendInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(BackendInfo) + err := c.cc.Invoke(ctx, HealthChecker_GetBackend_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) PauseBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(BackendInfo) + err := c.cc.Invoke(ctx, HealthChecker_PauseBackend_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) ResumeBackend(ctx context.Context, in *PauseResumeRequest, opts ...grpc.CallOption) (*BackendInfo, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(BackendInfo) + err := c.cc.Invoke(ctx, HealthChecker_ResumeBackend_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *healthCheckerClient) WatchTransitions(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TransitionEvent], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &HealthChecker_ServiceDesc.Streams[0], HealthChecker_WatchTransitions_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[WatchRequest, TransitionEvent]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HealthChecker_WatchTransitionsClient = grpc.ServerStreamingClient[TransitionEvent] + +// HealthCheckerServer is the server API for HealthChecker service. +// All implementations must embed UnimplementedHealthCheckerServer +// for forward compatibility. +// +// HealthChecker exposes the state of backend health for all VIPs. 
+type HealthCheckerServer interface { + ListVIPs(context.Context, *ListVIPsRequest) (*ListVIPsResponse, error) + GetVIP(context.Context, *GetVIPRequest) (*VIPInfo, error) + ListBackends(context.Context, *ListBackendsRequest) (*ListBackendsResponse, error) + GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error) + PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) + ResumeBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) + WatchTransitions(*WatchRequest, grpc.ServerStreamingServer[TransitionEvent]) error + mustEmbedUnimplementedHealthCheckerServer() +} + +// UnimplementedHealthCheckerServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedHealthCheckerServer struct{} + +func (UnimplementedHealthCheckerServer) ListVIPs(context.Context, *ListVIPsRequest) (*ListVIPsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ListVIPs not implemented") +} +func (UnimplementedHealthCheckerServer) GetVIP(context.Context, *GetVIPRequest) (*VIPInfo, error) { + return nil, status.Error(codes.Unimplemented, "method GetVIP not implemented") +} +func (UnimplementedHealthCheckerServer) ListBackends(context.Context, *ListBackendsRequest) (*ListBackendsResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ListBackends not implemented") +} +func (UnimplementedHealthCheckerServer) GetBackend(context.Context, *GetBackendRequest) (*BackendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method GetBackend not implemented") +} +func (UnimplementedHealthCheckerServer) PauseBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method PauseBackend not implemented") +} +func (UnimplementedHealthCheckerServer) ResumeBackend(context.Context, *PauseResumeRequest) (*BackendInfo, error) { + return nil, status.Error(codes.Unimplemented, "method ResumeBackend not implemented") +} +func (UnimplementedHealthCheckerServer) WatchTransitions(*WatchRequest, grpc.ServerStreamingServer[TransitionEvent]) error { + return status.Error(codes.Unimplemented, "method WatchTransitions not implemented") +} +func (UnimplementedHealthCheckerServer) mustEmbedUnimplementedHealthCheckerServer() {} +func (UnimplementedHealthCheckerServer) testEmbeddedByValue() {} + +// UnsafeHealthCheckerServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HealthCheckerServer will +// result in compilation errors. +type UnsafeHealthCheckerServer interface { + mustEmbedUnimplementedHealthCheckerServer() +} + +func RegisterHealthCheckerServer(s grpc.ServiceRegistrar, srv HealthCheckerServer) { + // If the following call panics, it indicates UnimplementedHealthCheckerServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. 
+ if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&HealthChecker_ServiceDesc, srv) +} + +func _HealthChecker_ListVIPs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListVIPsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).ListVIPs(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_ListVIPs_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).ListVIPs(ctx, req.(*ListVIPsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_GetVIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetVIPRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).GetVIP(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_GetVIP_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).GetVIP(ctx, req.(*GetVIPRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_ListBackends_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListBackendsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).ListBackends(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_ListBackends_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).ListBackends(ctx, req.(*ListBackendsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_GetBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetBackendRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).GetBackend(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_GetBackend_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).GetBackend(ctx, req.(*GetBackendRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_PauseBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PauseResumeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).PauseBackend(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_PauseBackend_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).PauseBackend(ctx, req.(*PauseResumeRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_ResumeBackend_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, 
interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PauseResumeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthCheckerServer).ResumeBackend(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: HealthChecker_ResumeBackend_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthCheckerServer).ResumeBackend(ctx, req.(*PauseResumeRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _HealthChecker_WatchTransitions_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(WatchRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HealthCheckerServer).WatchTransitions(m, &grpc.GenericServerStream[WatchRequest, TransitionEvent]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type HealthChecker_WatchTransitionsServer = grpc.ServerStreamingServer[TransitionEvent] + +// HealthChecker_ServiceDesc is the grpc.ServiceDesc for HealthChecker service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var HealthChecker_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "healthchecker.HealthChecker", + HandlerType: (*HealthCheckerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ListVIPs", + Handler: _HealthChecker_ListVIPs_Handler, + }, + { + MethodName: "GetVIP", + Handler: _HealthChecker_GetVIP_Handler, + }, + { + MethodName: "ListBackends", + Handler: _HealthChecker_ListBackends_Handler, + }, + { + MethodName: "GetBackend", + Handler: _HealthChecker_GetBackend_Handler, + }, + { + MethodName: "PauseBackend", + Handler: _HealthChecker_PauseBackend_Handler, + }, + { + MethodName: "ResumeBackend", + Handler: _HealthChecker_ResumeBackend_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "WatchTransitions", + Handler: _HealthChecker_WatchTransitions_Handler, + ServerStreams: true, + }, + }, + Metadata: "proto/healthchecker.proto", +} diff --git a/internal/grpcapi/server.go b/internal/grpcapi/server.go new file mode 100644 index 0000000..396c00b --- /dev/null +++ b/internal/grpcapi/server.go @@ -0,0 +1,167 @@ +package grpcapi + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "git.ipng.ch/ipng/vpp-maglev/internal/checker" + "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" +) + +// Server implements the HealthCheckerServer gRPC interface. +type Server struct { + UnimplementedHealthCheckerServer + checker *checker.Checker +} + +// NewServer creates a Server backed by the given Checker. +func NewServer(c *checker.Checker) *Server { + return &Server{checker: c} +} + +// ListVIPs returns the names of all configured VIPs. +func (s *Server) ListVIPs(_ context.Context, _ *ListVIPsRequest) (*ListVIPsResponse, error) { + return &ListVIPsResponse{VipNames: s.checker.ListVIPs()}, nil +} + +// GetVIP returns configuration details for a single VIP. +func (s *Server) GetVIP(_ context.Context, req *GetVIPRequest) (*VIPInfo, error) { + vip, ok := s.checker.GetVIP(req.VipName) + if !ok { + return nil, status.Errorf(codes.NotFound, "vip %q not found", req.VipName) + } + return vipToProto(req.VipName, vip), nil +} + +// ListBackends returns health state for all backends of a VIP. 
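+// An illustrative client-side call (a sketch; the "web" VIP name and the
+// client variable are assumptions, not part of this file):
+//
+//	resp, err := client.ListBackends(ctx, &ListBackendsRequest{VipName: "web"})
+//	if err != nil {
+//		// codes.NotFound when the VIP does not exist
+//	}
+//	for _, b := range resp.Backends {
+//		log.Println(b.Address, b.State)
+//	}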
+func (s *Server) ListBackends(_ context.Context, req *ListBackendsRequest) (*ListBackendsResponse, error) { + if _, ok := s.checker.GetVIP(req.VipName); !ok { + return nil, status.Errorf(codes.NotFound, "vip %q not found", req.VipName) + } + backends := s.checker.ListBackends(req.VipName) + resp := &ListBackendsResponse{} + for _, b := range backends { + resp.Backends = append(resp.Backends, backendToProto(b)) + } + return resp, nil +} + +// GetBackend returns health state for a specific VIP:backend tuple. +func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*BackendInfo, error) { + b, ok := s.checker.GetBackend(req.VipName, req.BackendAddress) + if !ok { + return nil, status.Errorf(codes.NotFound, "backend %q in vip %q not found", + req.BackendAddress, req.VipName) + } + return backendToProto(b), nil +} + +// PauseBackend pauses health checking for a specific backend. +func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { + b, ok := s.checker.PauseBackend(req.VipName, req.BackendAddress) + if !ok { + return nil, status.Errorf(codes.NotFound, "backend %q in vip %q not found", + req.BackendAddress, req.VipName) + } + return backendToProto(b), nil +} + +// ResumeBackend resumes health checking for a specific backend. +func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { + b, ok := s.checker.ResumeBackend(req.VipName, req.BackendAddress) + if !ok { + return nil, status.Errorf(codes.NotFound, "backend %q in vip %q not found", + req.BackendAddress, req.VipName) + } + return backendToProto(b), nil +} + +// WatchTransitions streams the current state of all backends on connect, then +// streams live state transitions until the client disconnects. +func (s *Server) WatchTransitions(_ *WatchRequest, stream HealthChecker_WatchTransitionsServer) error { + // Send current state of all backends as synthetic events. + for _, vipName := range s.checker.ListVIPs() { + for _, b := range s.checker.ListBackends(vipName) { + ev := &TransitionEvent{ + VipName: vipName, + BackendAddress: b.Address.String(), + Transition: &TransitionRecord{ + From: b.State.String(), + To: b.State.String(), + AtUnixNs: 0, + }, + } + if err := stream.Send(ev); err != nil { + return err + } + } + } + + // Subscribe to live transitions. 
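+	// (Checker.Subscribe is defined outside this diff; the code below assumes
+	// it returns a channel of transition events plus an unsubscribe func, and
+	// that the channel is closed on checker shutdown, which is why a closed
+	// channel ends the stream with a clean nil return.)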
+	ch, unsub := s.checker.Subscribe()
+	defer unsub()
+
+	for {
+		select {
+		case <-stream.Context().Done():
+			return nil
+		case e, ok := <-ch:
+			if !ok {
+				return nil
+			}
+			ev := &TransitionEvent{
+				VipName:        e.VIPName,
+				BackendAddress: e.Backend.String(),
+				Transition:     transitionToProto(e.Transition),
+			}
+			if err := stream.Send(ev); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// ---- conversion helpers ----------------------------------------------------
+
+func vipToProto(name string, v config.VIP) *VIPInfo {
+	info := &VIPInfo{
+		Name:        name,
+		Address:     v.Address.String(),
+		Protocol:    v.Protocol,
+		Port:        uint32(v.Port),
+		Description: v.Description,
+	}
+	for _, b := range v.Backends {
+		info.Backends = append(info.Backends, b.String())
+	}
+	return info
+}
+
+func backendToProto(b *health.Backend) *BackendInfo {
+	info := &BackendInfo{
+		VipName: b.VIPName,
+		Address: b.Address.String(),
+		State:   b.State.String(),
+	}
+	for _, t := range b.Transitions {
+		info.Transitions = append(info.Transitions, transitionToProto(t))
+	}
+	return info
+}
+
+func transitionToProto(t health.Transition) *TransitionRecord {
+	return &TransitionRecord{
+		From:     t.From.String(),
+		To:       t.To.String(),
+		AtUnixNs: t.At.UnixNano(),
+	}
+}
+
+// Blank references keep the otherwise-unused fmt and net imports compiling.
+var _ = net.IP{}
+var _ = fmt.Sprintf
diff --git a/internal/grpcapi/server_test.go b/internal/grpcapi/server_test.go
new file mode 100644
index 0000000..0151ad1
--- /dev/null
+++ b/internal/grpcapi/server_test.go
@@ -0,0 +1,188 @@
+package grpcapi
+
+import (
+	"context"
+	"net"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	"git.ipng.ch/ipng/vpp-maglev/internal/checker"
+	"git.ipng.ch/ipng/vpp-maglev/internal/config"
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
+)
+
+func makeTestChecker(ctx context.Context) *checker.Checker {
+	cfg := &config.Frontend{
+		HealthCheckNetns: "test",
+		HealthChecker:    config.HealthCheckerConfig{TransitionHistory: 5},
+		VIPs: map[string]config.VIP{
+			"web": {
+				Address:  net.ParseIP("192.0.2.1"),
+				Protocol: "tcp",
+				Port:     80,
+				Backends: []net.IP{net.ParseIP("10.0.0.2")},
+				HealthCheck: config.HealthCheck{
+					Type:     "icmp",
+					Interval: time.Hour, // long interval: probes won't fire during tests
+					Timeout:  time.Second,
+					Fall:     3,
+					Rise:     2,
+				},
+			},
+		},
+	}
+	c := checker.New(cfg)
+	go c.Run(ctx) //nolint:errcheck
+	// Allow the Run goroutine to initialize workers.
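+	// (A fixed sleep is a pragmatic test-only shortcut; a readiness signal
+	// from Checker.Run would be sturdier if this ever flakes.)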
+ time.Sleep(10 * time.Millisecond) + return c +} + +func startTestServer(t *testing.T, c *checker.Checker) (HealthCheckerClient, func()) { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + srv := grpc.NewServer() + RegisterHealthCheckerServer(srv, NewServer(c)) + go srv.Serve(lis) //nolint:errcheck + + conn, err := grpc.NewClient(lis.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("dial: %v", err) + } + return NewHealthCheckerClient(conn), func() { + conn.Close() + srv.Stop() + } +} + +func TestListVIPs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + resp, err := client.ListVIPs(ctx, &ListVIPsRequest{}) + if err != nil { + t.Fatalf("ListVIPs: %v", err) + } + if len(resp.VipNames) != 1 || resp.VipNames[0] != "web" { + t.Errorf("ListVIPs: got %v, want [web]", resp.VipNames) + } +} + +func TestGetVIP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + info, err := client.GetVIP(ctx, &GetVIPRequest{VipName: "web"}) + if err != nil { + t.Fatalf("GetVIP: %v", err) + } + if info.Address != "192.0.2.1" { + t.Errorf("GetVIP address: got %q, want 192.0.2.1", info.Address) + } + if info.Port != 80 { + t.Errorf("GetVIP port: got %d, want 80", info.Port) + } +} + +func TestGetVIPNotFound(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + _, err := client.GetVIP(ctx, &GetVIPRequest{VipName: "nope"}) + if err == nil { + t.Error("expected error for unknown VIP") + } +} + +func TestGetBackend(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + info, err := client.GetBackend(ctx, &GetBackendRequest{ + VipName: "web", + BackendAddress: "10.0.0.2", + }) + if err != nil { + t.Fatalf("GetBackend: %v", err) + } + if info.State != health.StateUnknown.String() { + t.Errorf("initial state: got %q, want unknown", info.State) + } +} + +func TestPauseResumeBackend(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + info, err := client.PauseBackend(ctx, &PauseResumeRequest{ + VipName: "web", + BackendAddress: "10.0.0.2", + }) + if err != nil { + t.Fatalf("PauseBackend: %v", err) + } + if info.State != health.StatePaused.String() { + t.Errorf("after pause: got %q, want paused", info.State) + } + + info, err = client.ResumeBackend(ctx, &PauseResumeRequest{ + VipName: "web", + BackendAddress: "10.0.0.2", + }) + if err != nil { + t.Fatalf("ResumeBackend: %v", err) + } + if info.State != health.StateUnknown.String() { + t.Errorf("after resume: got %q, want unknown", info.State) + } +} + +func TestWatchTransitions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeTestChecker(ctx) + client, cleanup := startTestServer(t, c) + defer cleanup() + + stream, err := client.WatchTransitions(ctx, &WatchRequest{}) + if err != nil { + t.Fatalf("WatchTransitions: %v", err) + } + + // Should receive the 
current state for web:10.0.0.2 immediately. + ev, err := stream.Recv() + if err != nil { + t.Fatalf("Recv: %v", err) + } + if ev.VipName != "web" || ev.BackendAddress != "10.0.0.2" { + t.Errorf("initial event: vip=%q backend=%q", ev.VipName, ev.BackendAddress) + } +} diff --git a/internal/health/state.go b/internal/health/state.go new file mode 100644 index 0000000..ecb4e1b --- /dev/null +++ b/internal/health/state.go @@ -0,0 +1,189 @@ +package health + +import ( + "net" + "time" +) + +// CheckLayer indicates at which network layer a probe stopped. +type CheckLayer int + +const ( + LayerUnknown CheckLayer = iota + LayerL4 // TCP connect + LayerL6 // TLS handshake + LayerL7 // Application (HTTP response, ICMP reply) +) + +// ProbeResult is the outcome of a single probe execution. +type ProbeResult struct { + OK bool + Layer CheckLayer + Code string // "L4OK", "L4TOUT", "L4CON", "L7OK", "L7TOUT", "L7RSP", "L7STS" + Detail string // human-readable, e.g. "HTTP 503", "connection refused" +} + +// State represents the health state of a backend. +type State int + +const ( + StateUnknown State = iota // initial state before first probe + StateUp + StateDown + StatePaused +) + +func (s State) String() string { + switch s { + case StateUnknown: + return "unknown" + case StateUp: + return "up" + case StateDown: + return "down" + case StatePaused: + return "paused" + default: + return "unknown" + } +} + +// Transition records a single state change event. +type Transition struct { + From State + To State + At time.Time + Result ProbeResult +} + +// HealthCounter is HAProxy's single-integer rise/fall model. +// +// Health ∈ [0, Rise+Fall-1]. Server is UP when Health >= Rise, DOWN when +// Health < Rise. On success Health increments (ceiling Rise+Fall-1); on +// failure Health decrements (floor 0). This gives hysteresis: a flapping +// backend stays in the degraded range without bouncing between UP and DOWN. +type HealthCounter struct { + Health int + Rise int + Fall int +} + +func (h *HealthCounter) Max() int { return h.Rise + h.Fall - 1 } +func (h *HealthCounter) IsUp() bool { return h.Health >= h.Rise } +func (h *HealthCounter) IsDegraded() bool { return h.Health > 0 && h.Health < h.Max() } + +// RecordPass increments the counter. Returns true if the server just became UP. +func (h *HealthCounter) RecordPass() bool { + wasUp := h.IsUp() + if h.Health < h.Max() { + h.Health++ + } + return !wasUp && h.IsUp() +} + +// RecordFail decrements the counter. Returns true if the server just went DOWN. +func (h *HealthCounter) RecordFail() bool { + wasDown := !h.IsUp() + if h.Health > 0 { + h.Health-- + } + return !wasDown && !h.IsUp() +} + +// Backend tracks the health state of one VIP:backend tuple. +type Backend struct { + VIPName string + Address net.IP + State State + Counter HealthCounter + Transitions []Transition // newest first, capped at maxHistory +} + +// New creates a Backend in StateUnknown. +func New(vipName string, addr net.IP, rise, fall int) *Backend { + return &Backend{ + VIPName: vipName, + Address: addr, + State: StateUnknown, + Counter: HealthCounter{Rise: rise, Fall: fall}, + } +} + +// Record applies a probe result to the health counter and transitions state if +// needed. Returns true if the state changed. +// +// StateUnknown transitions to StateDown on the first failure (any evidence of +// failure means the backend is not yet confirmed reachable), and to StateUp +// once the counter reaches Rise consecutive passes. 
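+//
+// An illustrative trace with rise=2, fall=3 (names below are hypothetical):
+//
+//	b := New("web", net.ParseIP("192.0.2.10"), 2, 3)
+//	b.Record(ProbeResult{OK: true, Code: "L7OK"}, 5)   // Health=1, still Unknown
+//	b.Record(ProbeResult{OK: true, Code: "L7OK"}, 5)   // Health=2 >= Rise → Up
+//	b.Record(ProbeResult{OK: false, Code: "L4CON"}, 5) // Health=1 < Rise → Down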
+func (b *Backend) Record(r ProbeResult, maxHistory int) bool { + if b.State == StatePaused { + return false + } + if r.OK { + if b.Counter.RecordPass() { + b.transition(StateUp, r, maxHistory) + return true + } + } else { + if b.Counter.RecordFail() || b.State == StateUnknown { + b.transition(StateDown, r, maxHistory) + return true + } + } + return false +} + +// Pause transitions the backend to StatePaused. Returns true if the state changed. +func (b *Backend) Pause(maxHistory int) bool { + if b.State == StatePaused { + return false + } + b.transition(StatePaused, ProbeResult{}, maxHistory) + b.Counter.Health = 0 + return true +} + +// Resume transitions a paused backend back to StateUnknown, resetting the +// counter. Returns true if the state changed. +func (b *Backend) Resume(maxHistory int) bool { + if b.State != StatePaused { + return false + } + b.transition(StateUnknown, ProbeResult{}, maxHistory) + b.Counter.Health = 0 + return true +} + +// NextInterval returns the appropriate probe interval based on state and counter: +// - Unknown (no probes yet): interval — probe promptly to establish initial state +// - Fully healthy (counter at max): interval +// - Fully down (counter at 0): downInterval (falls back to interval) +// - Degraded (anywhere in between): fastInterval (falls back to interval) +func (b *Backend) NextInterval(interval, fastInterval, downInterval time.Duration) time.Duration { + if b.State == StateUnknown { + return interval + } + if b.Counter.Health == b.Counter.Max() { + return interval + } + if b.Counter.Health == 0 { + if downInterval > 0 { + return downInterval + } + return interval + } + if fastInterval > 0 { + return fastInterval + } + return interval +} + +// transition appends a new Transition and updates State. +func (b *Backend) transition(to State, r ProbeResult, maxHistory int) { + t := Transition{From: b.State, To: to, At: time.Now(), Result: r} + b.Transitions = append([]Transition{t}, b.Transitions...) + if len(b.Transitions) > maxHistory { + b.Transitions = b.Transitions[:maxHistory] + } + b.State = to +} diff --git a/internal/health/state_test.go b/internal/health/state_test.go new file mode 100644 index 0000000..ecd4560 --- /dev/null +++ b/internal/health/state_test.go @@ -0,0 +1,307 @@ +package health + +import ( + "net" + "testing" + "time" +) + +func newBackend() *Backend { + return New("web4", net.ParseIP("10.0.0.1"), 2, 3) // rise=2, fall=3 +} + +func pass() ProbeResult { return ProbeResult{OK: true, Layer: LayerL7, Code: "L7OK"} } +func fail() ProbeResult { return ProbeResult{OK: false, Layer: LayerL4, Code: "L4CON"} } + +func TestInitialState(t *testing.T) { + b := newBackend() + if b.State != StateUnknown { + t.Errorf("initial state: got %s, want unknown", b.State) + } + if len(b.Transitions) != 0 { + t.Errorf("initial transitions: got %d, want 0", len(b.Transitions)) + } + if b.Counter.Health != 0 { + t.Errorf("initial counter health: got %d, want 0", b.Counter.Health) + } +} + +// TestRiseToUp: rise=2 passes from Down/Unknown → Up. +func TestRiseToUp(t *testing.T) { + tests := []struct { + name string + initialState State + }{ + {"from unknown", StateUnknown}, + {"from down", StateDown}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := newBackend() + b.State = tt.initialState + // First pass: counter=1, still in DOWN range (rise=2), no transition. 
+			if b.Record(pass(), 5) {
+				t.Error("should not transition after 1 pass (rise=2)")
+			}
+			if b.State != tt.initialState {
+				t.Errorf("state changed early: got %s", b.State)
+			}
+			// Second pass: counter=2=rise, transitions to Up.
+			if !b.Record(pass(), 5) {
+				t.Error("should transition to Up after 2 passes")
+			}
+			if b.State != StateUp {
+				t.Errorf("state: got %s, want up", b.State)
+			}
+		})
+	}
+}
+
+// TestFallToDown: fall=3 failures from fully-healthy → Down.
+//
+// The fall guarantee applies from counter=max (fully healthy). A backend that
+// just became Up is at counter=rise (the floor of the UP range), so a single
+// failure already drops it back into the DOWN range. This is correct: hysteresis
+// protects a server that has been consistently healthy for a long time, not one
+// that just scraped past the rise threshold.
+func TestFallToDown(t *testing.T) {
+	b := newBackend() // rise=2, fall=3, max=4
+
+	// Drive to fully healthy: need rise + (max-rise) = 4 passes total.
+	for i := 0; i < b.Counter.Max(); i++ {
+		b.Record(pass(), 5)
+	}
+	if b.State != StateUp {
+		t.Fatalf("precondition: want up, got %s", b.State)
+	}
+	if b.Counter.Health != b.Counter.Max() {
+		t.Fatalf("precondition: want counter=%d, got %d", b.Counter.Max(), b.Counter.Health)
+	}
+
+	// fall-1=2 failures: counter 4→3→2, both still in UP range (>=rise=2).
+	if b.Record(fail(), 5) {
+		t.Error("should not transition after 1 fail from fully healthy")
+	}
+	if b.Record(fail(), 5) {
+		t.Error("should not transition after 2 fails from fully healthy")
+	}
+	if b.State != StateUp {
+		t.Errorf("state after 2 fails: got %s, want up", b.State)
+	}
+
+	// Third failure: counter 2→1 < rise=2 → Down.
+	if !b.Record(fail(), 5) {
+		t.Error("should transition to Down after fall=3 failures from fully healthy")
+	}
+	if b.State != StateDown {
+		t.Errorf("state: got %s, want down", b.State)
+	}
+}
+
+// TestUnknownToDownOnFirstFail: any failure while Unknown → Down immediately.
+func TestUnknownToDownOnFirstFail(t *testing.T) {
+	b := newBackend()
+	if !b.Record(fail(), 5) {
+		t.Error("first fail from Unknown should transition to Down")
+	}
+	if b.State != StateDown {
+		t.Errorf("state: got %s, want down", b.State)
+	}
+}
+
+// TestHysteresis: alternating pass/fail keeps backend in degraded range without
+// bouncing between Up and Down. This is the key HAProxy counter property.
+func TestHysteresis(t *testing.T) {
+	b := newBackend()
+	// Drive to Up.
+	b.Record(pass(), 5)
+	b.Record(pass(), 5) // counter=2, state=Up
+
+	// With rise=2 the backend that just came Up sits at counter=2, the floor
+	// of the UP range, so a single failure drops it below rise and straight
+	// back to Down; at this setting there is no headroom for hysteresis.
+	if !b.Record(fail(), 5) {
+		t.Error("one fail from the rise floor should transition back to Down")
+	}
+
+	// The hysteresis shows once the counter has room above the rise threshold:
+	// use a second backend with rise=2, fall=5 (max=6) driven to fully healthy.
+	b2 := New("test", net.ParseIP("10.0.0.2"), 2, 5) // rise=2, fall=5, max=6
+	// Drive to fully healthy.
+	b2.Record(pass(), 5) // counter=1
+	b2.Record(pass(), 5) // counter=2=rise → Up
+	b2.Record(pass(), 5) // counter=3
+	b2.Record(pass(), 5) // counter=4
+	b2.Record(pass(), 5) // counter=5
+	b2.Record(pass(), 5) // counter=6=max
+
+	// Now alternate: each fail drops the counter from 6 to 5, and the pass
+	// that follows restores it to 6. The counter never leaves the UP range
+	// (>=rise=2), so no transition fires in either direction.
+	for i := 0; i < 4; i++ {
+		transitioned := b2.Record(fail(), 5) // 6→5, still >= rise=2
+		if transitioned {
+			t.Errorf("fail %d: should not transition (counter in UP range)", i+1)
+		}
+		if !b2.Counter.IsUp() {
+			t.Errorf("fail %d: should still be up", i+1)
+		}
+		if b2.Record(pass(), 5) { // 5→6: re-increment, already Up
+			t.Errorf("pass %d: should not transition (already Up)", i+1)
+		}
+	}
+	if b2.State != StateUp {
+		t.Errorf("after alternating: want up, got %s", b2.State)
+	}
+}
+
+// TestNextInterval: correct interval selection based on counter state.
+func TestNextInterval(t *testing.T) {
+	interval := 2 * time.Second
+	fast := 500 * time.Millisecond
+	down := 30 * time.Second
+
+	b := New("test", net.ParseIP("10.0.0.1"), 2, 3) // max=4
+
+	// Unknown (no probes yet): always use interval, never downInterval.
+	if got := b.NextInterval(interval, fast, down); got != interval {
+		t.Errorf("StateUnknown: got %v, want %v (interval)", got, interval)
+	}
+
+	// After first fail: counter=0, state=Down → downInterval.
+	b.Record(ProbeResult{OK: false, Code: "L4CON"}, 5)
+	if b.State != StateDown {
+		t.Fatalf("expected StateDown after first fail, got %s", b.State)
+	}
+	if got := b.NextInterval(interval, fast, down); got != down {
+		t.Errorf("StateDown/counter=0: got %v, want %v (down)", got, down)
+	}
+
+	// Drive to max (fully healthy) → interval.
+	b.Counter.Health = b.Counter.Max()
+	if got := b.NextInterval(interval, fast, down); got != interval {
+		t.Errorf("counter=max: got %v, want %v (interval)", got, interval)
+	}
+
+	// Degraded (0 < counter < max) → fastInterval.
+	b.Counter.Health = 1
+	if got := b.NextInterval(interval, fast, down); got != fast {
+		t.Errorf("counter=1 (degraded): got %v, want %v (fast)", got, fast)
+	}
+
+	// No fastInterval configured → falls back to interval.
+	if got := b.NextInterval(interval, 0, down); got != interval {
+		t.Errorf("degraded, no fast: got %v, want %v (interval)", got, interval)
+	}
+
+	// No downInterval configured → falls back to interval.
+	b.Counter.Health = 0
+	if got := b.NextInterval(interval, fast, 0); got != interval {
+		t.Errorf("down, no downInterval: got %v, want %v (interval)", got, interval)
+	}
+}
+
+func TestPauseResume(t *testing.T) {
+	b := newBackend()
+	b.State = StateUp
+
+	changed := b.Pause(5)
+	if !changed {
+		t.Error("Pause should return true")
+	}
+	if b.State != StatePaused {
+		t.Errorf("after Pause: got %s, want paused", b.State)
+	}
+
+	// Probes ignored while paused.
+	if b.Record(pass(), 5) {
+		t.Error("Record(pass) should not transition while paused")
+	}
+	if b.Record(fail(), 5) {
+		t.Error("Record(fail) should not transition while paused")
+	}
+	if b.State != StatePaused {
+		t.Errorf("state changed while paused: %s", b.State)
+	}
+
+	// Second Pause is a no-op.
+	if b.Pause(5) {
+		t.Error("second Pause should return false")
+	}
+
+	changed = b.Resume(5)
+	if !changed {
+		t.Error("Resume should return true")
+	}
+	if b.State != StateUnknown {
+		t.Errorf("after Resume: got %s, want unknown", b.State)
+	}
+
+	// Resume on non-paused is a no-op.
+	if b.Resume(5) {
+		t.Error("Resume on non-paused should return false")
+	}
+}
+
+func TestTransitionHistory(t *testing.T) {
+	b := newBackend()
+	maxHistory := 3
+
+	// Drive four state changes. With rise=2 a backend that just came Up sits
+	// at the rise floor, so a single failure sends it straight back Down.
+	b.Record(fail(), maxHistory) // counter=0: Unknown→Down
+	b.Record(pass(), maxHistory) // counter=1, still Down
+	b.Record(pass(), maxHistory) // counter=2=rise: Down→Up
+	b.Record(fail(), maxHistory) // counter=1 < rise: Up→Down
+	b.Record(fail(), maxHistory) // counter=0, already Down
+	b.Record(fail(), maxHistory) // counter=0, still Down
+	b.Record(pass(), maxHistory) // counter=1, still Down
+	b.Record(pass(), maxHistory) // counter=2=rise: Down→Up
+
+	if len(b.Transitions) != maxHistory {
+		t.Errorf("transitions capped at %d, got %d", maxHistory, len(b.Transitions))
+	}
+	// Newest first: last transition was →Up.
+	if b.Transitions[0].To != StateUp {
+		t.Errorf("newest transition: got %s, want up", b.Transitions[0].To)
+	}
+	// Transitions carry ProbeResult.
+	if b.Transitions[0].Result.Code == "" {
+		t.Error("transition result code should not be empty")
+	}
+}
+
+func TestTransitionTimestamp(t *testing.T) {
+	b := newBackend()
+	before := time.Now()
+	b.Record(fail(), 5)
+	after := time.Now()
+
+	if len(b.Transitions) == 0 {
+		t.Fatal("expected a transition")
+	}
+	ts := b.Transitions[0].At
+	if ts.Before(before) || ts.After(after) {
+		t.Errorf("transition timestamp %v outside [%v, %v]", ts, before, after)
+	}
+}
+
+func TestStateString(t *testing.T) {
+	cases := []struct {
+		s    State
+		want string
+	}{
+		{StateUnknown, "unknown"},
+		{StateUp, "up"},
+		{StateDown, "down"},
+		{StatePaused, "paused"},
+	}
+	for _, c := range cases {
+		if c.s.String() != c.want {
+			t.Errorf("State(%d).String() = %q, want %q", c.s, c.s.String(), c.want)
+		}
+	}
+}
diff --git a/internal/prober/http.go b/internal/prober/http.go
new file mode 100644
index 0000000..9c5d715
--- /dev/null
+++ b/internal/prober/http.go
@@ -0,0 +1,156 @@
+package prober
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"strconv"
+	"strings"
+
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
+)
+
+// HTTPProbe sends a plain HTTP GET to cfg.Target inside the healthcheck netns.
+func HTTPProbe(ctx context.Context, cfg ProbeConfig) health.ProbeResult {
+	return doHTTPProbe(ctx, cfg, false)
+}
+
+// HTTPSProbe sends an HTTP GET over TLS to cfg.Target inside the healthcheck netns.
+func HTTPSProbe(ctx context.Context, cfg ProbeConfig) health.ProbeResult {
+	return doHTTPProbe(ctx, cfg, true)
+}
+
+func doHTTPProbe(ctx context.Context, cfg ProbeConfig, useTLS bool) health.ProbeResult {
+	if cfg.HTTP == nil {
+		return health.ProbeResult{OK: false, Layer: health.LayerUnknown, Code: "UNKNOWN", Detail: "missing HTTP params"}
+	}
+	p := cfg.HTTP
+
+	port := cfg.Port
+	if port == 0 {
+		if useTLS {
+			port = 443
+		} else {
+			port = 80
+		}
+	}
+
+	scheme := "http"
+	if useTLS {
+		scheme = "https"
+	}
+	target := fmt.Sprintf("%s://%s%s", scheme, net.JoinHostPort(cfg.Target.String(), strconv.Itoa(int(port))), p.Path)
+
+	hostHeader := p.Host
+	if hostHeader == "" {
+		hostHeader = cfg.Target.String()
+	}
+
+	// Dial (and optionally handshake) inside the healthcheck netns.
+	// The socket retains its netns after creation, so HTTP can be done outside.
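+	// (A Linux socket stays bound to the netns it was created in, not to the
+	// netns its thread later runs in, so only the dial and TLS handshake need
+	// to run inside inNetns; the HTTP exchange below can happen from anywhere.)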
+	var conn net.Conn
+	dialErr := inNetns(cfg.HealthCheckNetns, func() error {
+		dialer := &net.Dialer{Timeout: cfg.Timeout}
+		if cfg.ProbeSrc != nil {
+			dialer.LocalAddr = &net.TCPAddr{IP: cfg.ProbeSrc}
+		}
+		c, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(cfg.Target.String(), strconv.Itoa(int(port))))
+		if err != nil {
+			return err
+		}
+		if useTLS {
+			tlsConn := tls.Client(c, tlsConfig(p.ServerName, p.InsecureSkipVerify))
+			if err := tlsConn.HandshakeContext(ctx); err != nil {
+				c.Close()
+				return err
+			}
+			conn = tlsConn
+		} else {
+			conn = c
+		}
+		return nil
+	})
+	if dialErr != nil {
+		// Distinguish TLS handshake failures (L6) from TCP connect failures
+		// (L4). conn is left nil on every failure path, so classify by the
+		// error itself; the TLS check comes first so a handshake timeout is
+		// reported as L6TOUT rather than swallowed by the L4 timeout check.
+		if useTLS && isTLSError(dialErr) {
+			if isTimeout(dialErr) {
+				return health.ProbeResult{OK: false, Layer: health.LayerL6, Code: "L6TOUT", Detail: dialErr.Error()}
+			}
+			return health.ProbeResult{OK: false, Layer: health.LayerL6, Code: "L6RSP", Detail: dialErr.Error()}
+		}
+		if isTimeout(dialErr) {
+			return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4TOUT", Detail: dialErr.Error()}
+		}
+		return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON", Detail: dialErr.Error()}
+	}
+	defer conn.Close()
+
+	transport := &http.Transport{
+		DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
+			return conn, nil
+		},
+		DisableKeepAlives: true,
+	}
+	client := &http.Client{
+		Transport: transport,
+		Timeout:   cfg.Timeout,
+		CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
+			return http.ErrUseLastResponse // never follow redirects
+		},
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
+	if err != nil {
+		return health.ProbeResult{OK: false, Layer: health.LayerL7, Code: "L7RSP", Detail: err.Error()}
+	}
+	req.Host = hostHeader
+	req.Header.Set("User-Agent", "maglev-healthchecker/1.0")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		if isTimeout(err) {
+			return health.ProbeResult{OK: false, Layer: health.LayerL7, Code: "L7TOUT", Detail: err.Error()}
+		}
+		return health.ProbeResult{OK: false, Layer: health.LayerL7, Code: "L7RSP", Detail: err.Error()}
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode < p.ResponseCodeMin || resp.StatusCode > p.ResponseCodeMax {
+		return health.ProbeResult{
+			OK:     false,
+			Layer:  health.LayerL7,
+			Code:   "L7STS",
+			Detail: fmt.Sprintf("HTTP %d (want %d-%d)", resp.StatusCode, p.ResponseCodeMin, p.ResponseCodeMax),
+		}
+	}
+
+	if p.ResponseRegexp != nil {
+		body, err := io.ReadAll(resp.Body)
+		if err != nil {
+			if isTimeout(err) {
+				return health.ProbeResult{OK: false, Layer: health.LayerL7, Code: "L7TOUT", Detail: err.Error()}
+			}
+			return health.ProbeResult{OK: false, Layer: health.LayerL7, Code: "L7RSP", Detail: err.Error()}
+		}
+		if !p.ResponseRegexp.Match(body) {
+			return health.ProbeResult{
+				OK:     false,
+				Layer:  health.LayerL7,
+				Code:   "L7RSP",
+				Detail: fmt.Sprintf("body did not match regexp %q", p.ResponseRegexp),
+			}
+		}
+	}
+
+	return health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"}
}
+
+// isTLSError returns true if err originated from the TLS layer.
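+// The check is heuristic: crypto/tls has no single exported error type that
+// covers every handshake failure, so alert errors are matched by type and the
+// rest by the "tls:" prefix that package tls uses in its error strings.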
+func isTLSError(err error) bool { + if err == nil { + return false + } + _, ok := err.(tls.AlertError) + return ok || strings.Contains(err.Error(), "tls:") +} diff --git a/internal/prober/http_test.go b/internal/prober/http_test.go new file mode 100644 index 0000000..afcdc5a --- /dev/null +++ b/internal/prober/http_test.go @@ -0,0 +1,193 @@ +package prober + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "regexp" + "testing" + "time" + + "git.ipng.ch/ipng/vpp-maglev/internal/config" +) + +// dialAndProbe dials addr directly (bypassing netns/interface binding) and +// exercises the HTTP probe response-checking logic. +func dialAndProbe(ctx context.Context, addr string, cfg ProbeConfig) (bool, error) { + if cfg.HTTP == nil { + return false, fmt.Errorf("dialAndProbe requires HTTP params") + } + p := cfg.HTTP + + path := p.Path + if path == "" { + path = "/" + } + + conn, err := net.DialTimeout("tcp", addr, cfg.Timeout) + if err != nil { + return false, err + } + + transport := &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return conn, nil + }, + DisableKeepAlives: true, + } + client := &http.Client{ + Transport: transport, + Timeout: cfg.Timeout, + CheckRedirect: func(_ *http.Request, _ []*http.Request) error { + return http.ErrUseLastResponse + }, + } + + target := "http://" + addr + path + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if err != nil { + return false, err + } + if p.Host != "" { + req.Host = p.Host + } + + resp, err := client.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + + if resp.StatusCode < p.ResponseCodeMin || resp.StatusCode > p.ResponseCodeMax { + return false, nil + } + if p.ResponseRegexp != nil { + body, _ := io.ReadAll(resp.Body) + if !p.ResponseRegexp.Match(body) { + return false, nil + } + } + return true, nil +} + +func TestHTTPProbeStatusCode(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "healthy") + })) + defer srv.Close() + + cfg := ProbeConfig{ + Timeout: 2 * time.Second, + HTTP: &config.HTTPParams{ + Path: "/healthz", + ResponseCodeMin: 200, + ResponseCodeMax: 200, + }, + } + ok, err := dialAndProbe(context.Background(), srv.Listener.Addr().String(), cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Error("expected probe success") + } +} + +func TestHTTPProbeWrongStatusCode(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + + cfg := ProbeConfig{ + Timeout: 2 * time.Second, + HTTP: &config.HTTPParams{ + Path: "/", + ResponseCodeMin: 200, + ResponseCodeMax: 200, + }, + } + ok, err := dialAndProbe(context.Background(), srv.Listener.Addr().String(), cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ok { + t.Error("expected probe failure on wrong status code") + } +} + +func TestHTTPProbeRegexpMatch(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, `{"status":"ok"}`) + })) + defer srv.Close() + + cfg := ProbeConfig{ + Timeout: 2 * time.Second, + HTTP: &config.HTTPParams{ + Path: "/", + ResponseCodeMin: 200, + ResponseCodeMax: 200, + ResponseRegexp: regexp.MustCompile(`"status":"ok"`), + }, + } + ok, err := dialAndProbe(context.Background(), 
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if !ok {
+		t.Error("expected probe success with matching regexp")
+	}
+}
+
+func TestHTTPProbeRegexpNoMatch(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		fmt.Fprint(w, `{"status":"degraded"}`)
+	}))
+	defer srv.Close()
+
+	cfg := ProbeConfig{
+		Timeout: 2 * time.Second,
+		HTTP: &config.HTTPParams{
+			Path:            "/",
+			ResponseCodeMin: 200,
+			ResponseCodeMax: 200,
+			ResponseRegexp:  regexp.MustCompile(`"status":"ok"`),
+		},
+	}
+	ok, err := dialAndProbe(context.Background(), srv.Listener.Addr().String(), cfg)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if ok {
+		t.Error("expected probe failure when regexp does not match")
+	}
+}
+
+func TestHTTPProbeNoRedirect(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		http.Redirect(w, r, "/other", http.StatusFound)
+	}))
+	defer srv.Close()
+
+	// The probe expects 302, so the redirect must not be followed.
+	cfg := ProbeConfig{
+		Timeout: 2 * time.Second,
+		HTTP: &config.HTTPParams{
+			Path:            "/",
+			ResponseCodeMin: 302,
+			ResponseCodeMax: 302,
+		},
+	}
+	ok, err := dialAndProbe(context.Background(), srv.Listener.Addr().String(), cfg)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if !ok {
+		t.Error("expected success when probe expects 302 and server returns 302")
+	}
+}
diff --git a/internal/prober/icmp.go b/internal/prober/icmp.go
new file mode 100644
index 0000000..cf8efa9
--- /dev/null
+++ b/internal/prober/icmp.go
@@ -0,0 +1,118 @@
+package prober
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand/v2"
+	"net"
+	"time"
+
+	"golang.org/x/net/icmp"
+	"golang.org/x/net/ipv4"
+	"golang.org/x/net/ipv6"
+
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
+)
+
+// ICMPProbe sends an ICMP echo request to cfg.Target inside the healthcheck
+// netns and waits for a matching reply.
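+//
+// The raw "ip4:icmp" / "ip6:ipv6-icmp" sockets used below require CAP_NET_RAW
+// (granted in the Dockerfile). For reference, golang.org/x/net/icmp also
+// supports unprivileged datagram ICMP sockets, subject to the host's
+// net.ipv4.ping_group_range sysctl; a sketch:
+//
+//	// The kernel then manages the echo ID itself, and replies arrive
+//	// from a *net.UDPAddr rather than a *net.IPAddr.
+//	pc, err := icmp.ListenPacket("udp4", "0.0.0.0")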
+func ICMPProbe(ctx context.Context, cfg ProbeConfig) health.ProbeResult {
+	isV6 := cfg.Target.To4() == nil
+
+	var ok bool
+	err := inNetns(cfg.HealthCheckNetns, func() error {
+		var network string
+		var proto int
+		var msgType icmp.Type
+		if isV6 {
+			network = "ip6:ipv6-icmp"
+			proto = 58 // IANA protocol number for ICMPv6
+			msgType = ipv6.ICMPTypeEchoRequest
+		} else {
+			network = "ip4:icmp"
+			proto = 1 // IANA protocol number for ICMP
+			msgType = ipv4.ICMPTypeEcho
+		}
+
+		listenAddr := ""
+		if cfg.ProbeSrc != nil {
+			listenAddr = cfg.ProbeSrc.String()
+		}
+		pc, err := net.ListenPacket(network, listenAddr)
+		if err != nil {
+			return fmt.Errorf("listen icmp (%s): %w", network, err)
+		}
+		defer pc.Close()
+
+		id := rand.IntN(0xffff) + 1
+		seq := rand.IntN(0xffff) + 1
+
+		msg := icmp.Message{
+			Type: msgType,
+			Code: 0,
+			Body: &icmp.Echo{
+				ID:   id,
+				Seq:  seq,
+				Data: []byte("maglev-hc"),
+			},
+		}
+		b, err := msg.Marshal(nil)
+		if err != nil {
+			return fmt.Errorf("marshal icmp: %w", err)
+		}
+
+		dst := &net.IPAddr{IP: cfg.Target}
+		if _, err := pc.WriteTo(b, dst); err != nil {
+			return fmt.Errorf("write icmp: %w", err)
+		}
+
+		deadline := time.Now().Add(cfg.Timeout)
+		pc.SetDeadline(deadline) //nolint:errcheck
+
+		buf := make([]byte, 1500)
+		for time.Now().Before(deadline) {
+			// ReadFrom only unblocks at the socket deadline, so ctx
+			// cancellation is observed between reads.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			n, _, err := pc.ReadFrom(buf)
+			if err != nil {
+				if isTimeout(err) {
+					return nil // deadline reached without a matching reply
+				}
+				return fmt.Errorf("read icmp: %w", err)
+			}
+
+			reply, err := icmp.ParseMessage(proto, buf[:n])
+			if err != nil {
+				continue
+			}
+			echo, ok2 := reply.Body.(*icmp.Echo)
+			if !ok2 {
+				continue
+			}
+			if echo.ID == id && echo.Seq == seq {
+				ok = true
+				return nil
+			}
+		}
+		return nil
+	})
+
+	// ICMP has no transport or application layer of its own; report results in
+	// the L4 bucket, the closest of the HAProxy-style code families.
+	if err != nil {
+		if isTimeout(err) {
+			return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4TOUT", Detail: err.Error()}
+		}
+		return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON", Detail: err.Error()}
+	}
+	if ok {
+		return health.ProbeResult{OK: true, Layer: health.LayerL4, Code: "L4OK"}
+	}
+	return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4TOUT", Detail: "no reply received"}
+}
+
+// isTimeout reports whether err is (or wraps) a network timeout.
+func isTimeout(err error) bool {
+	var netErr net.Error
+	return errors.As(err, &netErr) && netErr.Timeout()
+}
diff --git a/internal/prober/netns.go b/internal/prober/netns.go
new file mode 100644
index 0000000..7e820cc
--- /dev/null
+++ b/internal/prober/netns.go
@@ -0,0 +1,40 @@
+package prober
+
+import (
+	"fmt"
+	"runtime"
+
+	"github.com/vishvananda/netns"
+)
+
+// inNetns runs fn while the current OS thread is switched into the named
+// network namespace. The thread is locked for the duration so the switch is
+// safe, and the original netns is restored before returning. If nsName is
+// empty, fn runs in the current namespace without any switching.
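+//
+// Usage sketch (hypothetical namespace name "hc0"): sockets created inside fn
+// keep their namespace for their whole lifetime, even after the thread has
+// switched back.
+//
+//	err := inNetns("hc0", func() error {
+//		c, err := net.Dial("tcp", "192.0.2.10:80") // created inside hc0
+//		if err != nil {
+//			return err
+//		}
+//		return c.Close()
+//	})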
+func inNetns(nsName string, fn func() error) error {
+	if nsName == "" {
+		return fn()
+	}
+
+	runtime.LockOSThread()
+
+	origNs, err := netns.Get()
+	if err != nil {
+		runtime.UnlockOSThread()
+		return fmt.Errorf("get current netns: %w", err)
+	}
+	defer origNs.Close()
+
+	defer func() {
+		if err := netns.Set(origNs); err != nil {
+			// The thread could not be switched back: keep it locked so the
+			// runtime destroys it rather than handing a thread stuck in the
+			// wrong namespace back to the scheduler.
+			return
+		}
+		runtime.UnlockOSThread()
+	}()
+
+	targetNs, err := netns.GetFromName(nsName)
+	if err != nil {
+		return fmt.Errorf("get netns %q: %w", nsName, err)
+	}
+	defer targetNs.Close()
+
+	if err := netns.Set(targetNs); err != nil {
+		return fmt.Errorf("enter netns %q: %w", nsName, err)
+	}
+
+	return fn()
+}
diff --git a/internal/prober/prober.go b/internal/prober/prober.go
new file mode 100644
index 0000000..7778d1b
--- /dev/null
+++ b/internal/prober/prober.go
@@ -0,0 +1,49 @@
+package prober
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"time"
+
+	"git.ipng.ch/ipng/vpp-maglev/internal/config"
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
+)
+
+// ProbeFunc is the signature for a health check probe.
+type ProbeFunc func(ctx context.Context, cfg ProbeConfig) health.ProbeResult
+
+// ProbeConfig holds all parameters needed to execute a single probe.
+type ProbeConfig struct {
+	Target           net.IP        // backend address to probe
+	Port             uint16        // destination port (used by TCP and HTTP probers)
+	ProbeSrc         net.IP        // source address to bind; nil lets the OS choose
+	HealthCheckNetns string        // network namespace name; sockets are created inside it
+	Timeout          time.Duration
+	HTTP             *config.HTTPParams // non-nil for type http/https
+	TCP              *config.TCPParams  // non-nil for type tcp
+}
+
+// ForType returns the ProbeFunc registered for the given healthcheck type.
+// Returns a failing stub for unknown types.
+func ForType(t string) ProbeFunc {
+	switch t {
+	case "icmp":
+		return ICMPProbe
+	case "tcp":
+		return TCPProbe
+	case "http":
+		return HTTPProbe
+	case "https":
+		return HTTPSProbe
+	default:
+		return func(_ context.Context, _ ProbeConfig) health.ProbeResult {
+			return health.ProbeResult{
+				OK:     false,
+				Layer:  health.LayerUnknown,
+				Code:   "UNKNOWN",
+				Detail: fmt.Sprintf("unknown probe type %q", t),
+			}
+		}
+	}
+}
diff --git a/internal/prober/tcp.go b/internal/prober/tcp.go
new file mode 100644
index 0000000..da8c68c
--- /dev/null
+++ b/internal/prober/tcp.go
@@ -0,0 +1,76 @@
+package prober
+
+import (
+	"context"
+	"crypto/tls"
+	"net"
+	"strconv"
+	"time"
+
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
+)
+
+// TCPProbe performs a TCP connect to cfg.Target:cfg.Port inside the healthcheck
+// netns. If cfg.TCP.SSL is true a TLS handshake is performed after connect,
+// making this an L6 check (useful for smtps, imaps, etc.).
+// Plain connect returns L4OK/L4TOUT/L4CON.
+// TLS handshake returns L6OK/L6TOUT/L6RSP (on top of an L4OK connect).
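+//
+// Invocation sketch (hypothetical target and parameters):
+//
+//	res := TCPProbe(ctx, ProbeConfig{
+//		Target:  net.ParseIP("192.0.2.10"),
+//		Port:    993,
+//		Timeout: 2 * time.Second,
+//		TCP:     &config.TCPParams{SSL: true, ServerName: "imap.example.org"},
+//	})
+//	// res.Code is one of L4TOUT, L4CON, L6TOUT, L6RSP or L6OK for an SSL check.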
+func TCPProbe(ctx context.Context, cfg ProbeConfig) health.ProbeResult { + port := cfg.Port + if port == 0 { + port = 80 + } + addr := net.JoinHostPort(cfg.Target.String(), strconv.Itoa(int(port))) + + doTLS := cfg.TCP != nil && cfg.TCP.SSL + var serverName string + var insecureSkipVerify bool + if cfg.TCP != nil { + serverName = cfg.TCP.ServerName + insecureSkipVerify = cfg.TCP.InsecureSkipVerify + } + + var result health.ProbeResult + err := inNetns(cfg.HealthCheckNetns, func() error { + dialer := &net.Dialer{Timeout: cfg.Timeout} + if cfg.ProbeSrc != nil { + dialer.LocalAddr = &net.TCPAddr{IP: cfg.ProbeSrc} + } + conn, err := dialer.DialContext(ctx, "tcp", addr) + if err != nil { + if isTimeout(err) { + result = health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4TOUT", Detail: err.Error()} + } else { + result = health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON", Detail: err.Error()} + } + return nil + } + + if !doTLS { + conn.Close() + result = health.ProbeResult{OK: true, Layer: health.LayerL4, Code: "L4OK"} + return nil + } + + // TLS handshake. + tlsConn := tls.Client(conn, tlsConfig(serverName, insecureSkipVerify)) + tlsConn.SetDeadline(time.Now().Add(cfg.Timeout)) //nolint:errcheck + if err := tlsConn.HandshakeContext(ctx); err != nil { + tlsConn.Close() + if isTimeout(err) { + result = health.ProbeResult{OK: false, Layer: health.LayerL6, Code: "L6TOUT", Detail: err.Error()} + } else { + result = health.ProbeResult{OK: false, Layer: health.LayerL6, Code: "L6RSP", Detail: err.Error()} + } + return nil + } + tlsConn.Close() + result = health.ProbeResult{OK: true, Layer: health.LayerL6, Code: "L6OK"} + return nil + }) + + if err != nil { + return health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON", Detail: err.Error()} + } + return result +} diff --git a/internal/prober/tls.go b/internal/prober/tls.go new file mode 100644 index 0000000..2344d89 --- /dev/null +++ b/internal/prober/tls.go @@ -0,0 +1,13 @@ +package prober + +import ( + "crypto/tls" +) + +// tlsConfig builds a *tls.Config from explicit TLS parameters. +func tlsConfig(serverName string, insecureSkipVerify bool) *tls.Config { + return &tls.Config{ + ServerName: serverName, + InsecureSkipVerify: insecureSkipVerify, //nolint:gosec + } +} diff --git a/proto/healthchecker.proto b/proto/healthchecker.proto new file mode 100644 index 0000000..74ad24c --- /dev/null +++ b/proto/healthchecker.proto @@ -0,0 +1,78 @@ +syntax = "proto3"; + +package healthchecker; + +option go_package = "git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"; + +// HealthChecker exposes the state of backend health for all VIPs. 
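+//
+// WatchTransitions streams backend state changes as they happen; a typical
+// client calls ListBackends once for a snapshot and then applies the streamed
+// TransitionEvents on top of it. PauseBackend/ResumeBackend flip a backend's
+// administrative state without touching the configuration file.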
+service HealthChecker { + rpc ListVIPs(ListVIPsRequest) returns (ListVIPsResponse); + rpc GetVIP(GetVIPRequest) returns (VIPInfo); + rpc ListBackends(ListBackendsRequest) returns (ListBackendsResponse); + rpc GetBackend(GetBackendRequest) returns (BackendInfo); + rpc PauseBackend(PauseResumeRequest) returns (BackendInfo); + rpc ResumeBackend(PauseResumeRequest) returns (BackendInfo); + rpc WatchTransitions(WatchRequest) returns (stream TransitionEvent); +} + +// ---- requests --------------------------------------------------------------- + +message ListVIPsRequest {} + +message GetVIPRequest { + string vip_name = 1; +} + +message ListBackendsRequest { + string vip_name = 1; +} + +message GetBackendRequest { + string vip_name = 1; + string backend_address = 2; +} + +message PauseResumeRequest { + string vip_name = 1; + string backend_address = 2; +} + +message WatchRequest {} + +// ---- responses -------------------------------------------------------------- + +message ListVIPsResponse { + repeated string vip_names = 1; +} + +message VIPInfo { + string name = 1; + string address = 2; + string protocol = 3; + uint32 port = 4; + repeated string backends = 5; + string description = 6; +} + +message ListBackendsResponse { + repeated BackendInfo backends = 1; +} + +message BackendInfo { + string vip_name = 1; + string address = 2; + string state = 3; + repeated TransitionRecord transitions = 4; +} + +message TransitionRecord { + string from = 1; + string to = 2; + int64 at_unix_ns = 3; +} + +message TransitionEvent { + string vip_name = 1; + string backend_address = 2; + TransitionRecord transition = 3; +}
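+
+// Example query with grpcurl (illustrative: assumes the server is reachable on
+// the default --grpc-addr of :9090 and listens in plaintext; "www" is a
+// hypothetical VIP name):
+//
+//   grpcurl -plaintext -proto proto/healthchecker.proto \
+//     -d '{"vip_name": "www"}' \
+//     localhost:9090 healthchecker.HealthChecker/ListBackends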