vpp-maglev/internal/checker/checker.go
Pim van Pelt 4347bb9b05 Bug fixes, config validation, SPA tightening, set-weight UI
This session covers three distinct arcs: correctness bug fixes in the
VPP sync path and frontend reducers, new config validation, and a
large polish pass on the web frontend (tighter layout, backend kebab
dialogs, live grouped-table, live config-reload re-sync).

 - encap for a VIP is now derived from the backend address family,
   not the VIP's. A v6 VIP with v4 backends is programmed as IP6_GRE4
   (not the buggy IP6_GRE6), matching the VPP LB plugin's
   requirement that encap reflects the tunnel inner family. desiredVIP
   gained an Encap field populated in desiredFromFrontend.
 - ActivePoolIndex now requires at least one backend in a pool to be
   BOTH in StateUp AND pb.Weight>0 before the pool counts as active.
   Previously a primary pool with every backend manually zeroed would
   still win over a fallback with weight=100, so fallback traffic
   never materialized. New TestActivePoolIndexWeightedFailover table
   pins the rule in five subcases.
 - SyncLBStateVIP gained a flushAddress parameter threaded through
   reconcileVIP; it forces flush=true on the setASWeight call for a
   specific backend regardless of the usual 0→N heuristic, wiring up
   the explicit [flush] knob the CLI exposes.
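The ActivePoolIndex rule above can be sketched as follows; the type and function names here are illustrative stand-ins, not the project's actual code:

```go
package main

import "fmt"

// State is a stand-in for the health states referenced above.
type State int

const (
	StateUnknown State = iota
	StateUp
	StateDown
)

// PoolBackend pairs a backend's live health state with its configured weight.
type PoolBackend struct {
	State  State
	Weight int
}

// activePoolIndex returns the index of the first pool with at least one
// backend that is BOTH up AND carries a nonzero weight, or -1 if none
// qualifies, so a fully zero-weighted primary yields to its fallback.
func activePoolIndex(pools [][]PoolBackend) int {
	for i, pool := range pools {
		for _, pb := range pool {
			if pb.State == StateUp && pb.Weight > 0 {
				return i
			}
		}
	}
	return -1
}

func main() {
	primary := []PoolBackend{{StateUp, 0}, {StateUp, 0}} // every backend manually zeroed
	fallback := []PoolBackend{{StateUp, 100}}
	// The fallback (index 1) wins: no primary backend is up with weight > 0.
	fmt.Println(activePoolIndex([][]PoolBackend{primary, fallback})) // → 1
}
```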

 - convertFrontend already enforced that backends within one frontend
   share a family. New cross-frontend pass validateVIPFamilyConsistency
   rejects configs where two frontends share a VIP address but carry
   backends in different families — VPP's LB plugin requires every
   VIP on a prefix to have the same encap type, so such a config
    would fail at lb_add_del_vip_v2 time with
    VNET_API_ERROR_INVALID_ARGUMENT (-73). Catching it at config load
    turns a silent runtime failure into a clear startup error.
 - Two new TestValidationErrors cases pin the behavior: mismatched
   families are rejected; same-family frontends on one VIP address are
   allowed.

 - Proto adds `bool flush = 5` to SetWeightRequest. The RPC now
   drives a VIP sync immediately after mutating config (fixing the
   latent "weight change only takes effect at the next 30s periodic
   reconcile" gap), passing flushAddress = backend IP when req.Flush
   is true.
 - maglevc grows an optional [flush] token: `set frontend F pool P
   backend B weight N [flush]`. Implementation uses two Run closures
   (runSetFrontendPoolBackendWeight and -Flush) because the tree
   walker only puts slot tokens in args — literal keywords like
   `flush` advance the node but don't appear in the arg list.
 - docs/user-guide.md updated with the optional [flush] token and a
   three-paragraph explainer of the graceful-drain vs. flush
   semantics at the VPP level.
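The flush decision implied above (the usual 0→N heuristic plus the explicit operator override) can be sketched as a single predicate; the function name is illustrative, and the real decision lives in the SyncLBStateVIP path:

```go
package main

import "fmt"

// shouldFlush reports whether VPP's flow table should be flushed for a
// backend after a weight change: either the operator passed the explicit
// [flush] knob, or the weight rose from zero (the usual 0→N heuristic).
// Sketch only, not the project's implementation.
func shouldFlush(oldWeight, newWeight int, explicitFlush bool) bool {
	return explicitFlush || (oldWeight == 0 && newWeight > 0)
}

func main() {
	fmt.Println(shouldFlush(0, 50, false))  // 0→N: flush → true
	fmt.Println(shouldFlush(50, 80, false)) // graceful weight change: no flush → false
	fmt.Println(shouldFlush(50, 80, true))  // explicit [flush] token: flush → true
}
```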

 - checker.ListFrontends now sorts alphabetically to match the
   existing sort in ListBackends / ListHealthChecks — RPC responses
   no longer shuffle VIPs per call. cmd/frontend/client.go also
   sorts defensively in refreshAll so an old maglevd build renders
   alphabetically too.
 - backendFromProto was returning out.Transitions[n-1] as the
   LastTransition, but maglevd stores (and the proto carries)
   transitions newest-first, so [n-1] was actually the oldest.
   Reversing on read normalizes the client's Transitions slice to
   oldest-first, so [n-1] is genuinely the newest and LastTransition
   now points at the actual latest transition record.
 - applyBackendTransition (Go and TS) derives Enabled = state!="disabled"
   so the two fields stay in lockstep — closed a drift window where
   a recently re-enabled backend still rendered with a stuck
   [disabled] tag. The tag was later removed entirely since state
   and enabled carry the same information.
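The reverse-on-read normalization described above amounts to a simple slice reversal; this generic sketch uses illustrative names, where the real code works on the client's transition structs:

```go
package main

import "fmt"

// reverseOnRead normalizes a newest-first transition slice (as stored by
// maglevd and carried by the proto) to oldest-first, so index len-1 is
// genuinely the latest record.
func reverseOnRead[T any](newestFirst []T) []T {
	out := make([]T, len(newestFirst))
	for i, t := range newestFirst {
		out[len(newestFirst)-1-i] = t
	}
	return out
}

func main() {
	wire := []string{"up@t3", "down@t2", "up@t1"} // newest-first on the wire
	local := reverseOnRead(wire)                  // oldest-first in the client
	fmt.Println(local[len(local)-1])              // the actual latest transition → up@t3
}
```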

 - Layout tightened substantially: "FRONTENDS" panel header removed,
   zippy-summary and zippy-body paddings cut, backend-table row
   padding dropped to 2px, per-pool <h3> removed. Pools now live in
   a single consolidated table per frontend with a dedicated "pool"
   column that shows the pool name only on the first row of each
   group — classic grouped-table layout, maximally dense.
 - Description moved inline into the Zippy summary as muted italic
   text, freeing a vertical line per frontend card.
 - formatVIPAddress() helper renders IPv6 VIPs as [addr]:port and
   IPv4 as addr:port, matching RFC 3986 authority syntax.
 - Pools with effective_weight=0 on every backend (standby
   fallbacks, fully-drained primaries) render at opacity 0.35 on
   their non-actions cells; the kebab column stays at full contrast
   because its menu is still fully functional on standby backends.
 - Config-reload propagation: a maglevd config-reload-done log
   event triggers triggerConfigResync() on the frontend side —
   refreshAll() runs off the event-dispatch goroutine, then a
   BrowserEvent{Type:"resync"} is published through the broker.
   writeEvent emits type="resync" as a named SSE frame so the
   SPA's existing addEventListener("resync") handler picks it up
   and calls fetchAllState → replaceAll.
 - recomputeEffectiveWeights in stores/state.ts mirrors the
   server-side health.EffectiveWeights logic so the SPA keeps
   pool.effective_weight correct the moment a backend transitions,
   without waiting for the 30s refresh. Fixed a nasty bug where
   applyBackendEffectiveWeight wrote VIP-scoped vpp-lb-sync-as-*
   event weights into every frontend sharing the backend,
   corrupting frontends with different per-pool configured weights.
   The old log-event reducer was removed; applyConfiguredWeight is
   the narrower replacement used by the kebab set-weight flow.
 - applyBackendTransition calls recomputeEffectiveWeights after
   state updates so pool-failover transitions (primary ⇌ fallback)
   reflect instantly in the UI.

 - Confirmation dialogs via a new Modal primitive
   (Portal-mounted to document.body, escape/click-outside close,
   click-outside debounced on mousedown so mid-row-text-selection
   drags don't dismiss).
 - pause/resume/enable/disable each show a Modal with a consequence
   paragraph explaining what hits live traffic ("will keep existing
   flows", "will flush VPP's flow table", etc.). The disable commit
   button is styled btn-danger red.
 - set-weight action shows a Modal with a range slider (0-100,
   seeded from the current configured weight, accent-colored live
   numeric readout via <output>) plus a flush checkbox and a
   live-swapping note/warn paragraph describing what will happen. On
   commit, the SPA also updates its local store via
   applyConfiguredWeight so the operator sees the new weight
   immediately without waiting for the next refresh.

 - ProbeHeartbeat is now state-aware: ▶ (play) at rest for up/
   down/unknown backends, ⏸ (pause) for paused, ⏹ (stop) for
   disabled/removed, ❤️ (heart) during an in-flight probe.
 - Drop the probe-done event listener — fast probes (<10ms)
   could fire probe-done in the same render tick as probe-start
   and the heart would never visibly paint. Each probe-start now
   runs a fixed 400ms scale-pop animation on a timer; subsequent
   probe-start events reset the timer, so fast cadences produce a
   continuous heart pulse.
 - Fixed wrapper box (16x14 px, overflow hidden) so the row
   doesn't jiggle when the glyph swaps between the narrow ▶/⏸/⏹
   text glyphs and the wider ❤️ emoji.

 - Brand wordmark changed from "maglev" to "vpp-maglev" and wrapped
   in an <a> linking to https://git.ipng.ch/ipng/vpp-maglev. Logo
   link changed to https://ipng.ch/. Both open in a new tab with
   rel="noopener".
 - .gitignore fix: `frontend`, `maglevc`, `maglevd` were matching
   ANY file or directory with those names anywhere in the tree,
   silently ignoring cmd/frontend and friends. Anchored with
   leading slashes so only repo-root build artifacts match.
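The anchoring fix above as a before/after; the pattern names are taken directly from the bullet:

```
# Before: an unanchored name matches any file or directory with that
# name anywhere in the tree (silently ignoring cmd/frontend and friends):
#   frontend
#   maglevc
#   maglevd

# After: a leading slash anchors the pattern to the repository root,
# so only the root-level build artifacts match:
/frontend
/maglevc
/maglevd
```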
2026-04-12 23:06:42 +02:00


// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package checker
import (
"context"
"fmt"
"log/slog"
"net"
"sort"
"sync"
"time"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
"git.ipng.ch/ipng/vpp-maglev/internal/metrics"
"git.ipng.ch/ipng/vpp-maglev/internal/prober"
)
// BackendSnapshot combines the live health state with the config entry for a backend.
type BackendSnapshot struct {
Health *health.Backend
Config config.Backend
}
// Event is emitted on every state transition the checker observes. There are
// two kinds, distinguished by which of BackendName or FrontendTransition is
// populated:
//
// - Backend transition: FrontendName is the frontend that references the
// backend (one event per frontend per backend transition), BackendName
// and Backend are set, and Transition carries the health.Transition.
// FrontendTransition is nil.
// - Frontend transition: FrontendName is the frontend whose aggregate state
// changed, FrontendTransition is non-nil. BackendName and Backend are
// empty, Transition is the zero value.
//
// Consumers dispatch on FrontendTransition != nil.
type Event struct {
FrontendName string
BackendName string
Backend net.IP
Transition health.Transition
FrontendTransition *health.FrontendTransition
}
type worker struct {
backend *health.Backend
hc config.HealthCheck
entry config.Backend
cancel context.CancelFunc
wakeCh chan struct{} // closed/signalled to interrupt probe sleep on resume
}
// Checker orchestrates health probing for all backends.
// Each backend is probed exactly once, regardless of how many frontends
// reference it.
type Checker struct {
runCtx context.Context // set in Run; used by EnableBackend to start new goroutines
cfg *config.Config
mu sync.RWMutex
workers map[string]*worker // keyed by backend name
// frontendStates tracks the aggregated state of every configured frontend
// (unknown/up/down). Updated whenever a backend transition happens; a
// change emits a frontend-transition Event. The zero value for a missing
// key is FrontendStateUnknown, so initial-reference accesses behave
// correctly even without explicit seeding.
frontendStates map[string]health.FrontendState
subsMu sync.Mutex
nextID int
subs map[int]chan Event
eventCh chan Event
}
// New creates a Checker. Call Run to start probing.
func New(cfg *config.Config) *Checker {
return &Checker{
cfg: cfg,
workers: make(map[string]*worker),
frontendStates: make(map[string]health.FrontendState),
subs: make(map[int]chan Event),
eventCh: make(chan Event, 256),
}
}
// Run starts all probe goroutines and blocks until ctx is cancelled.
func (c *Checker) Run(ctx context.Context) error {
go c.fanOut(ctx)
c.mu.Lock()
c.runCtx = ctx // safe: held under mu before any EnableBackend call can read it
names := activeBackendNames(c.cfg)
maxHistory := c.cfg.HealthChecker.TransitionHistory
for i, name := range names {
b := c.cfg.Backends[name]
hc := c.cfg.HealthChecks[b.HealthCheck]
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
}
c.mu.Unlock()
<-ctx.Done()
return nil
}
// Reload applies a new config without restarting the process.
// New backends are added, removed backends are stopped, changed backends are
// restarted. Backends whose healthcheck config is unchanged continue
// uninterrupted, even if the set of frontends referencing them changes.
func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error {
c.mu.Lock()
defer c.mu.Unlock()
maxHistory := cfg.HealthChecker.TransitionHistory
desired := map[string]struct{}{}
for _, name := range activeBackendNames(cfg) {
desired[name] = struct{}{}
}
// Stop workers no longer needed; emit a removed event using the old frontends.
for name, w := range c.workers {
if _, ok := desired[name]; !ok {
slog.Info("backend-stop", "backend", name)
t := w.backend.Remove(maxHistory)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
w.cancel()
delete(c.workers, name)
}
}
// Add new or restart changed workers; emit an unknown event using the new frontends.
names := activeBackendNames(cfg)
for i, name := range names {
b := cfg.Backends[name]
hc := cfg.HealthChecks[b.HealthCheck]
if w, ok := c.workers[name]; ok {
if healthCheckEqual(w.hc, hc) {
// Update entry metadata (weight, etc.) in place without restart.
w.entry = b
continue
}
slog.Info("backend-restart", "backend", name)
w.cancel()
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
} else {
slog.Info("backend-start", "backend", name)
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
}
c.emitForBackend(name, c.workers[name].backend.Address, c.workers[name].backend.Transitions[0], cfg.Frontends)
}
// Drop frontendStates entries for frontends no longer in config.
for feName := range c.frontendStates {
if _, ok := cfg.Frontends[feName]; !ok {
delete(c.frontendStates, feName)
}
}
c.cfg = cfg
return nil
}
// Subscribe returns a channel that receives Events for every state transition.
// Call the returned cancel function to unsubscribe.
func (c *Checker) Subscribe() (<-chan Event, func()) {
c.subsMu.Lock()
defer c.subsMu.Unlock()
id := c.nextID
c.nextID++
ch := make(chan Event, 64)
c.subs[id] = ch
return ch, func() {
c.subsMu.Lock()
defer c.subsMu.Unlock()
delete(c.subs, id)
close(ch)
}
}
// Config returns the live config pointer held by the checker. Callers must
// treat the returned value as read-only. The pointer is swapped on Reload,
// so callers that cache it across reloads may see stale data.
func (c *Checker) Config() *config.Config {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cfg
}
// BackendState returns the current health state of a backend. Returns
// (StateUnknown, false) when the backend has no worker. Satisfies
// vpp.StateSource.
func (c *Checker) BackendState(name string) (health.State, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
w, ok := c.workers[name]
if !ok {
return health.StateUnknown, false
}
return w.backend.State, true
}
// FrontendState returns the current aggregate state of a frontend (unknown,
// up, or down). Returns (FrontendStateUnknown, false) when the frontend is
// not known to the checker.
func (c *Checker) FrontendState(name string) (health.FrontendState, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
if _, ok := c.cfg.Frontends[name]; !ok {
return health.FrontendStateUnknown, false
}
return c.frontendStates[name], true
}
// ListFrontends returns the names of all configured frontends.
func (c *Checker) ListFrontends() []string {
c.mu.RLock()
defer c.mu.RUnlock()
names := make([]string, 0, len(c.cfg.Frontends))
for name := range c.cfg.Frontends {
names = append(names, name)
}
sort.Strings(names)
return names
}
// GetFrontend returns the frontend config for the given name.
func (c *Checker) GetFrontend(name string) (config.Frontend, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.cfg.Frontends[name]
return v, ok
}
// SetFrontendPoolBackendWeight updates the weight of a backend within a named
// pool of a frontend. Returns the updated FrontendInfo and a descriptive error
// if the frontend, pool, or backend is not found or the weight is out of range.
func (c *Checker) SetFrontendPoolBackendWeight(frontendName, poolName, backendName string, weight int) (config.Frontend, error) {
if weight < 0 || weight > 100 {
return config.Frontend{}, fmt.Errorf("weight %d out of range [0, 100]", weight)
}
c.mu.Lock()
defer c.mu.Unlock()
fe, ok := c.cfg.Frontends[frontendName]
if !ok {
return config.Frontend{}, fmt.Errorf("frontend %q not found", frontendName)
}
for i, pool := range fe.Pools {
if pool.Name != poolName {
continue
}
pb, ok := pool.Backends[backendName]
if !ok {
return config.Frontend{}, fmt.Errorf("backend %q not found in pool %q", backendName, poolName)
}
pb.Weight = weight
fe.Pools[i].Backends[backendName] = pb
c.cfg.Frontends[frontendName] = fe
slog.Info("frontend-pool-weight", "frontend", frontendName, "pool", poolName, "backend", backendName, "weight", weight)
return fe, nil
}
return config.Frontend{}, fmt.Errorf("pool %q not found in frontend %q", poolName, frontendName)
}
// ListHealthChecks returns the names of all configured health checks, sorted.
func (c *Checker) ListHealthChecks() []string {
c.mu.RLock()
defer c.mu.RUnlock()
names := make([]string, 0, len(c.cfg.HealthChecks))
for name := range c.cfg.HealthChecks {
names = append(names, name)
}
sort.Strings(names)
return names
}
// GetHealthCheck returns the config for a health check by name.
func (c *Checker) GetHealthCheck(name string) (config.HealthCheck, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
hc, ok := c.cfg.HealthChecks[name]
return hc, ok
}
// ListBackends returns the names of all active backends.
func (c *Checker) ListBackends() []string {
c.mu.RLock()
defer c.mu.RUnlock()
names := make([]string, 0, len(c.workers))
for name := range c.workers {
names = append(names, name)
}
sort.Strings(names)
return names
}
// ListFrontendBackends returns the backend health states for all backends of a frontend.
func (c *Checker) ListFrontendBackends(frontendName string) []*health.Backend {
c.mu.RLock()
defer c.mu.RUnlock()
fe, ok := c.cfg.Frontends[frontendName]
if !ok {
return nil
}
var out []*health.Backend
seen := map[string]struct{}{}
for _, pool := range fe.Pools {
for name := range pool.Backends {
if _, already := seen[name]; already {
continue
}
seen[name] = struct{}{}
if w, ok := c.workers[name]; ok {
out = append(out, w.backend)
}
}
}
return out
}
// GetBackend returns a snapshot of the health state and config for a backend by name.
func (c *Checker) GetBackend(name string) (BackendSnapshot, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, false
}
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// GetBackendInfo returns the health state and key config fields for a backend.
// Satisfies metrics.StateSource.
func (c *Checker) GetBackendInfo(name string) (metrics.BackendInfo, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
w, ok := c.workers[name]
if !ok {
return metrics.BackendInfo{}, false
}
return metrics.BackendInfo{
Health: w.backend,
Enabled: w.entry.Enabled,
HCName: w.entry.HealthCheck,
}, true
}
// PauseBackend pauses health checking for a backend by name. The probe
// goroutine is cancelled so no further traffic is sent to the backend. The
// backend's state is set to paused and remains frozen until ResumeBackend is
// called (which starts a fresh probe goroutine).
// Returns an error if the backend is not found or is disabled.
func (c *Checker) PauseBackend(name string) (BackendSnapshot, error) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, fmt.Errorf("backend %q not found", name)
}
if !w.entry.Enabled {
return BackendSnapshot{}, fmt.Errorf("backend %q is disabled; enable it first", name)
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
if w.backend.Pause(maxHistory) {
t := w.backend.Transitions[0]
slog.Info("backend-transition", "backend", name,
"from", t.From.String(),
"to", t.To.String(),
)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
}
w.cancel()
return BackendSnapshot{Health: w.backend, Config: w.entry}, nil
}
// ResumeBackend resumes health checking for a backend by name. A fresh probe
// goroutine is started and the backend re-enters StateUnknown. The existing
// transition history is preserved.
// Returns an error if the backend is not found or is disabled.
func (c *Checker) ResumeBackend(name string) (BackendSnapshot, error) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, fmt.Errorf("backend %q not found", name)
}
if !w.entry.Enabled {
return BackendSnapshot{}, fmt.Errorf("backend %q is disabled; enable it first", name)
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
if w.backend.Resume(maxHistory) {
t := w.backend.Transitions[0]
slog.Info("backend-transition", "backend", name,
"from", t.From.String(),
"to", t.To.String(),
)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
}
// Launch a fresh probe goroutine with a new cancellable context,
// keeping the existing worker and its transition history.
wCtx, cancel := context.WithCancel(c.runCtx)
w.cancel = cancel
w.wakeCh = make(chan struct{}, 1)
go c.runProbe(wCtx, name, 0, 1)
return BackendSnapshot{Health: w.backend, Config: w.entry}, nil
}
// DisableBackend stops health checking for a backend and removes it from active
// rotation. The worker entry is kept in the map so the backend remains visible
// via GetBackend and can be re-enabled with EnableBackend.
func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, false
}
if !w.entry.Enabled {
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
t := w.backend.Disable(maxHistory)
slog.Info("backend-transition", "backend", name,
"from", t.From.String(),
"to", t.To.String(),
)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
w.cancel()
w.entry.Enabled = false
if b, ok := c.cfg.Backends[name]; ok {
b.Enabled = false
c.cfg.Backends[name] = b
}
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// EnableBackend re-enables a previously disabled backend. The existing
// Backend struct is reused — its transition history is preserved — and a
// fresh probe goroutine is launched. The backend re-enters StateUnknown.
func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) {
c.mu.Lock()
defer c.mu.Unlock()
w, ok := c.workers[name]
if !ok {
return BackendSnapshot{}, false
}
if w.entry.Enabled {
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
w.entry.Enabled = true
if b, ok := c.cfg.Backends[name]; ok {
b.Enabled = true
c.cfg.Backends[name] = b
}
maxHistory := c.cfg.HealthChecker.TransitionHistory
t := w.backend.Enable(maxHistory)
slog.Info("backend-transition", "backend", name,
"from", t.From.String(),
"to", t.To.String(),
)
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
// Launch a fresh probe goroutine with a new cancellable context,
// keeping the existing worker and its transition history.
wCtx, cancel := context.WithCancel(c.runCtx)
w.cancel = cancel
w.wakeCh = make(chan struct{}, 1)
go c.runProbe(wCtx, name, 0, 1)
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
}
// ---- internal --------------------------------------------------------------
// startWorker creates a Backend and launches a probe goroutine.
// Must be called with c.mu held.
func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total, maxHistory int) {
rise, fall := hc.Rise, hc.Fall
if entry.HealthCheck == "" {
// No healthcheck: one synthetic pass drives the backend to Up immediately.
rise, fall = 1, 1
}
wCtx, cancel := context.WithCancel(ctx)
w := &worker{
backend: health.New(name, entry.Address, rise, fall),
hc: hc,
entry: entry,
cancel: cancel,
wakeCh: make(chan struct{}, 1),
}
w.backend.Start(maxHistory)
c.workers[name] = w
go c.runProbe(wCtx, name, pos, total)
}
// runProbe is the per-backend probe loop.
func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
c.mu.RLock()
w, ok := c.workers[name]
if !ok {
c.mu.RUnlock()
return
}
initialDelay := staggerDelay(w.hc.Interval, pos, total)
c.mu.RUnlock()
if initialDelay > 0 {
select {
case <-ctx.Done():
return
case <-time.After(initialDelay):
}
}
first := true
for {
c.mu.RLock()
w, ok := c.workers[name]
if !ok {
c.mu.RUnlock()
return
}
hc := w.hc
entry := w.entry
maxHistory := c.cfg.HealthChecker.TransitionHistory
netns := c.cfg.HealthChecker.Netns
wakeCh := w.wakeCh
var sleepFor time.Duration
if entry.HealthCheck == "" {
// Static (no-healthcheck) backends: the first iteration fires
// the synthetic pass immediately so the backend reaches "up"
// without delay; subsequent iterations idle at 30s since there's
// nothing to do anyway.
if first {
sleepFor = 0
} else {
sleepFor = 30 * time.Second
}
} else {
sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval)
}
c.mu.RUnlock()
select {
case <-ctx.Done():
return
case <-time.After(sleepFor):
case <-wakeCh:
}
first = false
var result health.ProbeResult
if entry.HealthCheck == "" {
// No healthcheck configured: synthesise a passing result so the
// backend is assumed healthy without any network activity.
result = health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"}
} else {
var probeSrc net.IP
if entry.Address.To4() != nil {
probeSrc = hc.ProbeIPv4Src
} else {
probeSrc = hc.ProbeIPv6Src
}
pcfg := prober.ProbeConfig{
Target: entry.Address,
Port: hc.Port,
ProbeSrc: probeSrc,
HealthCheckNetns: netns,
Timeout: hc.Timeout,
HTTP: hc.HTTP,
TCP: hc.TCP,
}
probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout)
slog.Debug("probe-start", "backend", name, "type", hc.Type)
start := time.Now()
result = prober.ForType(hc.Type)(probeCtx, pcfg)
elapsed := time.Since(start)
cancel()
slog.Debug("probe-done",
"backend", name,
"type", hc.Type,
"ok", result.OK,
"code", result.Code,
"detail", result.Detail,
"elapsed", elapsed.Round(time.Millisecond).String(),
)
res := "success"
if !result.OK {
res = "failure"
}
metrics.ProbeTotal.WithLabelValues(name, hc.Type, res, result.Code).Inc()
metrics.ProbeDuration.WithLabelValues(name, hc.Type).Observe(elapsed.Seconds())
}
c.mu.Lock()
w, exists := c.workers[name]
if !exists {
c.mu.Unlock()
return
}
if w.backend.Record(result, maxHistory) {
t := w.backend.Transitions[0]
addr := w.backend.Address
slog.Info("backend-transition",
"backend", name,
"from", t.From.String(),
"to", t.To.String(),
"code", result.Code,
"detail", result.Detail,
)
metrics.TransitionTotal.WithLabelValues(name, t.From.String(), t.To.String()).Inc()
c.emitForBackend(name, addr, t, c.cfg.Frontends)
}
c.mu.Unlock()
}
}
// emitForBackend emits one backend-transition Event per frontend that
// references backendName (in any pool), using the provided frontends map.
// After emitting the backend event for a frontend, it also re-computes that
// frontend's aggregate state and emits a frontend-transition Event if the
// state has changed. Must be called with c.mu held.
func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition, frontends map[string]config.Frontend) {
for feName, fe := range frontends {
if !frontendReferencesBackend(fe, backendName) {
continue
}
c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t})
c.updateFrontendState(feName, fe)
}
}
// frontendReferencesBackend reports whether fe has the named backend in any
// of its pools.
func frontendReferencesBackend(fe config.Frontend, backendName string) bool {
for _, pool := range fe.Pools {
if _, ok := pool.Backends[backendName]; ok {
return true
}
}
return false
}
// updateFrontendState recomputes the aggregate state of fe, compares against
// the last known state, and emits a frontend-transition Event on change.
// Must be called with c.mu held. The current state is read from the worker
// map — so the caller (who already holds c.mu) sees a consistent view.
func (c *Checker) updateFrontendState(feName string, fe config.Frontend) {
states := make(map[string]health.State)
for _, pool := range fe.Pools {
for bName := range pool.Backends {
if w, ok := c.workers[bName]; ok {
states[bName] = w.backend.State
} else {
states[bName] = health.StateUnknown
}
}
}
newState := health.ComputeFrontendState(fe, states)
old := c.frontendStates[feName] // zero value (Unknown) on first access
if old == newState {
return
}
c.frontendStates[feName] = newState
ft := health.FrontendTransition{From: old, To: newState, At: time.Now()}
slog.Info("frontend-transition",
"frontend", feName,
"from", old.String(),
"to", newState.String(),
)
c.emit(Event{FrontendName: feName, FrontendTransition: &ft})
}
// emit sends an event to the internal fan-out channel (non-blocking).
// Must be called with c.mu held.
func (c *Checker) emit(e Event) {
select {
case c.eventCh <- e:
default:
slog.Warn("event-drop", "frontend", e.FrontendName, "backend", e.BackendName)
}
}
// fanOut reads from eventCh and distributes to all subscribers.
func (c *Checker) fanOut(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case e := <-c.eventCh:
c.subsMu.Lock()
for _, ch := range c.subs {
select {
case ch <- e:
default:
// Slow subscriber — drop rather than block.
}
}
c.subsMu.Unlock()
}
}
}
// healthCheckEqual returns true if two HealthCheck configs are functionally identical.
func healthCheckEqual(a, b config.HealthCheck) bool {
if a.Type != b.Type ||
a.Interval != b.Interval ||
a.FastInterval != b.FastInterval ||
a.DownInterval != b.DownInterval ||
a.Timeout != b.Timeout ||
a.Rise != b.Rise ||
a.Fall != b.Fall {
return false
}
return httpParamsEqual(a.HTTP, b.HTTP) && tcpParamsEqual(a.TCP, b.TCP)
}
func httpParamsEqual(a, b *config.HTTPParams) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
aRe, bRe := "", ""
if a.ResponseRegexp != nil {
aRe = a.ResponseRegexp.String()
}
if b.ResponseRegexp != nil {
bRe = b.ResponseRegexp.String()
}
return a.Path == b.Path &&
a.Host == b.Host &&
a.ResponseCodeMin == b.ResponseCodeMin &&
a.ResponseCodeMax == b.ResponseCodeMax &&
aRe == bRe &&
a.ServerName == b.ServerName &&
a.InsecureSkipVerify == b.InsecureSkipVerify
}
func tcpParamsEqual(a, b *config.TCPParams) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return a.SSL == b.SSL &&
a.ServerName == b.ServerName &&
a.InsecureSkipVerify == b.InsecureSkipVerify
}
// activeBackendNames returns a sorted, deduplicated list of backend names that
// are referenced by at least one frontend pool and have Enabled: true.
func activeBackendNames(cfg *config.Config) []string {
seen := map[string]struct{}{}
for _, fe := range cfg.Frontends {
for _, pool := range fe.Pools {
for name := range pool.Backends {
if b, ok := cfg.Backends[name]; ok && b.Enabled {
seen[name] = struct{}{}
}
}
}
}
names := make([]string, 0, len(seen))
for name := range seen {
names = append(names, name)
}
sort.Strings(names)
return names
}
// staggerDelay computes the initial probe delay for position pos out of total.
func staggerDelay(interval time.Duration, pos, total int) time.Duration {
if total <= 1 {
return 0
}
return time.Duration(int64(interval) * int64(pos) / int64(total))
}