Files
vpp-maglev/internal/vpp/reconciler.go

159 lines
5.1 KiB
Go

// SPDX-License-Identifier: Apache-2.0
package vpp
import (
"context"
"errors"
"log/slog"
"git.ipng.ch/ipng/vpp-maglev/internal/checker"
)
// EventSource is the subset of checker.Checker that Reconciler needs.
// Decoupling via an interface keeps the dependency direction
// vpp → checker (checker never imports vpp).
type EventSource interface {
Subscribe() (<-chan checker.Event, func())
}
// Reconciler bridges checker state transitions to VPP dataplane changes.
// It subscribes to the checker's event channel and, for every transition,
// runs SyncLBStateVIP for the frontend the backend belongs to. This is
// the ONLY place in the codebase where backend state transitions cause
// VPP calls — every LB change flows through Client.SyncLBStateVIP.
//
// The reconciler carries no state of its own. Idempotency is guaranteed
// by SyncLBStateVIP itself (diff-based, driven by the pure asFromBackend
// mapping in lbsync.go).
type Reconciler struct {
client *Client
events EventSource
stateSrc StateSource
}
// NewReconciler creates a Reconciler. client is the VPP client, events is
// the checker (or anything that implements Subscribe), and stateSrc provides
// the live config for SyncLBStateVIP calls. All three are normally the
// checker/vpp client pair constructed at daemon startup.
func NewReconciler(client *Client, events EventSource, stateSrc StateSource) *Reconciler {
return &Reconciler{client: client, events: events, stateSrc: stateSrc}
}
// Run subscribes to the checker and loops until ctx is cancelled. Each
// received event fires a single-VIP sync for the frontend the transitioned
// backend belongs to.
func (r *Reconciler) Run(ctx context.Context) {
ch, unsub := r.events.Subscribe()
defer unsub()
slog.Info("vpp-reconciler-start")
defer slog.Info("vpp-reconciler-stop")
for {
select {
case <-ctx.Done():
return
case ev, ok := <-ch:
if !ok {
return
}
r.handle(ev)
}
}
}
// handle reconciles one event. Operates only on backend-transition events
// that carry a frontend name (the checker emits one event per frontend that
// references the backend, so a backend shared across multiple frontends
// produces multiple events and all relevant VIPs are reconciled).
// Frontend-transition events are observational only — the dataplane work
// they would imply has already been done by the backend-transition event
// that triggered them.
//
// The handler consults the VPP client's warmup tracker before doing any
// dataplane work. During the startup warmup window the reconciler is
// either fully suppressed (inside min-delay) or per-VIP gated (the
// frontend must have been released before events for it pass through).
// When a transition fires for a VIP that isn't yet released but whose
// backends have now all settled, the handler opportunistically releases
// it here so the per-VIP release fires on the event rather than waiting
// for the next warmup poll tick.
func (r *Reconciler) handle(ev checker.Event) {
if ev.FrontendTransition != nil {
return // frontend-only event; no dataplane work
}
if ev.FrontendName == "" {
return
}
cfg := r.stateSrc.Config()
if cfg == nil {
return
}
w := r.client.warmup
feName := ev.FrontendName
if !w.isReleased(feName) {
// Warmup is still gating this frontend. Decide whether to
// release it now, or defer until a later event / the warmup
// poll / the final max-delay SyncLBStateAll.
if w.inMinDelay() {
slog.Debug("vpp-reconciler-suppressed-min-delay",
"frontend", feName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String(),
"elapsed", w.elapsed(),
"reason", "inside vpp.lb.startup-min-delay window")
return
}
fe, ok := cfg.Frontends[feName]
if !ok {
return
}
if !allBackendsKnown(fe, r.stateSrc) {
slog.Debug("vpp-reconciler-suppressed-warmup",
"frontend", feName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String(),
"elapsed", w.elapsed(),
"reason", "frontend has backends still in StateUnknown; "+
"waiting for all to settle or for max-delay watchdog")
return
}
if !w.tryRelease(feName) {
// Lost a race with finishAll or another release caller.
// Either way the next isReleased call will return true, but
// for this event we've already done the right thing by
// letting the next few lines re-check and proceed.
return
}
slog.Info("vpp-lb-warmup-release",
"frontend", feName,
"trigger", "reconciler-event",
"backend", ev.BackendName,
"elapsed", w.elapsed())
}
slog.Debug("vpp-reconciler-event",
"frontend", feName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String())
if err := r.client.SyncLBStateVIP(cfg, feName, ""); err != nil {
if errors.Is(err, ErrFrontendNotFound) {
// Frontend was removed between the event being emitted and
// us handling it; a periodic SyncLBStateAll will clean it up.
return
}
slog.Warn("vpp-reconciler-error",
"frontend", feName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String(),
"err", err)
}
}