vpp-maglev/internal/vpp/client.go
Pim van Pelt 6d78921edd Restart-neutral VPP LB sync; deterministic AS ordering; maglevt cadence; v0.9.5
Three reliability fixes bundled with docs updates.

Restart-neutral VPP LB sync via a startup warmup window
(internal/vpp/warmup.go). Before this, a maglevd restart would
immediately issue SyncLBStateAll with every backend still in
StateUnknown — mapped through BackendEffectiveWeight to weight
0 — and VPP would black-hole all new flows until the checker's
rise counters caught up, several seconds later. The new warmup
tracker owns a process-wide state machine gated by two config
knobs: vpp.lb.startup-min-delay (default 5s) is an absolute
hands-off window during which neither the periodic sync loop
nor the per-transition reconciler touches VPP;
vpp.lb.startup-max-delay (default 30s) is the watchdog for a per-VIP
release phase that runs between min-delay and max-delay, releasing each frontend
as soon as every backend it references reaches a non-Unknown
state. At max-delay a final SyncLBStateAll runs for any stragglers
still in Unknown. Config reload does not reset the clock. Both
delays can be set to 0 to disable the warmup entirely. The
reconciler's suppressed-during-warmup events log at DEBUG so
operators can still see them with --log-level debug. Unit tests
cover the tracker state machine, allBackendsKnown precondition,
and the zero-delay escape hatch.

Deterministic AS iteration in VPP LB sync. reconcileVIP and
recreateVIP now issue their lb_as_add_del / lb_as_set_weight
calls in numeric IP order (IPv4 before IPv6, ascending within
each family) via a new sortedIPKeys helper, instead of Go map
iteration order. VPP's LB plugin breaks per-bucket ties in the
Maglev lookup table by insertion position in its internal AS
vec, so without a stable call order two maglevd instances on
the same config could push identical AS sets into VPP in
different orders and produce divergent new-flow tables. Numeric
sort is used in preference to lexicographic so the sync log
stays human-readable: string order would place 10.0.0.10 before
10.0.0.2, and IPv6 addresses have the same problem. Unit tests cover empty,
single, v4/v6 numeric vs lexicographic, v4-before-v6 grouping,
a 1000-iteration stability loop against Go's randomised map
iteration, insertion-order invariance, and the desiredAS
call-site type.
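
A minimal sketch of the ordering described above, assuming the AS map
is keyed by the textual IP address. sortedIPKeysSketch is a
hypothetical stand-in, not the sortedIPKeys helper from the commit,
whose signature and error handling are not shown here. It relies on
netip.Addr.Less from the standard library, which orders addresses by
bit length first (so IPv4 sorts before IPv6) and then by value.

```go
package main

import (
	"fmt"
	"net/netip"
	"sort"
)

// sortedIPKeysSketch returns the keys of m in numeric IP order:
// IPv4 before IPv6, ascending within each family. Keys that do not
// parse as IP addresses are reported as an error (an assumption made
// for this sketch).
func sortedIPKeysSketch[V any](m map[string]V) ([]string, error) {
	type entry struct {
		key  string
		addr netip.Addr
	}
	entries := make([]entry, 0, len(m))
	for k := range m {
		a, err := netip.ParseAddr(k)
		if err != nil {
			return nil, fmt.Errorf("parse %q: %w", k, err)
		}
		entries = append(entries, entry{key: k, addr: a})
	}
	// Less compares by address length, then by numeric value.
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].addr.Less(entries[j].addr)
	})
	out := make([]string, len(entries))
	for i, e := range entries {
		out[i] = e.key
	}
	return out, nil
}

func main() {
	as := map[string]int{
		"2001:db8::10": 1,
		"10.0.0.10":    1,
		"2001:db8::2":  1,
		"10.0.0.2":     1,
	}
	keys, err := sortedIPKeysSketch(as)
	if err != nil {
		panic(err)
	}
	for _, k := range keys {
		fmt.Println(k)
	}
}
```

Because the result depends only on the set of keys, repeated calls on
the same map return the same slice regardless of Go's randomised map
iteration order.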

maglevt interval fix. runProbeLoop used to sleep the full
jittered interval after every probe, so a 100ms --interval
with a 30ms probe actually produced a 130ms period. The sleep
now subtracts result.Duration so cadence matches the flag.
Probes that overrun clamp sleep to zero and fire the next
probe immediately without trying to catch up on missed cycles
— a slow backend doesn't get flooded with back-to-back probes
at the moment it's already struggling.

Docs. config-guide now documents flush-on-down and the new
startup-min-delay / startup-max-delay knobs; user-guide's
maglevd section explains the restart-neutrality property, the
three warmup phases, and the relevant slog lines operators
should watch for during a bounce.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 12:53:42 +02:00

441 lines
13 KiB
Go

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>

// Package vpp manages the connection to a local VPP instance over its
// binary API and stats sockets. The Client reconnects automatically when
// VPP restarts.
package vpp

import (
	"context"
	"log/slog"
	"sync"
	"sync/atomic"
	"time"

	"go.fd.io/govpp/adapter"
	"go.fd.io/govpp/adapter/socketclient"
	"go.fd.io/govpp/adapter/statsclient"
	"go.fd.io/govpp/binapi/vpe"
	"go.fd.io/govpp/core"

	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	"git.ipng.ch/ipng/vpp-maglev/internal/health"
	"git.ipng.ch/ipng/vpp-maglev/internal/metrics"
	lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
)

// StateSource provides a live view of the running config and the current
// health state of each backend. checker.Checker satisfies this interface via
// its Config() and BackendState() methods. Decoupling via an interface avoids
// an import cycle with the checker package.
type StateSource interface {
	Config() *config.Config
	BackendState(name string) (health.State, bool)
}

const retryInterval = 5 * time.Second
const pingInterval = 10 * time.Second
const defaultLBSyncInterval = 30 * time.Second

// Info holds VPP version and connection metadata, populated on connect.
type Info struct {
	Version        string
	BuildDate      string
	BuildDirectory string
	PID            uint32
	BootTime       time.Time // when VPP started (from /sys/boottime stats counter)
	ConnectedSince time.Time // when maglevd connected to VPP
}

// Client manages connections to both the VPP API and stats sockets.
// Both connections are treated as a unit: if either drops, both are
// torn down and re-established together.
type Client struct {
	apiAddr   string
	statsAddr string

	mu          sync.Mutex
	apiConn     *core.Connection
	statsConn   *core.StatsConnection
	statsClient adapter.StatsAPI // raw adapter for DumpStats
	info        Info             // populated on successful connect
	stateSrc    StateSource      // optional; enables periodic LB sync
	lastLBConf  *lb.LbConf       // cached last-pushed lb_conf (dedup)

	// lbStatsSnap is the most recent per-VIP stats snapshot captured by
	// lbStatsLoop. Published as an immutable slice via atomic.Pointer so
	// Prometheus scrapes (metrics.Collector.Collect) don't take any lock.
	lbStatsSnap atomic.Pointer[[]metrics.VIPStatEntry]

	// warmup gates every VPP LB sync call (both periodic and
	// event-driven) during the first StartupMaxDelay seconds after Client
	// construction. See warmup.go for the state machine. Process-wide,
	// not per-connection: reconnects do not re-enter warmup.
	warmup *warmupTracker
}

// SetStateSource attaches a live config + health state source. When set, the
// VPP client runs a periodic SyncLBStateAll loop (at the interval from
// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and
// state-aware weights are used throughout the sync path. Must be called
// before Run.
func (c *Client) SetStateSource(src StateSource) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stateSrc = src
}

// getStateSource returns the registered state source under the mutex.
func (c *Client) getStateSource() StateSource {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.stateSrc
}

// New creates a Client for the given socket paths. The warmup tracker's
// clock starts here: the restart-neutrality window is measured from the
// moment the Client is constructed, which in practice is a few tens of
// milliseconds after process start (see cmd/maglevd/main.go startup
// sequence). If main.go ever grows a long-running initialisation step
// before vpp.New(), the warmup clock should be moved accordingly.
func New(apiAddr, statsAddr string) *Client {
	return &Client{
		apiAddr:   apiAddr,
		statsAddr: statsAddr,
		warmup:    newWarmupTracker(),
	}
}

// Run connects to VPP and maintains the connection until ctx is cancelled.
// If VPP is unavailable or restarts, Run reconnects automatically.
func (c *Client) Run(ctx context.Context) {
	for {
		if err := c.connect(); err != nil {
			slog.Debug("vpp-connect-failed", "err", err)
			select {
			case <-ctx.Done():
				return
			case <-time.After(retryInterval):
				continue
			}
		}

		// Fetch version info and record connect time.
		// fetchInfo uses NewAPIChannel and statsClient which both take c.mu,
		// so we must not hold c.mu here.
		info := c.fetchInfo()
		c.mu.Lock()
		c.info = info
		c.mu.Unlock()
		slog.Info("vpp-connect", "version", c.info.Version,
			"build-date", c.info.BuildDate,
			"pid", c.info.PID,
			"api", c.apiAddr, "stats", c.statsAddr)

		// Read the current LB plugin state so we can log what's programmed.
		if state, err := c.GetLBStateAll(); err != nil {
			slog.Warn("vpp-lb-read-failed", "err", err)
		} else {
			totalAS := 0
			for _, v := range state.VIPs {
				totalAS += len(v.ASes)
			}
			slog.Info("vpp-lb-state",
				"vips", len(state.VIPs),
				"application-servers", totalAS,
				"sticky-buckets-per-core", state.Conf.StickyBucketsPerCore,
				"flow-timeout", state.Conf.FlowTimeout)
		}

		// Push global LB conf (src addresses, buckets, timeout) from the
		// running config. On startup this is the initial set; on reconnect
		// (VPP restart) VPP has forgotten everything, so we set it again.
		c.mu.Lock()
		src := c.stateSrc
		c.mu.Unlock()
		if src != nil {
			if cfg := src.Config(); cfg != nil {
				if err := c.SetLBConf(cfg); err != nil {
					slog.Warn("vpp-lb-conf-set-failed", "err", err)
				}
			}
		}

		// Start the LB sync and stats loops for as long as the connection
		// is up. Both exit when connCtx is cancelled.
		connCtx, connCancel := context.WithCancel(ctx)
		go c.lbSyncLoop(connCtx)
		go c.lbStatsLoop(connCtx)

		// Hold the connection, pinging periodically to detect VPP restarts.
		c.monitor(ctx)
		connCancel()

		// If ctx is done we're shutting down; otherwise VPP dropped and we retry.
		c.disconnect()
		if ctx.Err() != nil {
			return
		}
		slog.Warn("vpp-disconnect", "msg", "connection lost, reconnecting")
	}
}

// lbSyncLoop drives the periodic VPP LB sync. On first entry (after the
// first successful VPP connect) it runs the warmup phase via runWarmup,
// which enforces the restart-neutrality window and handles the first full
// sync itself. Subsequent reconnect entries find warmup.allDone == true
// and skip straight to the periodic ticker. Exits when ctx is cancelled.
//
// The warmup phase is intentionally run from inside this loop rather
// than from Run: it needs the state source registered (which happens
// only after SetStateSource) and it wants to be cancelled by the same
// connCtx that cancels the stats loop on disconnect, so a VPP drop
// during warmup doesn't leak a goroutine.
func (c *Client) lbSyncLoop(ctx context.Context) {
	src := c.getStateSource()
	if src == nil {
		return // no state source registered; nothing to sync
	}

	// Warmup phase: runs once per process. On the first successful
	// VPP connect, runWarmup handles the entire window (min-delay
	// hands-off, per-VIP release phase, final SyncLBStateAll at
	// max-delay) and calls finishAll before returning. On any
	// reconnect after that, the gate is already open and we skip
	// straight to the periodic ticker. A VPP drop mid-warmup
	// returns from runWarmup via ctx.Done without closing the gate;
	// the next reconnect re-enters runWarmup, which re-reads the
	// process-relative clock and picks up wherever it left off.
	if !c.warmup.isAllDone() {
		c.runWarmup(ctx)
		if ctx.Err() != nil {
			return
		}
	}

	cfg := src.Config()
	if cfg == nil {
		return
	}
	interval := cfg.VPP.LB.SyncInterval
	if interval <= 0 {
		interval = defaultLBSyncInterval
	}
	next := time.Now().Add(interval)

	for {
		wait := time.Until(next)
		if wait < 0 {
			wait = 0
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}

		cfg = src.Config()
		if cfg == nil {
			next = time.Now().Add(defaultLBSyncInterval)
			continue
		}
		interval = cfg.VPP.LB.SyncInterval
		if interval <= 0 {
			interval = defaultLBSyncInterval
		}
		if err := c.SyncLBStateAll(cfg); err != nil {
			slog.Warn("vpp-lb-sync-error", "err", err)
		}
		next = time.Now().Add(interval)
	}
}

// IsConnected returns true if both API and stats connections are active.
func (c *Client) IsConnected() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.apiConn != nil && c.statsConn != nil
}

// GetInfo returns the VPP version and connection metadata, or an error
// if VPP is not connected.
func (c *Client) GetInfo() (Info, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.apiConn == nil {
		return Info{}, errNotConnected
	}
	return c.info, nil
}

// VIPStats satisfies metrics.VPPSource. It returns the latest snapshot of
// per-VIP LB stats-segment counters captured by lbStatsLoop. Returns nil
// until the first scrape completes, or after a disconnect (the pointer is
// cleared when the connection drops).
func (c *Client) VIPStats() []metrics.VIPStatEntry {
	p := c.lbStatsSnap.Load()
	if p == nil {
		return nil
	}
	return *p
}

// VPPInfo satisfies metrics.VPPSource. It returns a copy of the cached
// connection info as a metrics-local struct so the metrics package doesn't
// need to import internal/vpp. Second return is false when VPP is not
// connected (the collector skips the vpp_* gauges in that case).
func (c *Client) VPPInfo() (metrics.VPPInfo, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.apiConn == nil {
		return metrics.VPPInfo{}, false
	}
	return metrics.VPPInfo{
		Version:        c.info.Version,
		BuildDate:      c.info.BuildDate,
		PID:            c.info.PID,
		BootTime:       c.info.BootTime,
		ConnectedSince: c.info.ConnectedSince,
	}, true
}

// connect establishes both API and stats connections. If either fails,
// both are torn down.
func (c *Client) connect() error {
	sc := socketclient.NewVppClient(c.apiAddr)
	sc.SetClientName("vpp-maglev")
	apiConn, err := core.Connect(sc)
	if err != nil {
		return err
	}

	stc := statsclient.NewStatsClient(c.statsAddr)
	statsConn, err := core.ConnectStats(stc)
	if err != nil {
		safeDisconnectAPI(apiConn)
		return err
	}

	c.mu.Lock()
	c.apiConn = apiConn
	c.statsConn = statsConn
	c.statsClient = stc
	c.mu.Unlock()
	return nil
}

// disconnect tears down both connections.
func (c *Client) disconnect() {
	c.mu.Lock()
	apiConn := c.apiConn
	statsConn := c.statsConn
	c.apiConn = nil
	c.statsConn = nil
	c.statsClient = nil
	c.info = Info{}
	c.lastLBConf = nil // force re-push of lb_conf on reconnect
	c.mu.Unlock()

	c.lbStatsSnap.Store(nil)
	safeDisconnectAPI(apiConn)
	safeDisconnectStats(statsConn)
}

// monitor blocks until the context is cancelled or a liveness ping fails.
func (c *Client) monitor(ctx context.Context) {
	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !c.ping() {
				return
			}
		}
	}
}

// ping sends a control_ping to VPP and returns true if it succeeds.
func (c *Client) ping() bool {
	ch, err := c.apiChannel()
	if err != nil {
		return false
	}
	defer ch.Close()

	req := &core.ControlPing{}
	reply := &core.ControlPingReply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		slog.Debug("vpp-ping-failed", "err", err)
		return false
	}
	return true
}

// fetchInfo queries VPP for version info, PID, and boot time.
// Must be called after connect succeeds (apiConn and statsClient are set).
func (c *Client) fetchInfo() Info {
	info := Info{ConnectedSince: time.Now()}

	ch, err := c.apiChannel()
	if err != nil {
		return info
	}
	defer ch.Close()

	ver := &vpe.ShowVersionReply{}
	if err := ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(ver); err == nil {
		info.Version = ver.Version
		info.BuildDate = ver.BuildDate
		info.BuildDirectory = ver.BuildDirectory
	}

	ping := &core.ControlPingReply{}
	if err := ch.SendRequest(&core.ControlPing{}).ReceiveReply(ping); err == nil {
		info.PID = ping.VpePID
	}

	// Read VPP boot time from the stats segment.
	c.mu.Lock()
	sc := c.statsClient
	c.mu.Unlock()
	if sc != nil {
		if entries, err := sc.DumpStats("/sys/boottime"); err == nil {
			for _, e := range entries {
				if s, ok := e.Data.(adapter.ScalarStat); ok && s != 0 {
					info.BootTime = time.Unix(int64(s), 0)
				}
			}
		}
	}
	return info
}

// safeDisconnectAPI disconnects an API connection, recovering from any panic
// that GoVPP may raise on a stale connection.
func safeDisconnectAPI(conn *core.Connection) {
	if conn == nil {
		return
	}
	defer func() { recover() }() //nolint:errcheck
	conn.Disconnect()
}

// safeDisconnectStats disconnects a stats connection, recovering from panics.
func safeDisconnectStats(conn *core.StatsConnection) {
	if conn == nil {
		return
	}
	defer func() { recover() }() //nolint:errcheck
	conn.Disconnect()
}

type vppError struct{ msg string }

func (e *vppError) Error() string { return e.msg }

var errNotConnected = &vppError{msg: "VPP API connection not established"}