Files
vpp-maglev/internal/checker/checker.go
Pim van Pelt 8bde00eb61 Fix pause to cancel probe goroutine; add Robot Framework integration tests
Pause semantics
- PauseBackend now cancels the probe goroutine so no HTTP/TCP/ICMP
  traffic is sent while the backend is paused. Previously the goroutine
  kept running and results were silently discarded.
- ResumeBackend launches a fresh probe goroutine on the existing worker,
  preserving transition history. The backend re-enters unknown state.

Integration tests (tests/01-maglevd/)
- Containerlab topology with 3 nginx:alpine backends on a dedicated
  management network (172.20.30.0/24) with static IPs.
- maglevd config with 200ms HTTP health-check interval for fast test
  convergence (rise=2, fall=2).
- 8 test cases: deploy lab, start maglevd, all backends reach up,
  nginx logs confirm probes arriving, pause stops probes (probe count
  stable), resume restarts probes, disable stops probes, enable
  restarts probes.

VPP dataplane test (tests/02-vpp-lb/)
- Rewrite 01-e2e-lab.robot to match the actual single-VPP topology:
  test client-to-server ping through VPP bridge domains and verify
  nginx is serving on all app servers. The previous version referenced
  a non-existent topology file and tested OSPF/BFD between two VPP
  nodes that don't exist in this lab.

Build infrastructure
- Add 'make robot-test' target with TEST= for suite selection.
- Add tests/.venv target for Robot Framework virtualenv.
- Make IMAGE optional in rf-run.sh.
- Add .gitignore entries for test output, venv, logs, and clab state.
2026-04-11 20:19:36 +02:00

629 lines
18 KiB
Go

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package checker
import (
"context"
"fmt"
"log/slog"
"net"
"sort"
"sync"
"time"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
"git.ipng.ch/ipng/vpp-maglev/internal/prober"
)
// BackendSnapshot combines the live health state with the config entry for a backend.
// It is the unit returned by GetBackend/PauseBackend/ResumeBackend/DisableBackend/EnableBackend.
type BackendSnapshot struct {
	Health *health.Backend // live probe state and transition history
	Config config.Backend  // configured entry (address, healthcheck name, enabled flag)
}
// Event is emitted on every backend state transition, once per frontend that
// references the backend.
type Event struct {
	FrontendName string            // frontend that references the transitioning backend
	BackendName  string            // backend whose state changed
	Backend      net.IP            // backend address at the time of the transition
	Transition   health.Transition // the state change itself
}
// worker tracks one probed backend: its live health state, the config it was
// started with, and the handle to cancel its probe goroutine.
type worker struct {
	backend *health.Backend    // live state; updated by runProbe under c.mu
	hc      config.HealthCheck // healthcheck parameters in effect for this worker
	entry   config.Backend     // backend config entry this worker was started with
	cancel  context.CancelFunc // stops this worker's probe goroutine
	wakeCh  chan struct{}      // closed/signalled to interrupt probe sleep on resume
}
// Checker orchestrates health probing for all backends.
// Each backend is probed exactly once, regardless of how many frontends
// reference it.
type Checker struct {
	runCtx  context.Context    // set in Run; used by EnableBackend to start new goroutines
	cfg     *config.Config     // current config; guarded by mu
	mu      sync.RWMutex       // guards cfg and workers
	workers map[string]*worker // keyed by backend name; guarded by mu
	subsMu  sync.Mutex         // guards nextID and subs (separate from mu to avoid lock coupling)
	nextID  int                // next subscriber id handed out by Subscribe
	subs    map[int]chan Event // active subscriber channels, keyed by id
	eventCh chan Event         // internal buffered queue fed by emit, drained by fanOut
}
// New creates a Checker. Call Run to start probing.
func New(cfg *config.Config) *Checker {
	c := &Checker{
		cfg:     cfg,
		eventCh: make(chan Event, 256),
	}
	c.workers = make(map[string]*worker)
	c.subs = make(map[int]chan Event)
	return c
}
// Run starts all probe goroutines and blocks until ctx is cancelled.
func (c *Checker) Run(ctx context.Context) error {
	go c.fanOut(ctx)

	c.mu.Lock()
	// Publishing runCtx while holding mu guarantees any later EnableBackend
	// call (which also takes mu) observes it.
	c.runCtx = ctx
	active := activeBackendNames(c.cfg)
	histLen := c.cfg.HealthChecker.TransitionHistory
	for idx, backendName := range active {
		entry := c.cfg.Backends[backendName]
		check := c.cfg.HealthChecks[entry.HealthCheck]
		c.startWorker(ctx, backendName, entry, check, idx, len(active), histLen)
	}
	c.mu.Unlock()

	<-ctx.Done()
	return nil
}
// Reload applies a new config without restarting the process.
// New backends are added, removed backends are stopped, changed backends are
// restarted. Backends whose healthcheck config is unchanged continue
// uninterrupted, even if the set of frontends referencing them changes.
func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	maxHistory := cfg.HealthChecker.TransitionHistory
	// Desired set of active backends under the NEW config.
	desired := map[string]struct{}{}
	for _, name := range activeBackendNames(cfg) {
		desired[name] = struct{}{}
	}
	// Stop workers no longer needed; emit a removed event using the old frontends.
	for name, w := range c.workers {
		if _, ok := desired[name]; !ok {
			slog.Info("backend-stop", "backend", name)
			t := w.backend.Remove(maxHistory)
			// c.cfg still holds the OLD config here, so the removal event
			// reaches subscribers of the old frontends.
			c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
			w.cancel()
			delete(c.workers, name)
		}
	}
	// Add new or restart changed workers; emit an unknown event using the new frontends.
	names := activeBackendNames(cfg)
	for i, name := range names {
		b := cfg.Backends[name]
		hc := cfg.HealthChecks[b.HealthCheck]
		if w, ok := c.workers[name]; ok {
			if healthCheckEqual(w.hc, hc) {
				// Update entry metadata (weight, etc.) in place without restart.
				w.entry = b
				continue
			}
			// Healthcheck changed: replace the worker. startWorker installs a
			// fresh health.Backend, so probe state and history restart.
			slog.Info("backend-restart", "backend", name)
			w.cancel()
			c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
		} else {
			slog.Info("backend-start", "backend", name)
			c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
		}
		// Freshly (re)started workers announce their initial transition
		// against the NEW frontends map.
		c.emitForBackend(name, c.workers[name].backend.Address, c.workers[name].backend.Transitions[0], cfg.Frontends)
	}
	c.cfg = cfg
	return nil
}
// Subscribe returns a channel that receives Events for every state transition.
// Call the returned cancel function to unsubscribe.
func (c *Checker) Subscribe() (<-chan Event, func()) {
	c.subsMu.Lock()
	defer c.subsMu.Unlock()
	ch := make(chan Event, 64)
	id := c.nextID
	c.nextID++
	c.subs[id] = ch
	unsubscribe := func() {
		c.subsMu.Lock()
		defer c.subsMu.Unlock()
		// Remove before closing so fanOut (which holds subsMu while
		// sending) can never send on a closed channel.
		delete(c.subs, id)
		close(ch)
	}
	return ch, unsubscribe
}
// ListFrontends returns the names of all configured frontends, sorted.
//
// Sorting makes the output deterministic (Go map iteration order is random)
// and consistent with ListHealthChecks and ListBackends, which both sort.
func (c *Checker) ListFrontends() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	names := make([]string, 0, len(c.cfg.Frontends))
	for name := range c.cfg.Frontends {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// GetFrontend returns the frontend config for the given name, and whether
// such a frontend exists.
func (c *Checker) GetFrontend(name string) (config.Frontend, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	fe, found := c.cfg.Frontends[name]
	return fe, found
}
// SetFrontendPoolBackendWeight updates the weight of a backend within a named
// pool of a frontend. Returns the updated FrontendInfo and a descriptive error
// if the frontend, pool, or backend is not found or the weight is out of range.
func (c *Checker) SetFrontendPoolBackendWeight(frontendName, poolName, backendName string, weight int) (config.Frontend, error) {
	if weight < 0 || weight > 100 {
		return config.Frontend{}, fmt.Errorf("weight %d out of range [0, 100]", weight)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	fe, ok := c.cfg.Frontends[frontendName]
	if !ok {
		return config.Frontend{}, fmt.Errorf("frontend %q not found", frontendName)
	}
	for idx := range fe.Pools {
		pool := &fe.Pools[idx]
		if pool.Name != poolName {
			continue
		}
		entry, found := pool.Backends[backendName]
		if !found {
			return config.Frontend{}, fmt.Errorf("backend %q not found in pool %q", backendName, poolName)
		}
		entry.Weight = weight
		pool.Backends[backendName] = entry
		c.cfg.Frontends[frontendName] = fe
		slog.Info("frontend-pool-weight", "frontend", frontendName, "pool", poolName, "backend", backendName, "weight", weight)
		return fe, nil
	}
	return config.Frontend{}, fmt.Errorf("pool %q not found in frontend %q", poolName, frontendName)
}
// ListHealthChecks returns the names of all configured health checks, sorted.
func (c *Checker) ListHealthChecks() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]string, 0, len(c.cfg.HealthChecks))
	for hcName := range c.cfg.HealthChecks {
		out = append(out, hcName)
	}
	sort.Strings(out)
	return out
}
// GetHealthCheck returns the config for a health check by name, and whether
// such a health check exists.
func (c *Checker) GetHealthCheck(name string) (config.HealthCheck, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	check, found := c.cfg.HealthChecks[name]
	return check, found
}
// ListBackends returns the names of all active backends, sorted.
func (c *Checker) ListBackends() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]string, 0, len(c.workers))
	for backendName := range c.workers {
		out = append(out, backendName)
	}
	sort.Strings(out)
	return out
}
// ListFrontendBackends returns the backend health states for all backends of
// a frontend. Backends referenced from multiple pools appear once. Returns
// nil for an unknown frontend.
func (c *Checker) ListFrontendBackends(frontendName string) []*health.Backend {
	c.mu.RLock()
	defer c.mu.RUnlock()
	fe, ok := c.cfg.Frontends[frontendName]
	if !ok {
		return nil
	}
	visited := make(map[string]struct{})
	var result []*health.Backend
	for _, pool := range fe.Pools {
		for backendName := range pool.Backends {
			if _, dup := visited[backendName]; dup {
				continue
			}
			visited[backendName] = struct{}{}
			// Only backends with a live worker have health state to report.
			if w, active := c.workers[backendName]; active {
				result = append(result, w.backend)
			}
		}
	}
	return result
}
// GetBackend returns a snapshot of the health state and config for a backend
// by name, and whether such a backend exists.
func (c *Checker) GetBackend(name string) (BackendSnapshot, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if w, ok := c.workers[name]; ok {
		return BackendSnapshot{Health: w.backend, Config: w.entry}, true
	}
	return BackendSnapshot{}, false
}
// PauseBackend pauses health checking for a backend by name. The probe
// goroutine is cancelled so no further traffic is sent to the backend. The
// backend's state is set to paused and remains frozen until ResumeBackend is
// called (which starts a fresh probe goroutine).
func (c *Checker) PauseBackend(name string) (BackendSnapshot, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	w, found := c.workers[name]
	if !found {
		return BackendSnapshot{}, false
	}
	histLen := c.cfg.HealthChecker.TransitionHistory
	// Pause reports true only when a state transition actually happened.
	if w.backend.Pause(histLen) {
		tr := w.backend.Transitions[0]
		slog.Info("backend-transition", "backend", name,
			"from", tr.From.String(),
			"to", tr.To.String(),
		)
		c.emitForBackend(name, w.backend.Address, tr, c.cfg.Frontends)
	}
	// Stop the probe goroutine so no traffic reaches the paused backend.
	w.cancel()
	return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// ResumeBackend resumes health checking for a backend by name. A fresh probe
// goroutine is started and the backend re-enters StateUnknown. The existing
// transition history is preserved. Returns false if no worker exists for the
// backend.
func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	w, ok := c.workers[name]
	if !ok {
		return BackendSnapshot{}, false
	}
	maxHistory := c.cfg.HealthChecker.TransitionHistory
	if w.backend.Resume(maxHistory) {
		t := w.backend.Transitions[0]
		slog.Info("backend-transition", "backend", name,
			"from", t.From.String(),
			"to", t.To.String(),
		)
		c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
	}
	// Cancel any probe goroutine that may still be running. For a paused
	// backend this is a no-op (PauseBackend already cancelled it), but if
	// ResumeBackend is called on a backend that was never paused, the old
	// goroutine would otherwise keep running alongside the new one —
	// doubling probe traffic and leaking a goroutine.
	w.cancel()
	// Launch a fresh probe goroutine with a new cancellable context,
	// keeping the existing worker and its transition history.
	wCtx, cancel := context.WithCancel(c.runCtx)
	w.cancel = cancel
	w.wakeCh = make(chan struct{}, 1)
	go c.runProbe(wCtx, name, 0, 1)
	return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// DisableBackend stops health checking for a backend and removes it from active
// rotation. The worker entry is kept in the map so the backend remains visible
// via GetBackend and can be re-enabled with EnableBackend.
func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	w, found := c.workers[name]
	if !found {
		return BackendSnapshot{}, false
	}
	// Already disabled: report the current state, change nothing.
	if !w.entry.Enabled {
		return BackendSnapshot{Health: w.backend, Config: w.entry}, true
	}
	histLen := c.cfg.HealthChecker.TransitionHistory
	tr := w.backend.Remove(histLen)
	slog.Info("backend-disable", "backend", name)
	c.emitForBackend(name, w.backend.Address, tr, c.cfg.Frontends)
	w.cancel()
	// Record the disabled flag both on the worker and in the config so a
	// later Reload does not resurrect the backend.
	w.entry.Enabled = false
	if be, ok := c.cfg.Backends[name]; ok {
		be.Enabled = false
		c.cfg.Backends[name] = be
	}
	return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// EnableBackend re-enables a previously disabled backend. A fresh probe
// goroutine is started and the backend re-enters StateUnknown.
func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	w, found := c.workers[name]
	if !found {
		return BackendSnapshot{}, false
	}
	// Already enabled: nothing to do.
	if w.entry.Enabled {
		return BackendSnapshot{Health: w.backend, Config: w.entry}, true
	}
	entry := w.entry
	entry.Enabled = true
	// Mirror the flag into the config so a later Reload keeps it enabled.
	if be, ok := c.cfg.Backends[name]; ok {
		be.Enabled = true
		c.cfg.Backends[name] = be
	}
	histLen := c.cfg.HealthChecker.TransitionHistory
	check := c.cfg.HealthChecks[entry.HealthCheck]
	slog.Info("backend-enable", "backend", name)
	// NOTE(review): assumes Run was called first so c.runCtx is non-nil —
	// confirm callers cannot reach this before Run.
	c.startWorker(c.runCtx, name, entry, check, 0, 1, histLen)
	fresh := c.workers[name]
	c.emitForBackend(name, fresh.backend.Address, fresh.backend.Transitions[0], c.cfg.Frontends)
	return BackendSnapshot{Health: fresh.backend, Config: fresh.entry}, true
}
// ---- internal --------------------------------------------------------------
// startWorker creates a Backend and launches a probe goroutine.
// pos/total drive the initial stagger delay; maxHistory bounds the
// transition history. Must be called with c.mu held.
func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total, maxHistory int) {
	rise, fall := hc.Rise, hc.Fall
	if entry.HealthCheck == "" {
		// No healthcheck: one synthetic pass drives the backend to Up immediately.
		rise, fall = 1, 1
	}
	probeCtx, stop := context.WithCancel(ctx)
	nw := &worker{
		backend: health.New(name, entry.Address, rise, fall),
		hc:      hc,
		entry:   entry,
		cancel:  stop,
		wakeCh:  make(chan struct{}, 1),
	}
	nw.backend.Start(maxHistory)
	c.workers[name] = nw
	go c.runProbe(probeCtx, name, pos, total)
}
// runProbe is the per-backend probe loop. It looks the worker up by name on
// every pass so it notices when the worker has been removed (Reload/Disable)
// and exits; ctx cancellation also terminates the loop.
func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
	// Initial stagger so probes for many backends don't all fire at once.
	c.mu.RLock()
	w, ok := c.workers[name]
	if !ok {
		c.mu.RUnlock()
		return
	}
	initialDelay := staggerDelay(w.hc.Interval, pos, total)
	c.mu.RUnlock()
	if initialDelay > 0 {
		select {
		case <-ctx.Done():
			return
		case <-time.After(initialDelay):
		}
	}
	for {
		// Snapshot everything this iteration needs under the read lock so the
		// (potentially slow) probe itself runs without holding any lock.
		c.mu.RLock()
		w, ok := c.workers[name]
		if !ok {
			// Worker removed — stop probing.
			c.mu.RUnlock()
			return
		}
		hc := w.hc
		entry := w.entry
		maxHistory := c.cfg.HealthChecker.TransitionHistory
		netns := c.cfg.HealthChecker.Netns
		wakeCh := w.wakeCh
		var sleepFor time.Duration
		if entry.HealthCheck == "" {
			// Nothing to probe; wake occasionally to notice removal/ctx.
			sleepFor = 30 * time.Second
		} else {
			sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval)
		}
		c.mu.RUnlock()
		// Sleep until the next probe is due; a signal on wakeCh or ctx
		// cancellation interrupts the wait early.
		select {
		case <-ctx.Done():
			return
		case <-time.After(sleepFor):
		case <-wakeCh:
		}
		var result health.ProbeResult
		if entry.HealthCheck == "" {
			// No healthcheck configured: synthesise a passing result so the
			// backend is assumed healthy without any network activity.
			result = health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"}
		} else {
			// Pick the probe source address matching the target's family.
			var probeSrc net.IP
			if entry.Address.To4() != nil {
				probeSrc = hc.ProbeIPv4Src
			} else {
				probeSrc = hc.ProbeIPv6Src
			}
			pcfg := prober.ProbeConfig{
				Target:           entry.Address,
				Port:             hc.Port,
				ProbeSrc:         probeSrc,
				HealthCheckNetns: netns,
				Timeout:          hc.Timeout,
				HTTP:             hc.HTTP,
				TCP:              hc.TCP,
			}
			// Bound each probe by the configured timeout.
			probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout)
			slog.Debug("probe-start", "backend", name, "type", hc.Type)
			start := time.Now()
			result = prober.ForType(hc.Type)(probeCtx, pcfg)
			cancel()
			slog.Debug("probe-done",
				"backend", name,
				"type", hc.Type,
				"ok", result.OK,
				"code", result.Code,
				"detail", result.Detail,
				"elapsed", time.Since(start).Round(time.Millisecond).String(),
			)
		}
		// Record the result under the write lock; re-fetch the worker since it
		// may have been removed (or replaced by Reload) while we were probing.
		c.mu.Lock()
		w, exists := c.workers[name]
		if !exists {
			c.mu.Unlock()
			return
		}
		// Record reports true only when the result caused a state transition.
		if w.backend.Record(result, maxHistory) {
			t := w.backend.Transitions[0]
			addr := w.backend.Address
			slog.Info("backend-transition",
				"backend", name,
				"from", t.From.String(),
				"to", t.To.String(),
				"code", result.Code,
				"detail", result.Detail,
			)
			c.emitForBackend(name, addr, t, c.cfg.Frontends)
		}
		c.mu.Unlock()
	}
}
// emitForBackend emits one Event per frontend that references backendName
// (in any pool), using the provided frontends map. Must be called with c.mu held.
func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition, frontends map[string]config.Frontend) {
	for feName, fe := range frontends {
	pools:
		for _, pool := range fe.Pools {
			for name := range pool.Backends {
				if name != backendName {
					continue
				}
				c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t})
				// At most one event per frontend, even if several pools
				// reference the same backend.
				break pools
			}
		}
	}
}
// emit sends an event to the internal fan-out channel (non-blocking).
// Must be called with c.mu held. If the buffered channel is full the event
// is dropped (and logged) rather than blocking the caller under the lock.
func (c *Checker) emit(e Event) {
	select {
	case c.eventCh <- e:
	default:
		slog.Warn("event-drop", "frontend", e.FrontendName, "backend", e.BackendName)
	}
}
// fanOut reads from eventCh and distributes each event to all subscribers,
// dropping events for subscribers whose buffers are full. Exits when ctx is
// cancelled.
func (c *Checker) fanOut(ctx context.Context) {
	for {
		var ev Event
		select {
		case <-ctx.Done():
			return
		case ev = <-c.eventCh:
		}
		c.subsMu.Lock()
		for _, sub := range c.subs {
			select {
			case sub <- ev:
			default:
				// Slow subscriber — drop rather than block.
			}
		}
		c.subsMu.Unlock()
	}
}
// healthCheckEqual returns true if two HealthCheck configs are functionally
// identical, i.e. a worker probing with a needs no restart to honour b.
//
// Port and the probe source addresses are part of probe behaviour (they feed
// prober.ProbeConfig in runProbe) but were previously not compared, so a
// Reload that changed only the port or a source IP left the old probe
// running unchanged. They are now included in the comparison.
func healthCheckEqual(a, b config.HealthCheck) bool {
	if a.Type != b.Type ||
		a.Port != b.Port ||
		a.Interval != b.Interval ||
		a.FastInterval != b.FastInterval ||
		a.DownInterval != b.DownInterval ||
		a.Timeout != b.Timeout ||
		a.Rise != b.Rise ||
		a.Fall != b.Fall {
		return false
	}
	// net.IP.Equal treats two nil addresses as equal.
	if !a.ProbeIPv4Src.Equal(b.ProbeIPv4Src) || !a.ProbeIPv6Src.Equal(b.ProbeIPv6Src) {
		return false
	}
	return httpParamsEqual(a.HTTP, b.HTTP) && tcpParamsEqual(a.TCP, b.TCP)
}
// httpParamsEqual reports whether two HTTP probe parameter sets are
// equivalent; both-nil is equal, one-nil is not.
func httpParamsEqual(a, b *config.HTTPParams) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	}
	// Compare compiled regexps by their source text; nil means "no regexp".
	regexStr := func(p *config.HTTPParams) string {
		if p.ResponseRegexp == nil {
			return ""
		}
		return p.ResponseRegexp.String()
	}
	if regexStr(a) != regexStr(b) {
		return false
	}
	return a.Path == b.Path &&
		a.Host == b.Host &&
		a.ResponseCodeMin == b.ResponseCodeMin &&
		a.ResponseCodeMax == b.ResponseCodeMax &&
		a.ServerName == b.ServerName &&
		a.InsecureSkipVerify == b.InsecureSkipVerify
}
// tcpParamsEqual reports whether two TCP probe parameter sets are
// equivalent; both-nil is equal, one-nil is not.
func tcpParamsEqual(a, b *config.TCPParams) bool {
	if a == nil || b == nil {
		// Equal only when both are nil.
		return a == b
	}
	return a.SSL == b.SSL &&
		a.ServerName == b.ServerName &&
		a.InsecureSkipVerify == b.InsecureSkipVerify
}
// activeBackendNames returns a sorted, deduplicated list of backend names that
// are referenced by at least one frontend pool and have Enabled: true.
func activeBackendNames(cfg *config.Config) []string {
	active := make(map[string]struct{})
	for _, fe := range cfg.Frontends {
		for _, pool := range fe.Pools {
			for name := range pool.Backends {
				// Only count backends that exist in config and are enabled.
				if b, known := cfg.Backends[name]; known && b.Enabled {
					active[name] = struct{}{}
				}
			}
		}
	}
	out := make([]string, 0, len(active))
	for name := range active {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}
// staggerDelay computes the initial probe delay for position pos out of total.
func staggerDelay(interval time.Duration, pos, total int) time.Duration {
if total <= 1 {
return 0
}
return time.Duration(int64(interval) * int64(pos) / int64(total))
}