package checker

import (
	"context"
	"log/slog"
	"net"
	"sync"
	"time"

	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	"git.ipng.ch/ipng/vpp-maglev/internal/health"
	"git.ipng.ch/ipng/vpp-maglev/internal/prober"
)

// Event is emitted on every backend state transition.
type Event struct {
	VIPName    string            // name of the VIP this backend belongs to
	Backend    net.IP            // address of the backend that transitioned
	Transition health.Transition // the state change that triggered the event
}

// backendKey uniquely identifies one VIP:backend tuple in the worker map.
type backendKey struct {
	VIPName string
	Backend string // net.IP.String()
}

// worker couples a backend's health state with the healthcheck/VIP config
// its probe loop uses and the cancel func that stops that loop.
type worker struct {
	backend *health.Backend
	hc      config.HealthCheck
	vip     config.VIP
	cancel  context.CancelFunc
}

// Checker orchestrates health probing for all VIP:backend tuples.
type Checker struct {
	// cfg is replaced wholesale on Reload; guarded by mu.
	cfg *config.Frontend

	// mu guards cfg and workers (and the hc/vip fields inside each worker).
	mu      sync.RWMutex
	workers map[backendKey]*worker

	// subsMu guards nextID and subs; kept separate from mu so that event
	// fan-out never contends with probe bookkeeping.
	subsMu  sync.Mutex
	nextID  int
	subs    map[int]chan Event
	eventCh chan Event
}

// New creates a Checker. Call Run to start probing.
func New(cfg *config.Frontend) *Checker {
	return &Checker{
		cfg:     cfg,
		workers: make(map[backendKey]*worker),
		subs:    make(map[int]chan Event),
		// Buffered so emit (called under mu) can stay non-blocking.
		eventCh: make(chan Event, 256),
	}
}

// Run starts all probe goroutines and blocks until ctx is cancelled.
func (c *Checker) Run(ctx context.Context) error {
	go c.fanOut(ctx)
	c.mu.Lock()
	// pos/total feed staggerDelay so initial probes are spread evenly
	// across one interval instead of firing simultaneously at startup.
	total := totalBackends(c.cfg)
	pos := 0
	for vipName, vip := range c.cfg.VIPs {
		for _, backend := range vip.Backends {
			c.startWorker(ctx, vipName, vip, backend, pos, total)
			pos++
		}
	}
	c.mu.Unlock()
	<-ctx.Done()
	return nil
}

// Reload applies a new config without restarting the process.
// New tuples are added, removed tuples are stopped, changed tuples are restarted.
// Existing tuples with unchanged healthcheck config continue uninterrupted.
// Reload diffs the desired config against the running workers while holding
// the write lock, so no probe loop observes a half-applied update.
func (c *Checker) Reload(ctx context.Context, cfg *config.Frontend) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Build the desired set of VIP:backend tuples from the new config.
	type desired struct {
		vipName string
		vip     config.VIP
		backend net.IP
	}
	desiredMap := map[backendKey]desired{}
	for vipName, vip := range cfg.VIPs {
		for _, backend := range vip.Backends {
			key := backendKey{VIPName: vipName, Backend: backend.String()}
			desiredMap[key] = desired{vipName: vipName, vip: vip, backend: backend}
		}
	}
	// Stop workers no longer in config.
	for key, w := range c.workers {
		if _, ok := desiredMap[key]; !ok {
			slog.Info("backend-stop", "vip", key.VIPName, "backend", key.Backend)
			w.cancel()
			delete(c.workers, key)
		}
	}
	// Add new or restart changed workers.
	total := len(desiredMap)
	pos := 0
	for key, d := range desiredMap {
		if w, ok := c.workers[key]; ok {
			// Unchanged healthcheck: leave the running probe loop alone.
			if healthCheckEqual(w.hc, d.vip.HealthCheck) {
				pos++
				continue
			}
			// Changed: cancel the old probe goroutine, mutate the worker
			// in place (safe: we hold the write lock and probe loops read
			// hc/vip only under RLock), and launch a fresh loop.
			slog.Info("backend-restart", "vip", key.VIPName, "backend", key.Backend)
			w.cancel()
			w.hc = d.vip.HealthCheck
			w.vip = d.vip
			wCtx, cancel := context.WithCancel(ctx)
			w.cancel = cancel
			go c.runProbe(wCtx, key, 0, 1) // no stagger on reload
		} else {
			slog.Info("backend-start", "vip", d.vipName, "backend", d.backend)
			c.startWorker(ctx, d.vipName, d.vip, d.backend, pos, total)
		}
		pos++
	}
	c.cfg = cfg
	return nil
}

// Subscribe returns a channel that receives Events for every state transition.
// Call the returned cancel function to unsubscribe.
func (c *Checker) Subscribe() (<-chan Event, func()) {
	c.subsMu.Lock()
	defer c.subsMu.Unlock()
	id := c.nextID
	c.nextID++
	// Buffered so one slow consumer doesn't immediately start dropping.
	ch := make(chan Event, 64)
	c.subs[id] = ch
	return ch, func() {
		// Removing the sub before close is safe: fanOut only sends while
		// holding subsMu, so it can never send on the closed channel.
		c.subsMu.Lock()
		defer c.subsMu.Unlock()
		delete(c.subs, id)
		close(ch)
	}
}

// ListVIPs returns the names of all configured VIPs.
func (c *Checker) ListVIPs() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	names := make([]string, 0, len(c.cfg.VIPs))
	for name := range c.cfg.VIPs {
		names = append(names, name)
	}
	return names
}

// GetVIP returns the VIP config for the given name.
func (c *Checker) GetVIP(name string) (config.VIP, bool) { c.mu.RLock() defer c.mu.RUnlock() v, ok := c.cfg.VIPs[name] return v, ok } // ListBackends returns the backend states for all backends of a VIP. func (c *Checker) ListBackends(vipName string) []*health.Backend { c.mu.RLock() defer c.mu.RUnlock() var out []*health.Backend for key, w := range c.workers { if key.VIPName == vipName { out = append(out, w.backend) } } return out } // GetBackend returns the backend state for a specific VIP:backend tuple. func (c *Checker) GetBackend(vipName, backendAddr string) (*health.Backend, bool) { c.mu.RLock() defer c.mu.RUnlock() key := backendKey{VIPName: vipName, Backend: backendAddr} w, ok := c.workers[key] if !ok { return nil, false } return w.backend, true } // PauseBackend pauses health checking for a specific backend. func (c *Checker) PauseBackend(vipName, backendAddr string) (*health.Backend, bool) { c.mu.Lock() defer c.mu.Unlock() key := backendKey{VIPName: vipName, Backend: backendAddr} w, ok := c.workers[key] if !ok { return nil, false } maxHistory := c.cfg.HealthChecker.TransitionHistory if w.backend.Pause(maxHistory) { slog.Info("backend-pause", "vip", vipName, "backend", backendAddr) c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]}) } return w.backend, true } // ResumeBackend resumes health checking for a specific backend. 
func (c *Checker) ResumeBackend(vipName, backendAddr string) (*health.Backend, bool) { c.mu.Lock() defer c.mu.Unlock() key := backendKey{VIPName: vipName, Backend: backendAddr} w, ok := c.workers[key] if !ok { return nil, false } maxHistory := c.cfg.HealthChecker.TransitionHistory if w.backend.Resume(maxHistory) { slog.Info("backend-resume", "vip", vipName, "backend", backendAddr) c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]}) } return w.backend, true } // ---- internal -------------------------------------------------------------- // startWorker creates a Backend and launches a probe goroutine. // pos and total are used to compute the startup stagger delay. // Must be called with c.mu held. func (c *Checker) startWorker(ctx context.Context, vipName string, vip config.VIP, backend net.IP, pos, total int) { key := backendKey{VIPName: vipName, Backend: backend.String()} wCtx, cancel := context.WithCancel(ctx) hc := vip.HealthCheck w := &worker{ backend: health.New(vipName, backend, hc.Rise, hc.Fall), hc: hc, vip: vip, cancel: cancel, } c.workers[key] = w go c.runProbe(wCtx, key, pos, total) } // runProbe is the per-backend probe loop. // pos and total drive the initial stagger: delay = interval * pos / total. func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int) { // Stagger initial probe to spread startup load. 
c.mu.RLock() w, ok := c.workers[key] if !ok { c.mu.RUnlock() return } initialDelay := staggerDelay(w.hc.Interval, pos, total) c.mu.RUnlock() if initialDelay > 0 { select { case <-ctx.Done(): return case <-time.After(initialDelay): } } for { c.mu.RLock() w, ok := c.workers[key] if !ok { c.mu.RUnlock() return } hc := w.hc vip := w.vip maxHistory := c.cfg.HealthChecker.TransitionHistory sleepFor := w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval) c.mu.RUnlock() select { case <-ctx.Done(): return case <-time.After(sleepFor): } // Determine source IP based on target address family. backendIP := net.ParseIP(key.Backend) var probeSrc net.IP if backendIP.To4() != nil { probeSrc = c.cfg.ProbeIPv4Src } else { probeSrc = c.cfg.ProbeIPv6Src } pcfg := prober.ProbeConfig{ Target: backendIP, Port: vip.Port, ProbeSrc: probeSrc, HealthCheckNetns: c.cfg.HealthCheckNetns, Timeout: hc.Timeout, HTTP: hc.HTTP, TCP: hc.TCP, } probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout) slog.Debug("probe-start", "vip", key.VIPName, "backend", key.Backend, "type", hc.Type) start := time.Now() result := prober.ForType(hc.Type)(probeCtx, pcfg) cancel() slog.Debug("probe-done", "vip", key.VIPName, "backend", key.Backend, "type", hc.Type, "ok", result.OK, "code", result.Code, "detail", result.Detail, "elapsed", time.Since(start).Round(time.Millisecond).String(), ) c.mu.Lock() w, exists := c.workers[key] if !exists { c.mu.Unlock() return } if w.backend.Record(result, maxHistory) { t := w.backend.Transitions[0] addr := w.backend.Address slog.Info("backend-transition", "vip", key.VIPName, "backend", key.Backend, "from", t.From.String(), "to", t.To.String(), "code", result.Code, "detail", result.Detail, ) c.emit(Event{VIPName: key.VIPName, Backend: addr, Transition: t}) } c.mu.Unlock() } } // emit sends an event to the internal fan-out channel (non-blocking). // Must be called with c.mu held. 
// emit sends an event to the internal fan-out channel (non-blocking).
// Must be called with c.mu held.
func (c *Checker) emit(e Event) {
	select {
	case c.eventCh <- e:
	default:
		slog.Warn("event-drop", "vip", e.VIPName, "backend", e.Backend)
	}
}

// fanOut reads from eventCh and distributes to all subscribers.
func (c *Checker) fanOut(ctx context.Context) {
	for {
		var ev Event
		select {
		case <-ctx.Done():
			return
		case ev = <-c.eventCh:
		}
		c.subsMu.Lock()
		for _, sub := range c.subs {
			select {
			case sub <- ev:
			default:
				// Slow subscriber — drop rather than block.
			}
		}
		c.subsMu.Unlock()
	}
}

// healthCheckEqual returns true if two HealthCheck configs are functionally identical.
func healthCheckEqual(a, b config.HealthCheck) bool {
	switch {
	case a.Type != b.Type,
		a.Interval != b.Interval,
		a.FastInterval != b.FastInterval,
		a.DownInterval != b.DownInterval,
		a.Timeout != b.Timeout,
		a.Rise != b.Rise,
		a.Fall != b.Fall:
		return false
	}
	if !httpParamsEqual(a.HTTP, b.HTTP) {
		return false
	}
	return tcpParamsEqual(a.TCP, b.TCP)
}

// httpParamsEqual compares two optional HTTP parameter sets field by field.
func httpParamsEqual(a, b *config.HTTPParams) bool {
	// Both nil → equal; exactly one nil → not equal.
	if a == nil || b == nil {
		return a == b
	}
	regexpStr := func(p *config.HTTPParams) string {
		if p.ResponseRegexp != nil {
			return p.ResponseRegexp.String()
		}
		return ""
	}
	if regexpStr(a) != regexpStr(b) {
		return false
	}
	return a.Path == b.Path &&
		a.Host == b.Host &&
		a.ResponseCodeMin == b.ResponseCodeMin &&
		a.ResponseCodeMax == b.ResponseCodeMax &&
		a.ServerName == b.ServerName &&
		a.InsecureSkipVerify == b.InsecureSkipVerify
}

// tcpParamsEqual compares two optional TCP parameter sets field by field.
func tcpParamsEqual(a, b *config.TCPParams) bool {
	// Both nil → equal; exactly one nil → not equal.
	if a == nil || b == nil {
		return a == b
	}
	return a.SSL == b.SSL &&
		a.ServerName == b.ServerName &&
		a.InsecureSkipVerify == b.InsecureSkipVerify
}

// staggerDelay computes the initial probe delay for a backend at position pos
// out of total backends: delay = interval * pos / total.
func staggerDelay(interval time.Duration, pos, total int) time.Duration {
	if total <= 1 {
		return 0
	}
	scaled := int64(interval) * int64(pos) / int64(total)
	return time.Duration(scaled)
}

// totalBackends counts all backends across all VIPs in a config.
func totalBackends(cfg *config.Frontend) int { n := 0 for _, vip := range cfg.VIPs { n += len(vip.Backends) } return n }