// Copyright (c) 2026, Pim van Pelt

package checker

import (
	"context"
	"log/slog"
	"net"
	"sort"
	"sync"
	"time"

	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	"git.ipng.ch/ipng/vpp-maglev/internal/health"
	"git.ipng.ch/ipng/vpp-maglev/internal/prober"
)

// BackendSnapshot combines the live health state with the config entry for a backend.
type BackendSnapshot struct {
	Health *health.Backend // live probe state, shared with the probe goroutine
	Config config.Backend  // config entry as of the last (re)load
}

// Event is emitted on every backend state transition, once per frontend that
// references the backend.
type Event struct {
	FrontendName string
	BackendName  string
	Backend      net.IP
	Transition   health.Transition
}

// worker pairs a backend's live health state with the healthcheck/backend
// config driving its probe loop, plus the cancel func that stops that loop.
type worker struct {
	backend *health.Backend
	hc      config.HealthCheck
	entry   config.Backend
	cancel  context.CancelFunc
}

// Checker orchestrates health probing for all backends.
// Each backend is probed exactly once, regardless of how many frontends
// reference it.
type Checker struct {
	cfg *config.Config

	// mu guards cfg and workers; probe goroutines take it around each cycle.
	mu      sync.RWMutex
	workers map[string]*worker // keyed by backend name

	// subsMu guards the subscriber registry used by fanOut.
	subsMu sync.Mutex
	nextID int
	subs   map[int]chan Event

	// eventCh is the internal buffered channel between emit and fanOut.
	eventCh chan Event
}

// New creates a Checker. Call Run to start probing.
func New(cfg *config.Config) *Checker {
	return &Checker{
		cfg:     cfg,
		workers: make(map[string]*worker),
		subs:    make(map[int]chan Event),
		eventCh: make(chan Event, 256),
	}
}

// Run starts all probe goroutines and blocks until ctx is cancelled.
func (c *Checker) Run(ctx context.Context) error {
	go c.fanOut(ctx)
	c.mu.Lock()
	names := activeBackendNames(c.cfg)
	for i, name := range names {
		b := c.cfg.Backends[name]
		// Zero-value HealthCheck when b.HealthCheck == ""; startWorker
		// treats that as "no healthcheck".
		hc := c.cfg.HealthChecks[b.HealthCheck]
		// i/len(names) feed staggerDelay so initial probes are spread out.
		c.startWorker(ctx, name, b, hc, i, len(names))
	}
	c.mu.Unlock()
	<-ctx.Done()
	return nil
}

// Reload applies a new config without restarting the process.
// New backends are added, removed backends are stopped, changed backends are
// restarted. Backends whose healthcheck config is unchanged continue
// uninterrupted, even if the set of frontends referencing them changes.
func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error { c.mu.Lock() defer c.mu.Unlock() desired := map[string]struct{}{} for _, name := range activeBackendNames(cfg) { desired[name] = struct{}{} } // Stop workers no longer needed. for name, w := range c.workers { if _, ok := desired[name]; !ok { slog.Info("backend-stop", "backend", name) w.cancel() delete(c.workers, name) } } // Add new or restart changed workers. names := activeBackendNames(cfg) for i, name := range names { b := cfg.Backends[name] hc := cfg.HealthChecks[b.HealthCheck] if w, ok := c.workers[name]; ok { if healthCheckEqual(w.hc, hc) { // Update entry metadata (weight, etc.) in place without restart. w.entry = b continue } slog.Info("backend-restart", "backend", name) w.cancel() c.startWorker(ctx, name, b, hc, i, len(names)) } else { slog.Info("backend-start", "backend", name) c.startWorker(ctx, name, b, hc, i, len(names)) } } c.cfg = cfg return nil } // Subscribe returns a channel that receives Events for every state transition. // Call the returned cancel function to unsubscribe. func (c *Checker) Subscribe() (<-chan Event, func()) { c.subsMu.Lock() defer c.subsMu.Unlock() id := c.nextID c.nextID++ ch := make(chan Event, 64) c.subs[id] = ch return ch, func() { c.subsMu.Lock() defer c.subsMu.Unlock() delete(c.subs, id) close(ch) } } // ListFrontends returns the names of all configured frontends. func (c *Checker) ListFrontends() []string { c.mu.RLock() defer c.mu.RUnlock() names := make([]string, 0, len(c.cfg.Frontends)) for name := range c.cfg.Frontends { names = append(names, name) } return names } // GetFrontend returns the frontend config for the given name. func (c *Checker) GetFrontend(name string) (config.Frontend, bool) { c.mu.RLock() defer c.mu.RUnlock() v, ok := c.cfg.Frontends[name] return v, ok } // ListHealthChecks returns the names of all configured health checks, sorted. 
func (c *Checker) ListHealthChecks() []string { c.mu.RLock() defer c.mu.RUnlock() names := make([]string, 0, len(c.cfg.HealthChecks)) for name := range c.cfg.HealthChecks { names = append(names, name) } sort.Strings(names) return names } // GetHealthCheck returns the config for a health check by name. func (c *Checker) GetHealthCheck(name string) (config.HealthCheck, bool) { c.mu.RLock() defer c.mu.RUnlock() hc, ok := c.cfg.HealthChecks[name] return hc, ok } // ListBackends returns the names of all active backends. func (c *Checker) ListBackends() []string { c.mu.RLock() defer c.mu.RUnlock() names := make([]string, 0, len(c.workers)) for name := range c.workers { names = append(names, name) } sort.Strings(names) return names } // ListFrontendBackends returns the backend health states for all backends of a frontend. func (c *Checker) ListFrontendBackends(frontendName string) []*health.Backend { c.mu.RLock() defer c.mu.RUnlock() fe, ok := c.cfg.Frontends[frontendName] if !ok { return nil } var out []*health.Backend for _, name := range fe.Backends { if w, ok := c.workers[name]; ok { out = append(out, w.backend) } } return out } // GetBackend returns a snapshot of the health state and config for a backend by name. func (c *Checker) GetBackend(name string) (BackendSnapshot, bool) { c.mu.RLock() defer c.mu.RUnlock() w, ok := c.workers[name] if !ok { return BackendSnapshot{}, false } return BackendSnapshot{Health: w.backend, Config: w.entry}, true } // PauseBackend pauses health checking for a backend by name. 
func (c *Checker) PauseBackend(name string) (BackendSnapshot, bool) { c.mu.Lock() defer c.mu.Unlock() w, ok := c.workers[name] if !ok { return BackendSnapshot{}, false } maxHistory := c.cfg.HealthChecker.TransitionHistory if w.backend.Pause(maxHistory) { slog.Info("backend-pause", "backend", name) c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0]) } return BackendSnapshot{Health: w.backend, Config: w.entry}, true } // ResumeBackend resumes health checking for a backend by name. func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) { c.mu.Lock() defer c.mu.Unlock() w, ok := c.workers[name] if !ok { return BackendSnapshot{}, false } maxHistory := c.cfg.HealthChecker.TransitionHistory if w.backend.Resume(maxHistory) { slog.Info("backend-resume", "backend", name) c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0]) } return BackendSnapshot{Health: w.backend, Config: w.entry}, true } // ---- internal -------------------------------------------------------------- // startWorker creates a Backend and launches a probe goroutine. // Must be called with c.mu held. func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total int) { rise, fall := hc.Rise, hc.Fall if entry.HealthCheck == "" { // No healthcheck: one synthetic pass drives the backend to Up immediately. rise, fall = 1, 1 } wCtx, cancel := context.WithCancel(ctx) w := &worker{ backend: health.New(name, entry.Address, rise, fall), hc: hc, entry: entry, cancel: cancel, } c.workers[name] = w go c.runProbe(wCtx, name, pos, total) } // runProbe is the per-backend probe loop. 
func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) { c.mu.RLock() w, ok := c.workers[name] if !ok { c.mu.RUnlock() return } initialDelay := staggerDelay(w.hc.Interval, pos, total) c.mu.RUnlock() if initialDelay > 0 { select { case <-ctx.Done(): return case <-time.After(initialDelay): } } for { c.mu.RLock() w, ok := c.workers[name] if !ok { c.mu.RUnlock() return } hc := w.hc entry := w.entry maxHistory := c.cfg.HealthChecker.TransitionHistory netns := c.cfg.HealthChecker.Netns var sleepFor time.Duration if entry.HealthCheck == "" { sleepFor = 30 * time.Second } else { sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval) } c.mu.RUnlock() select { case <-ctx.Done(): return case <-time.After(sleepFor): } var result health.ProbeResult if entry.HealthCheck == "" { // No healthcheck configured: synthesise a passing result so the // backend is assumed healthy without any network activity. result = health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"} } else { var probeSrc net.IP if entry.Address.To4() != nil { probeSrc = hc.ProbeIPv4Src } else { probeSrc = hc.ProbeIPv6Src } pcfg := prober.ProbeConfig{ Target: entry.Address, Port: hc.Port, ProbeSrc: probeSrc, HealthCheckNetns: netns, Timeout: hc.Timeout, HTTP: hc.HTTP, TCP: hc.TCP, } probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout) slog.Debug("probe-start", "backend", name, "type", hc.Type) start := time.Now() result = prober.ForType(hc.Type)(probeCtx, pcfg) cancel() slog.Debug("probe-done", "backend", name, "type", hc.Type, "ok", result.OK, "code", result.Code, "detail", result.Detail, "elapsed", time.Since(start).Round(time.Millisecond).String(), ) } c.mu.Lock() w, exists := c.workers[name] if !exists { c.mu.Unlock() return } if w.backend.Record(result, maxHistory) { t := w.backend.Transitions[0] addr := w.backend.Address slog.Info("backend-transition", "backend", name, "from", t.From.String(), "to", t.To.String(), "code", result.Code, "detail", 
result.Detail, ) c.emitForBackend(name, addr, t) } c.mu.Unlock() } } // emitForBackend emits one Event per frontend that references backendName. // Must be called with c.mu held. func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition) { for feName, fe := range c.cfg.Frontends { for _, name := range fe.Backends { if name == backendName { c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t}) break } } } } // emit sends an event to the internal fan-out channel (non-blocking). // Must be called with c.mu held. func (c *Checker) emit(e Event) { select { case c.eventCh <- e: default: slog.Warn("event-drop", "frontend", e.FrontendName, "backend", e.BackendName) } } // fanOut reads from eventCh and distributes to all subscribers. func (c *Checker) fanOut(ctx context.Context) { for { select { case <-ctx.Done(): return case e := <-c.eventCh: c.subsMu.Lock() for _, ch := range c.subs { select { case ch <- e: default: // Slow subscriber — drop rather than block. } } c.subsMu.Unlock() } } } // healthCheckEqual returns true if two HealthCheck configs are functionally identical. 
func healthCheckEqual(a, b config.HealthCheck) bool { if a.Type != b.Type || a.Interval != b.Interval || a.FastInterval != b.FastInterval || a.DownInterval != b.DownInterval || a.Timeout != b.Timeout || a.Rise != b.Rise || a.Fall != b.Fall { return false } return httpParamsEqual(a.HTTP, b.HTTP) && tcpParamsEqual(a.TCP, b.TCP) } func httpParamsEqual(a, b *config.HTTPParams) bool { if a == nil && b == nil { return true } if a == nil || b == nil { return false } aRe, bRe := "", "" if a.ResponseRegexp != nil { aRe = a.ResponseRegexp.String() } if b.ResponseRegexp != nil { bRe = b.ResponseRegexp.String() } return a.Path == b.Path && a.Host == b.Host && a.ResponseCodeMin == b.ResponseCodeMin && a.ResponseCodeMax == b.ResponseCodeMax && aRe == bRe && a.ServerName == b.ServerName && a.InsecureSkipVerify == b.InsecureSkipVerify } func tcpParamsEqual(a, b *config.TCPParams) bool { if a == nil && b == nil { return true } if a == nil || b == nil { return false } return a.SSL == b.SSL && a.ServerName == b.ServerName && a.InsecureSkipVerify == b.InsecureSkipVerify } // activeBackendNames returns a sorted, deduplicated list of backend names that // are referenced by at least one frontend and have Enabled: true. func activeBackendNames(cfg *config.Config) []string { seen := map[string]struct{}{} for _, fe := range cfg.Frontends { for _, name := range fe.Backends { if b, ok := cfg.Backends[name]; ok && b.Enabled { seen[name] = struct{}{} } } } names := make([]string, 0, len(seen)) for name := range seen { names = append(names, name) } sort.Strings(names) return names } // staggerDelay computes the initial probe delay for position pos out of total. func staggerDelay(interval time.Duration, pos, total int) time.Duration { if total <= 1 { return 0 } return time.Duration(int64(interval) * int64(pos) / int64(total)) }