Bump VERSION to 1.0.0 and cut the first tagged release of vpp-maglev. Also in this commit: - maglevc: MAGLEV_SERVER env var as an alternative to the --server flag, matching the MAGLEV_CONFIG / MAGLEV_GRPC_ADDR convention on the other binaries. The flag takes precedence when both are set. - Rename cmd/maglevd -> cmd/server and cmd/maglevc -> cmd/client so the source directory names are decoupled from binary names (the frontend and tester commands already followed this convention). Build outputs and the Debian packages are unchanged.
441 lines
13 KiB
Go
441 lines
13 KiB
Go
// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
|
|
|
|
// Package vpp manages the connection to a local VPP instance over its
|
|
// binary API and stats sockets. The Client reconnects automatically when
|
|
// VPP restarts.
|
|
package vpp
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.fd.io/govpp/adapter"
|
|
"go.fd.io/govpp/adapter/socketclient"
|
|
"go.fd.io/govpp/adapter/statsclient"
|
|
"go.fd.io/govpp/binapi/vpe"
|
|
"go.fd.io/govpp/core"
|
|
|
|
"git.ipng.ch/ipng/vpp-maglev/internal/config"
|
|
"git.ipng.ch/ipng/vpp-maglev/internal/health"
|
|
"git.ipng.ch/ipng/vpp-maglev/internal/metrics"
|
|
lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
|
|
)
|
|
|
|
// StateSource provides a live view of the running config and the current
|
|
// health state of each backend. checker.Checker satisfies this interface via
|
|
// its Config() and BackendState() methods. Decoupling via an interface avoids
|
|
// an import cycle with the checker package.
|
|
type StateSource interface {
|
|
Config() *config.Config
|
|
BackendState(name string) (health.State, bool)
|
|
}
|
|
|
|
const retryInterval = 5 * time.Second
|
|
const pingInterval = 10 * time.Second
|
|
const defaultLBSyncInterval = 30 * time.Second
|
|
|
|
// Info holds VPP version and connection metadata, populated on connect.
|
|
type Info struct {
|
|
Version string
|
|
BuildDate string
|
|
BuildDirectory string
|
|
PID uint32
|
|
BootTime time.Time // when VPP started (from /sys/boottime stats counter)
|
|
ConnectedSince time.Time // when maglevd connected to VPP
|
|
}
|
|
|
|
// Client manages connections to both the VPP API and stats sockets.
|
|
// Both connections are treated as a unit: if either drops, both are
|
|
// torn down and re-established together.
|
|
type Client struct {
|
|
apiAddr string
|
|
statsAddr string
|
|
|
|
mu sync.Mutex
|
|
apiConn *core.Connection
|
|
statsConn *core.StatsConnection
|
|
statsClient adapter.StatsAPI // raw adapter for DumpStats
|
|
info Info // populated on successful connect
|
|
stateSrc StateSource // optional; enables periodic LB sync
|
|
lastLBConf *lb.LbConf // cached last-pushed lb_conf (dedup)
|
|
|
|
// lbStatsSnap is the most recent per-VIP stats snapshot captured by
|
|
// lbStatsLoop. Published as an immutable slice via atomic.Pointer so
|
|
// Prometheus scrapes (metrics.Collector.Collect) don't take any lock.
|
|
lbStatsSnap atomic.Pointer[[]metrics.VIPStatEntry]
|
|
|
|
// warmup gates every VPP LB sync call (both periodic and event-
|
|
// driven) during the first StartupMaxDelay seconds after Client
|
|
// construction. See warmup.go for the state machine. Process-wide,
|
|
// not per-connection: reconnects do not re-enter warmup.
|
|
warmup *warmupTracker
|
|
}
|
|
|
|
// SetStateSource attaches a live config + health state source. When set, the
|
|
// VPP client runs a periodic SyncLBStateAll loop (at the interval from
|
|
// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and
|
|
// state-aware weights are used throughout the sync path. Must be called
|
|
// before Run.
|
|
func (c *Client) SetStateSource(src StateSource) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.stateSrc = src
|
|
}
|
|
|
|
// getStateSource returns the registered state source under the mutex.
|
|
func (c *Client) getStateSource() StateSource {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.stateSrc
|
|
}
|
|
|
|
// New creates a Client for the given socket paths. The warmup tracker's
|
|
// clock starts here — the restart-neutrality window is measured from the
|
|
// moment the Client is constructed, which in practice is a few tens of
|
|
// milliseconds after process start (see cmd/server/main.go startup
|
|
// sequence). If main.go ever grows a long-running initialisation step
|
|
// before vpp.New(), the warmup clock should be moved accordingly.
|
|
func New(apiAddr, statsAddr string) *Client {
|
|
return &Client{
|
|
apiAddr: apiAddr,
|
|
statsAddr: statsAddr,
|
|
warmup: newWarmupTracker(),
|
|
}
|
|
}
|
|
|
|
// Run connects to VPP and maintains the connection until ctx is cancelled.
|
|
// If VPP is unavailable or restarts, Run reconnects automatically.
|
|
func (c *Client) Run(ctx context.Context) {
|
|
for {
|
|
if err := c.connect(); err != nil {
|
|
slog.Debug("vpp-connect-failed", "err", err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(retryInterval):
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Fetch version info and record connect time.
|
|
// fetchInfo uses NewAPIChannel and statsClient which both take c.mu,
|
|
// so we must not hold c.mu here.
|
|
info := c.fetchInfo()
|
|
c.mu.Lock()
|
|
c.info = info
|
|
c.mu.Unlock()
|
|
slog.Info("vpp-connect", "version", c.info.Version,
|
|
"build-date", c.info.BuildDate,
|
|
"pid", c.info.PID,
|
|
"api", c.apiAddr, "stats", c.statsAddr)
|
|
|
|
// Read the current LB plugin state so we can log what's programmed.
|
|
if state, err := c.GetLBStateAll(); err != nil {
|
|
slog.Warn("vpp-lb-read-failed", "err", err)
|
|
} else {
|
|
totalAS := 0
|
|
for _, v := range state.VIPs {
|
|
totalAS += len(v.ASes)
|
|
}
|
|
slog.Info("vpp-lb-state",
|
|
"vips", len(state.VIPs),
|
|
"application-servers", totalAS,
|
|
"sticky-buckets-per-core", state.Conf.StickyBucketsPerCore,
|
|
"flow-timeout", state.Conf.FlowTimeout)
|
|
}
|
|
|
|
// Push global LB conf (src addresses, buckets, timeout) from the
|
|
// running config. On startup this is the initial set; on reconnect
|
|
// (VPP restart) VPP has forgotten everything, so we set it again.
|
|
c.mu.Lock()
|
|
src := c.stateSrc
|
|
c.mu.Unlock()
|
|
if src != nil {
|
|
if cfg := src.Config(); cfg != nil {
|
|
if err := c.SetLBConf(cfg); err != nil {
|
|
slog.Warn("vpp-lb-conf-set-failed", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Start the LB sync and stats loops for as long as the connection
|
|
// is up. Both exit when connCtx is cancelled.
|
|
connCtx, connCancel := context.WithCancel(ctx)
|
|
go c.lbSyncLoop(connCtx)
|
|
go c.lbStatsLoop(connCtx)
|
|
|
|
// Hold the connection, pinging periodically to detect VPP restarts.
|
|
c.monitor(ctx)
|
|
connCancel()
|
|
|
|
// If ctx is done we're shutting down; otherwise VPP dropped and we retry.
|
|
c.disconnect()
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
slog.Warn("vpp-disconnect", "msg", "connection lost, reconnecting")
|
|
}
|
|
}
|
|
|
|
// lbSyncLoop drives the periodic VPP LB sync. On first entry (after the
|
|
// first successful VPP connect) it runs the warmup phase via runWarmup,
|
|
// which enforces the restart-neutrality window and handles the first full
|
|
// sync itself. Subsequent reconnect entries find warmup.allDone == true
|
|
// and skip straight to the periodic ticker. Exits when ctx is cancelled.
|
|
//
|
|
// The warmup phase is intentionally run from inside this loop rather
|
|
// than from Run: it needs the state source registered (which happens
|
|
// only after SetStateSource) and it wants to be cancelled by the same
|
|
// connCtx that cancels the stats loop on disconnect, so a VPP drop
|
|
// during warmup doesn't leak a goroutine.
|
|
func (c *Client) lbSyncLoop(ctx context.Context) {
|
|
src := c.getStateSource()
|
|
if src == nil {
|
|
return // no state source registered; nothing to sync
|
|
}
|
|
|
|
// Warmup phase: runs once per process. On the first successful
|
|
// VPP connect, runWarmup handles the entire window (min-delay
|
|
// hands-off, per-VIP release phase, final SyncLBStateAll at
|
|
// max-delay) and calls finishAll before returning. On any
|
|
// reconnect after that, the gate is already open and we skip
|
|
// straight to the periodic ticker. A VPP drop mid-warmup
|
|
// returns from runWarmup via ctx.Done without closing the gate;
|
|
// the next reconnect re-enters runWarmup, which re-reads the
|
|
// process-relative clock and picks up wherever it left off.
|
|
if !c.warmup.isAllDone() {
|
|
c.runWarmup(ctx)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
cfg := src.Config()
|
|
if cfg == nil {
|
|
return
|
|
}
|
|
interval := cfg.VPP.LB.SyncInterval
|
|
if interval <= 0 {
|
|
interval = defaultLBSyncInterval
|
|
}
|
|
next := time.Now().Add(interval)
|
|
for {
|
|
wait := time.Until(next)
|
|
if wait < 0 {
|
|
wait = 0
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(wait):
|
|
}
|
|
|
|
cfg = src.Config()
|
|
if cfg == nil {
|
|
next = time.Now().Add(defaultLBSyncInterval)
|
|
continue
|
|
}
|
|
interval = cfg.VPP.LB.SyncInterval
|
|
if interval <= 0 {
|
|
interval = defaultLBSyncInterval
|
|
}
|
|
|
|
if err := c.SyncLBStateAll(cfg); err != nil {
|
|
slog.Warn("vpp-lb-sync-error", "err", err)
|
|
}
|
|
next = time.Now().Add(interval)
|
|
}
|
|
}
|
|
|
|
// IsConnected returns true if both API and stats connections are active.
|
|
func (c *Client) IsConnected() bool {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.apiConn != nil && c.statsConn != nil
|
|
}
|
|
|
|
// GetInfo returns the VPP version and connection metadata, or an error
|
|
// if VPP is not connected.
|
|
func (c *Client) GetInfo() (Info, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.apiConn == nil {
|
|
return Info{}, errNotConnected
|
|
}
|
|
return c.info, nil
|
|
}
|
|
|
|
// VIPStats satisfies metrics.VPPSource. It returns the latest snapshot of
|
|
// per-VIP LB stats-segment counters captured by lbStatsLoop. Returns nil
|
|
// until the first scrape completes, or after a disconnect (the pointer is
|
|
// cleared when the connection drops).
|
|
func (c *Client) VIPStats() []metrics.VIPStatEntry {
|
|
p := c.lbStatsSnap.Load()
|
|
if p == nil {
|
|
return nil
|
|
}
|
|
return *p
|
|
}
|
|
|
|
// VPPInfo satisfies metrics.VPPSource. It returns a copy of the cached
|
|
// connection info as a metrics-local struct so the metrics package doesn't
|
|
// need to import internal/vpp. Second return is false when VPP is not
|
|
// connected (the collector skips the vpp_* gauges in that case).
|
|
func (c *Client) VPPInfo() (metrics.VPPInfo, bool) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.apiConn == nil {
|
|
return metrics.VPPInfo{}, false
|
|
}
|
|
return metrics.VPPInfo{
|
|
Version: c.info.Version,
|
|
BuildDate: c.info.BuildDate,
|
|
PID: c.info.PID,
|
|
BootTime: c.info.BootTime,
|
|
ConnectedSince: c.info.ConnectedSince,
|
|
}, true
|
|
}
|
|
|
|
// connect establishes both API and stats connections. If either fails,
|
|
// both are torn down.
|
|
func (c *Client) connect() error {
|
|
sc := socketclient.NewVppClient(c.apiAddr)
|
|
sc.SetClientName("vpp-maglev")
|
|
apiConn, err := core.Connect(sc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stc := statsclient.NewStatsClient(c.statsAddr)
|
|
statsConn, err := core.ConnectStats(stc)
|
|
if err != nil {
|
|
safeDisconnectAPI(apiConn)
|
|
return err
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.apiConn = apiConn
|
|
c.statsConn = statsConn
|
|
c.statsClient = stc
|
|
c.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// disconnect tears down both connections.
|
|
func (c *Client) disconnect() {
|
|
c.mu.Lock()
|
|
apiConn := c.apiConn
|
|
statsConn := c.statsConn
|
|
c.apiConn = nil
|
|
c.statsConn = nil
|
|
c.statsClient = nil
|
|
c.info = Info{}
|
|
c.lastLBConf = nil // force re-push of lb_conf on reconnect
|
|
c.mu.Unlock()
|
|
c.lbStatsSnap.Store(nil)
|
|
|
|
safeDisconnectAPI(apiConn)
|
|
safeDisconnectStats(statsConn)
|
|
}
|
|
|
|
// monitor blocks until the context is cancelled or a liveness ping fails.
|
|
func (c *Client) monitor(ctx context.Context) {
|
|
ticker := time.NewTicker(pingInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if !c.ping() {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ping sends a control_ping to VPP and returns true if it succeeds.
|
|
func (c *Client) ping() bool {
|
|
ch, err := c.apiChannel()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer ch.Close()
|
|
|
|
req := &core.ControlPing{}
|
|
reply := &core.ControlPingReply{}
|
|
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
|
|
slog.Debug("vpp-ping-failed", "err", err)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// fetchInfo queries VPP for version info, PID, and boot time.
|
|
// Must be called after connect succeeds (apiConn and statsClient are set).
|
|
func (c *Client) fetchInfo() Info {
|
|
info := Info{ConnectedSince: time.Now()}
|
|
|
|
ch, err := c.apiChannel()
|
|
if err != nil {
|
|
return info
|
|
}
|
|
defer ch.Close()
|
|
|
|
ver := &vpe.ShowVersionReply{}
|
|
if err := ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(ver); err == nil {
|
|
info.Version = ver.Version
|
|
info.BuildDate = ver.BuildDate
|
|
info.BuildDirectory = ver.BuildDirectory
|
|
}
|
|
|
|
ping := &core.ControlPingReply{}
|
|
if err := ch.SendRequest(&core.ControlPing{}).ReceiveReply(ping); err == nil {
|
|
info.PID = ping.VpePID
|
|
}
|
|
|
|
// Read VPP boot time from the stats segment.
|
|
c.mu.Lock()
|
|
sc := c.statsClient
|
|
c.mu.Unlock()
|
|
if sc != nil {
|
|
if entries, err := sc.DumpStats("/sys/boottime"); err == nil {
|
|
for _, e := range entries {
|
|
if s, ok := e.Data.(adapter.ScalarStat); ok && s != 0 {
|
|
info.BootTime = time.Unix(int64(s), 0)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return info
|
|
}
|
|
|
|
// safeDisconnectAPI disconnects an API connection, recovering from any panic
|
|
// that GoVPP may raise on a stale connection.
|
|
func safeDisconnectAPI(conn *core.Connection) {
|
|
if conn == nil {
|
|
return
|
|
}
|
|
defer func() { recover() }() //nolint:errcheck
|
|
conn.Disconnect()
|
|
}
|
|
|
|
// safeDisconnectStats disconnects a stats connection, recovering from panics.
|
|
func safeDisconnectStats(conn *core.StatsConnection) {
|
|
if conn == nil {
|
|
return
|
|
}
|
|
defer func() { recover() }() //nolint:errcheck
|
|
conn.Disconnect()
|
|
}
|
|
|
|
type vppError struct{ msg string }
|
|
|
|
func (e *vppError) Error() string { return e.msg }
|
|
|
|
var errNotConnected = &vppError{msg: "VPP API connection not established"}
|