Files
vpp-maglev/internal/vpp/client.go
Pim van Pelt fb62532fd5 VPP LB counters, src-ip-sticky, and frontend state aggregation
New feature: per-VIP / per-backend runtime counters
  * New GetVPPLBCounters RPC serving an in-process snapshot refreshed
    by a 5s scrape loop (internal/vpp/lbstats.go). Each cycle pulls
    the LB plugin's four SimpleCounters (next, first, untracked,
    no-server) plus the FIB /net/route/to CombinedCounter for every
    VIP and every backend host prefix via a single DumpStats call.
  * FIB stats-index discovery via ip_route_lookup (internal/vpp/
    fibstats.go); per-worker reduction happens in the collector.
  * Prometheus collector exports vip_packets_total (kind label),
    vip_route_{packets,bytes}_total, and backend_route_{packets,
    bytes}_total. Metrics source interface extended with VIPStats /
    BackendRouteStats; vpp.Client publishes snapshots via
    atomic.Pointer and clears them on disconnect.
  * New 'show vpp lb counters' CLI command. The 'show vpp lbstate'
    and 'sync vpp lbstate' commands are restructured under 'show
    vpp lb {state,counters}' / 'sync vpp lb state' to make room
    for the new verb.

New feature: src-ip-sticky frontends
  * New frontend YAML key 'src-ip-sticky' (bool). Plumbed through
    config.Frontend, desiredVIP, and the lb_add_del_vip_v2 call.
  * Reflected in gRPC FrontendInfo.src_ip_sticky and VPPLBVIP.
    src_ip_sticky, and shown in 'show vpp lb state' output.
  * Scraped back from VPP by parsing 'show lb vips verbose' through
    cli_inband — lb_vip_details does not expose the flag. The same
    scrape also recovers the LB pool index for each VIP, which the
    stats-segment counters are keyed on. This is a documented
    temporary workaround until VPP ships an lb_vip_v2_dump.
  * src_ip_sticky cannot be mutated on a live VIP, so a flipped flag
    triggers a tear-down-and-recreate in reconcileVIP (ASes deleted
    with flush, VIP deleted, then re-added). Flip is logged.

New feature: frontend state aggregation and events
  * New health.FrontendState (unknown/up/down) and FrontendTransition
    types. A frontend is 'up' iff at least one backend has a nonzero
    effective weight, 'unknown' iff no backend has real state yet,
    and 'down' otherwise.
  * Checker tracks per-frontend aggregate state, recomputing after
    each backend transition and emitting a frontend-transition Event
    on change. Reload drops entries for removed frontends.
  * checker.Event gains an optional FrontendTransition pointer;
    backend- vs. frontend-transition events are demultiplexed on
    that field.
  * WatchEvents now sends an initial snapshot of frontend state on
    connect (mirroring the existing backend snapshot), subscribes
    once to the checker stream, and fans out to backend/frontend
    handlers based on the client's filter flags. The proto
    FrontendEvent message grows name + transition fields.
  * New Checker.FrontendState accessor.

Refactor: pure health helpers
  * Moved the priority-failover selector and the (pool idx, active
    pool, state, cfg weight) → (vpp weight, flush) mapping out of
    internal/vpp/lbsync.go into a new internal/health/weights.go so
    the checker can reuse them for frontend-state computation
    without importing internal/vpp.
  * New functions: health.ActivePoolIndex, BackendEffectiveWeight,
    EffectiveWeights, ComputeFrontendState. lbsync.go now calls
    these directly; vpp.EffectiveWeights is a thin wrapper over
    health.EffectiveWeights retained for the gRPC observability
    path. Fully unit-tested in internal/health/weights_test.go.

maglevc polish
  * --color default is now mode-aware: on in the interactive shell,
    off in one-shot mode so piped output is script-safe. Explicit
    --color=true/false still overrides.
  * New stripHostMask helper drops /32 and /128 from VIP display;
    non-host prefixes pass through unchanged.
  * Counter table column order fixed (first before next) and
    packets/bytes columns renamed to fib-packets/fib-bytes to
    clarify they come from the FIB, not the LB plugin.

Docs
  * config-guide: document src-ip-sticky, including the VIP
    recreate-on-change caveat.
  * user-guide, maglevc.1, maglevd.8: updated command tree, new
    counters command, color defaults, and the src-ip-sticky field.
2026-04-12 16:07:39 +02:00

415 lines
12 KiB
Go

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
// Package vpp manages the connection to a local VPP instance over its
// binary API and stats sockets. The Client reconnects automatically when
// VPP restarts.
package vpp
import (
"context"
"log/slog"
"sync"
"sync/atomic"
"time"
"go.fd.io/govpp/adapter"
"go.fd.io/govpp/adapter/socketclient"
"go.fd.io/govpp/adapter/statsclient"
"go.fd.io/govpp/binapi/vpe"
"go.fd.io/govpp/core"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
"git.ipng.ch/ipng/vpp-maglev/internal/metrics"
lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
)
// StateSource provides a live view of the running config and the current
// health state of each backend. checker.Checker satisfies this interface via
// its Config() and BackendState() methods. Decoupling via an interface avoids
// an import cycle with the checker package.
type StateSource interface {
Config() *config.Config
BackendState(name string) (health.State, bool)
}
const retryInterval = 5 * time.Second
const pingInterval = 10 * time.Second
const defaultLBSyncInterval = 30 * time.Second
// Info holds VPP version and connection metadata, populated on connect.
type Info struct {
Version string
BuildDate string
BuildDirectory string
PID uint32
BootTime time.Time // when VPP started (from /sys/boottime stats counter)
ConnectedSince time.Time // when maglevd connected to VPP
}
// Client manages connections to both the VPP API and stats sockets.
// Both connections are treated as a unit: if either drops, both are
// torn down and re-established together.
type Client struct {
apiAddr string
statsAddr string
mu sync.Mutex
apiConn *core.Connection
statsConn *core.StatsConnection
statsClient adapter.StatsAPI // raw adapter for DumpStats
info Info // populated on successful connect
stateSrc StateSource // optional; enables periodic LB sync
lastLBConf *lb.LbConf // cached last-pushed lb_conf (dedup)
// lbStatsSnap is the most recent per-VIP stats snapshot captured by
// lbStatsLoop. Published as an immutable slice via atomic.Pointer so
// Prometheus scrapes (metrics.Collector.Collect) don't take any lock.
lbStatsSnap atomic.Pointer[[]metrics.VIPStatEntry]
// backendRouteSnap is the most recent per-backend FIB stats snapshot
// captured by lbStatsLoop. Same atomic-pointer publish pattern as
// lbStatsSnap; see logBackendRouteStats in fibstats.go.
backendRouteSnap atomic.Pointer[[]metrics.BackendRouteStat]
}
// SetStateSource attaches a live config + health state source. When set, the
// VPP client runs a periodic SyncLBStateAll loop (at the interval from
// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and
// state-aware weights are used throughout the sync path. Must be called
// before Run.
func (c *Client) SetStateSource(src StateSource) {
c.mu.Lock()
defer c.mu.Unlock()
c.stateSrc = src
}
// getStateSource returns the registered state source under the mutex.
func (c *Client) getStateSource() StateSource {
c.mu.Lock()
defer c.mu.Unlock()
return c.stateSrc
}
// New creates a Client for the given socket paths.
func New(apiAddr, statsAddr string) *Client {
return &Client{apiAddr: apiAddr, statsAddr: statsAddr}
}
// Run connects to VPP and maintains the connection until ctx is cancelled.
// If VPP is unavailable or restarts, Run reconnects automatically.
func (c *Client) Run(ctx context.Context) {
for {
if err := c.connect(); err != nil {
slog.Debug("vpp-connect-failed", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
continue
}
}
// Fetch version info and record connect time.
// fetchInfo uses NewAPIChannel and statsClient which both take c.mu,
// so we must not hold c.mu here.
info := c.fetchInfo()
c.mu.Lock()
c.info = info
c.mu.Unlock()
slog.Info("vpp-connect", "version", c.info.Version,
"build-date", c.info.BuildDate,
"pid", c.info.PID,
"api", c.apiAddr, "stats", c.statsAddr)
// Read the current LB plugin state so we can log what's programmed.
if state, err := c.GetLBStateAll(); err != nil {
slog.Warn("vpp-lb-read-failed", "err", err)
} else {
totalAS := 0
for _, v := range state.VIPs {
totalAS += len(v.ASes)
}
slog.Info("vpp-lb-state",
"vips", len(state.VIPs),
"application-servers", totalAS,
"sticky-buckets-per-core", state.Conf.StickyBucketsPerCore,
"flow-timeout", state.Conf.FlowTimeout)
}
// Push global LB conf (src addresses, buckets, timeout) from the
// running config. On startup this is the initial set; on reconnect
// (VPP restart) VPP has forgotten everything, so we set it again.
c.mu.Lock()
src := c.stateSrc
c.mu.Unlock()
if src != nil {
if cfg := src.Config(); cfg != nil {
if err := c.SetLBConf(cfg); err != nil {
slog.Warn("vpp-lb-conf-set-failed", "err", err)
}
}
}
// Start the LB sync and stats loops for as long as the connection
// is up. Both exit when connCtx is cancelled.
connCtx, connCancel := context.WithCancel(ctx)
go c.lbSyncLoop(connCtx)
go c.lbStatsLoop(connCtx)
// Hold the connection, pinging periodically to detect VPP restarts.
c.monitor(ctx)
connCancel()
// If ctx is done we're shutting down; otherwise VPP dropped and we retry.
c.disconnect()
if ctx.Err() != nil {
return
}
slog.Warn("vpp-disconnect", "msg", "connection lost, reconnecting")
}
}
// lbSyncLoop periodically runs SyncLBStateAll to catch drift between the
// maglev config and the VPP dataplane. The first run happens immediately
// on loop start (VPP has just connected, so any pre-existing state needs
// reconciliation). Subsequent runs fire every cfg.VPP.LB.SyncInterval.
// Exits when ctx is cancelled.
func (c *Client) lbSyncLoop(ctx context.Context) {
src := c.getStateSource()
if src == nil {
return // no state source registered; nothing to sync
}
// next-run timestamp starts at "now" so the first tick is immediate.
next := time.Now()
for {
wait := time.Until(next)
if wait < 0 {
wait = 0
}
select {
case <-ctx.Done():
return
case <-time.After(wait):
}
cfg := src.Config()
if cfg == nil {
next = time.Now().Add(defaultLBSyncInterval)
continue
}
interval := cfg.VPP.LB.SyncInterval
if interval <= 0 {
interval = defaultLBSyncInterval
}
if err := c.SyncLBStateAll(cfg); err != nil {
slog.Warn("vpp-lbsync-error", "err", err)
}
next = time.Now().Add(interval)
}
}
// IsConnected returns true if both API and stats connections are active.
func (c *Client) IsConnected() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.apiConn != nil && c.statsConn != nil
}
// GetInfo returns the VPP version and connection metadata, or an error
// if VPP is not connected.
func (c *Client) GetInfo() (Info, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.apiConn == nil {
return Info{}, errNotConnected
}
return c.info, nil
}
// VIPStats satisfies metrics.VPPSource. It returns the latest snapshot of
// per-VIP LB stats-segment counters captured by lbStatsLoop. Returns nil
// until the first scrape completes, or after a disconnect (the pointer is
// cleared when the connection drops).
func (c *Client) VIPStats() []metrics.VIPStatEntry {
p := c.lbStatsSnap.Load()
if p == nil {
return nil
}
return *p
}
// BackendRouteStats satisfies metrics.VPPSource. It returns the latest
// snapshot of per-backend FIB combined counters (/net/route/to) captured
// by lbStatsLoop. Returns nil until the first scrape completes, or after
// a disconnect.
func (c *Client) BackendRouteStats() []metrics.BackendRouteStat {
p := c.backendRouteSnap.Load()
if p == nil {
return nil
}
return *p
}
// VPPInfo satisfies metrics.VPPSource. It returns a copy of the cached
// connection info as a metrics-local struct so the metrics package doesn't
// need to import internal/vpp. Second return is false when VPP is not
// connected (the collector skips the vpp_* gauges in that case).
func (c *Client) VPPInfo() (metrics.VPPInfo, bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.apiConn == nil {
return metrics.VPPInfo{}, false
}
return metrics.VPPInfo{
Version: c.info.Version,
BuildDate: c.info.BuildDate,
PID: c.info.PID,
BootTime: c.info.BootTime,
ConnectedSince: c.info.ConnectedSince,
}, true
}
// connect establishes both API and stats connections. If either fails,
// both are torn down.
func (c *Client) connect() error {
sc := socketclient.NewVppClient(c.apiAddr)
sc.SetClientName("vpp-maglev")
apiConn, err := core.Connect(sc)
if err != nil {
return err
}
stc := statsclient.NewStatsClient(c.statsAddr)
statsConn, err := core.ConnectStats(stc)
if err != nil {
safeDisconnectAPI(apiConn)
return err
}
c.mu.Lock()
c.apiConn = apiConn
c.statsConn = statsConn
c.statsClient = stc
c.mu.Unlock()
return nil
}
// disconnect tears down both connections.
func (c *Client) disconnect() {
c.mu.Lock()
apiConn := c.apiConn
statsConn := c.statsConn
c.apiConn = nil
c.statsConn = nil
c.statsClient = nil
c.info = Info{}
c.lastLBConf = nil // force re-push of lb_conf on reconnect
c.mu.Unlock()
c.lbStatsSnap.Store(nil)
c.backendRouteSnap.Store(nil)
safeDisconnectAPI(apiConn)
safeDisconnectStats(statsConn)
}
// monitor blocks until the context is cancelled or a liveness ping fails.
func (c *Client) monitor(ctx context.Context) {
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !c.ping() {
return
}
}
}
}
// ping sends a control_ping to VPP and returns true if it succeeds.
func (c *Client) ping() bool {
ch, err := c.apiChannel()
if err != nil {
return false
}
defer ch.Close()
req := &core.ControlPing{}
reply := &core.ControlPingReply{}
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
slog.Debug("vpp-ping-failed", "err", err)
return false
}
return true
}
// fetchInfo queries VPP for version info, PID, and boot time.
// Must be called after connect succeeds (apiConn and statsClient are set).
func (c *Client) fetchInfo() Info {
info := Info{ConnectedSince: time.Now()}
ch, err := c.apiChannel()
if err != nil {
return info
}
defer ch.Close()
ver := &vpe.ShowVersionReply{}
if err := ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(ver); err == nil {
info.Version = ver.Version
info.BuildDate = ver.BuildDate
info.BuildDirectory = ver.BuildDirectory
}
ping := &core.ControlPingReply{}
if err := ch.SendRequest(&core.ControlPing{}).ReceiveReply(ping); err == nil {
info.PID = ping.VpePID
}
// Read VPP boot time from the stats segment.
c.mu.Lock()
sc := c.statsClient
c.mu.Unlock()
if sc != nil {
if entries, err := sc.DumpStats("/sys/boottime"); err == nil {
for _, e := range entries {
if s, ok := e.Data.(adapter.ScalarStat); ok && s != 0 {
info.BootTime = time.Unix(int64(s), 0)
}
}
}
}
return info
}
// safeDisconnectAPI disconnects an API connection, recovering from any panic
// that GoVPP may raise on a stale connection.
func safeDisconnectAPI(conn *core.Connection) {
if conn == nil {
return
}
defer func() { recover() }() //nolint:errcheck
conn.Disconnect()
}
// safeDisconnectStats disconnects a stats connection, recovering from panics.
func safeDisconnectStats(conn *core.StatsConnection) {
if conn == nil {
return
}
defer func() { recover() }() //nolint:errcheck
conn.Disconnect()
}
type vppError struct{ msg string }
func (e *vppError) Error() string { return e.msg }
var errNotConnected = &vppError{msg: "VPP API connection not established"}