Files
vpp-maglev/internal/vpp/client.go
Pim van Pelt d5fbf5c640 Prometheus: add VPP, LB sync, and gRPC metrics; expand docs
New metrics plus the corresponding documentation for everything that's
accumulated since the last Prometheus pass.

internal/metrics/metrics.go
- New VPPSource interface (IsConnected, VPPInfo) plus a metrics-local
  VPPInfo struct that mirrors vpp.Info. Decoupling via interface +
  struct-mirror keeps the dependency direction one-way (vpp → metrics),
  so vpp can import metrics to update inline counters without a cycle.
- New Collector gauges scraped on demand: maglev_vpp_connected,
  maglev_vpp_uptime_seconds (from /sys/boottime), maglev_vpp_connected_seconds
  (time since maglevd connected), and maglev_vpp_info (static 1-gauge
  carrying version, build_date, and pid as labels).
- New inline counters:
  - maglev_vpp_api_total{msg, direction, result} — bumped from the
    loggedChannel wrapper on every VPP binary-API send/recv. Gives full
    visibility into what maglevd is doing with VPP, broken down by
    message name, direction (send/recv), and result (success/failure).
  - maglev_vpp_lbsync_total{scope, kind} — bumped from the reconciler
    at the end of each SyncLBStateAll/SyncLBStateVIP run. kind ∈
    {vip_added, vip_removed, as_added, as_removed, as_weight_updated};
    scope ∈ {all, vip}. Zero-valued kinds are not emitted so noise
    stays low.
- Register() signature now takes a VPPSource (may be nil) alongside
  the existing StateSource.

internal/vpp/client.go
- New VPPInfo() (metrics.VPPInfo, bool) shim method on *Client that
  satisfies metrics.VPPSource. Returns (_, false) when disconnected so
  the collector skips the vpp_* gauges cleanly.

internal/vpp/apilog.go
- The loggedChannel's SendRequest / SendMultiRequest / ReceiveReply
  paths now call metrics.VPPAPITotal.WithLabelValues(...).Inc() in
  addition to slog.Debug. Since every VPP API call in the codebase
  must go through loggedChannel (NewAPIChannel is unexported), this
  one instrumentation point catches everything.

internal/vpp/lbsync.go
- New recordSyncStats(scope, st) helper called once at the end of
  SyncLBStateAll and SyncLBStateVIP to bump maglev_vpp_lbsync_total.
  Zero-valued stats are skipped.

cmd/maglevd/main.go
- Added github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus
  for the standard gRPC server metrics (grpc_server_started_total,
  grpc_server_handled_total, grpc_server_handling_seconds, etc.,
  labelled by service/method/type/code).
- Constructs grpcprom.NewServerMetrics(WithServerHandlingTimeHistogram())
  before creating the grpc.Server, installs it as UnaryInterceptor +
  StreamInterceptor, then calls InitializeMetrics(srv) after service
  registration so every method appears at 0 on the first scrape
  instead of materialising lazily on first RPC.
- Passes the vppClient (or nil) as a metrics.VPPSource to
  metrics.Register so the vpp_* gauges are emitted when integration
  is enabled and silently omitted otherwise.

docs/user-guide.md
- New 'Prometheus metrics' section in the maglevd chapter,
  tabulating every metric family: backend state gauges, probe
  counters/histogram, transition counters, the new VPP gauges and
  counters, and the standard gRPC server metrics.
- 'show frontends <name>' description updated to mention the two
  weight columns ('weight' = configured from YAML, 'effective' =
  state-aware after pool-failover logic).
- Pause / disable descriptions clarified: transition history is
  preserved across these operator actions.

docs/healthchecks.md
- New 'Static (no-healthcheck) backends' section explaining that
  backends without a healthcheck use rise/fall=1, fire a synthetic
  passing probe immediately on startup (no 30s wait), and idle at
  30s between iterations thereafter.
- New 'Pool failover' section documenting the priority-tier model,
  the active-pool definition, when promotion happens, cascading to
  further tiers, and graceful drain on demotion. Points readers at
  'maglevc show frontends <name>' as the inspection interface.

docs/config-guide.md
- healthcheck field doc now describes static-backend behavior and
  cross-references healthchecks.md.
- pools field doc now explains failover semantics at a high level
  and cross-references the detailed healthchecks.md section.
2026-04-12 13:00:35 +02:00

377 lines
10 KiB
Go

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
// Package vpp manages the connection to a local VPP instance over its
// binary API and stats sockets. The Client reconnects automatically when
// VPP restarts.
package vpp
import (
"context"
"log/slog"
"sync"
"time"
"go.fd.io/govpp/adapter"
"go.fd.io/govpp/adapter/socketclient"
"go.fd.io/govpp/adapter/statsclient"
"go.fd.io/govpp/binapi/vpe"
"go.fd.io/govpp/core"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
"git.ipng.ch/ipng/vpp-maglev/internal/metrics"
lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
)
// StateSource provides a live view of the running config and the current
// health state of each backend. checker.Checker satisfies this interface via
// its Config() and BackendState() methods. Decoupling via an interface avoids
// an import cycle with the checker package.
type StateSource interface {
Config() *config.Config
BackendState(name string) (health.State, bool)
}
const retryInterval = 5 * time.Second
const pingInterval = 10 * time.Second
const defaultLBSyncInterval = 30 * time.Second
// Info holds VPP version and connection metadata, populated on connect.
type Info struct {
Version string
BuildDate string
BuildDirectory string
PID uint32
BootTime time.Time // when VPP started (from /sys/boottime stats counter)
ConnectedSince time.Time // when maglevd connected to VPP
}
// Client manages connections to both the VPP API and stats sockets.
// Both connections are treated as a unit: if either drops, both are
// torn down and re-established together.
type Client struct {
apiAddr string
statsAddr string
mu sync.Mutex
apiConn *core.Connection
statsConn *core.StatsConnection
statsClient adapter.StatsAPI // raw adapter for DumpStats
info Info // populated on successful connect
stateSrc StateSource // optional; enables periodic LB sync
lastLBConf *lb.LbConf // cached last-pushed lb_conf (dedup)
}
// SetStateSource attaches a live config + health state source. When set, the
// VPP client runs a periodic SyncLBStateAll loop (at the interval from
// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and
// state-aware weights are used throughout the sync path. Must be called
// before Run.
func (c *Client) SetStateSource(src StateSource) {
c.mu.Lock()
defer c.mu.Unlock()
c.stateSrc = src
}
// getStateSource returns the registered state source under the mutex.
func (c *Client) getStateSource() StateSource {
c.mu.Lock()
defer c.mu.Unlock()
return c.stateSrc
}
// New creates a Client for the given socket paths.
func New(apiAddr, statsAddr string) *Client {
return &Client{apiAddr: apiAddr, statsAddr: statsAddr}
}
// Run connects to VPP and maintains the connection until ctx is cancelled.
// If VPP is unavailable or restarts, Run reconnects automatically.
func (c *Client) Run(ctx context.Context) {
for {
if err := c.connect(); err != nil {
slog.Debug("vpp-connect-failed", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
continue
}
}
// Fetch version info and record connect time.
// fetchInfo uses NewAPIChannel and statsClient which both take c.mu,
// so we must not hold c.mu here.
info := c.fetchInfo()
c.mu.Lock()
c.info = info
c.mu.Unlock()
slog.Info("vpp-connect", "version", c.info.Version,
"build-date", c.info.BuildDate,
"pid", c.info.PID,
"api", c.apiAddr, "stats", c.statsAddr)
// Read the current LB plugin state so we can log what's programmed.
if state, err := c.GetLBStateAll(); err != nil {
slog.Warn("vpp-lb-read-failed", "err", err)
} else {
totalAS := 0
for _, v := range state.VIPs {
totalAS += len(v.ASes)
}
slog.Info("vpp-lb-state",
"vips", len(state.VIPs),
"application-servers", totalAS,
"sticky-buckets-per-core", state.Conf.StickyBucketsPerCore,
"flow-timeout", state.Conf.FlowTimeout)
}
// Push global LB conf (src addresses, buckets, timeout) from the
// running config. On startup this is the initial set; on reconnect
// (VPP restart) VPP has forgotten everything, so we set it again.
c.mu.Lock()
src := c.stateSrc
c.mu.Unlock()
if src != nil {
if cfg := src.Config(); cfg != nil {
if err := c.SetLBConf(cfg); err != nil {
slog.Warn("vpp-lb-conf-set-failed", "err", err)
}
}
}
// Start the LB sync loop for as long as the connection is up.
// It exits when connCtx is cancelled (on disconnect or shutdown).
connCtx, connCancel := context.WithCancel(ctx)
go c.lbSyncLoop(connCtx)
// Hold the connection, pinging periodically to detect VPP restarts.
c.monitor(ctx)
connCancel()
// If ctx is done we're shutting down; otherwise VPP dropped and we retry.
c.disconnect()
if ctx.Err() != nil {
return
}
slog.Warn("vpp-disconnect", "msg", "connection lost, reconnecting")
}
}
// lbSyncLoop periodically runs SyncLBStateAll to catch drift between the
// maglev config and the VPP dataplane. The first run happens immediately
// on loop start (VPP has just connected, so any pre-existing state needs
// reconciliation). Subsequent runs fire every cfg.VPP.LB.SyncInterval.
// Exits when ctx is cancelled.
func (c *Client) lbSyncLoop(ctx context.Context) {
src := c.getStateSource()
if src == nil {
return // no state source registered; nothing to sync
}
// next-run timestamp starts at "now" so the first tick is immediate.
next := time.Now()
for {
wait := time.Until(next)
if wait < 0 {
wait = 0
}
select {
case <-ctx.Done():
return
case <-time.After(wait):
}
cfg := src.Config()
if cfg == nil {
next = time.Now().Add(defaultLBSyncInterval)
continue
}
interval := cfg.VPP.LB.SyncInterval
if interval <= 0 {
interval = defaultLBSyncInterval
}
if err := c.SyncLBStateAll(cfg); err != nil {
slog.Warn("vpp-lbsync-error", "err", err)
}
next = time.Now().Add(interval)
}
}
// IsConnected returns true if both API and stats connections are active.
func (c *Client) IsConnected() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.apiConn != nil && c.statsConn != nil
}
// GetInfo returns the VPP version and connection metadata, or an error
// if VPP is not connected.
func (c *Client) GetInfo() (Info, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.apiConn == nil {
return Info{}, errNotConnected
}
return c.info, nil
}
// VPPInfo satisfies metrics.VPPSource. It returns a copy of the cached
// connection info as a metrics-local struct so the metrics package doesn't
// need to import internal/vpp. Second return is false when VPP is not
// connected (the collector skips the vpp_* gauges in that case).
func (c *Client) VPPInfo() (metrics.VPPInfo, bool) {
c.mu.Lock()
defer c.mu.Unlock()
if c.apiConn == nil {
return metrics.VPPInfo{}, false
}
return metrics.VPPInfo{
Version: c.info.Version,
BuildDate: c.info.BuildDate,
PID: c.info.PID,
BootTime: c.info.BootTime,
ConnectedSince: c.info.ConnectedSince,
}, true
}
// connect establishes both API and stats connections. If either fails,
// both are torn down.
func (c *Client) connect() error {
sc := socketclient.NewVppClient(c.apiAddr)
sc.SetClientName("vpp-maglev")
apiConn, err := core.Connect(sc)
if err != nil {
return err
}
stc := statsclient.NewStatsClient(c.statsAddr)
statsConn, err := core.ConnectStats(stc)
if err != nil {
safeDisconnectAPI(apiConn)
return err
}
c.mu.Lock()
c.apiConn = apiConn
c.statsConn = statsConn
c.statsClient = stc
c.mu.Unlock()
return nil
}
// disconnect tears down both connections.
func (c *Client) disconnect() {
c.mu.Lock()
apiConn := c.apiConn
statsConn := c.statsConn
c.apiConn = nil
c.statsConn = nil
c.statsClient = nil
c.info = Info{}
c.lastLBConf = nil // force re-push of lb_conf on reconnect
c.mu.Unlock()
safeDisconnectAPI(apiConn)
safeDisconnectStats(statsConn)
}
// monitor blocks until the context is cancelled or a liveness ping fails.
func (c *Client) monitor(ctx context.Context) {
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !c.ping() {
return
}
}
}
}
// ping sends a control_ping to VPP and returns true if it succeeds.
func (c *Client) ping() bool {
ch, err := c.apiChannel()
if err != nil {
return false
}
defer ch.Close()
req := &core.ControlPing{}
reply := &core.ControlPingReply{}
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
slog.Debug("vpp-ping-failed", "err", err)
return false
}
return true
}
// fetchInfo queries VPP for version info, PID, and boot time.
// Must be called after connect succeeds (apiConn and statsClient are set).
func (c *Client) fetchInfo() Info {
info := Info{ConnectedSince: time.Now()}
ch, err := c.apiChannel()
if err != nil {
return info
}
defer ch.Close()
ver := &vpe.ShowVersionReply{}
if err := ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(ver); err == nil {
info.Version = ver.Version
info.BuildDate = ver.BuildDate
info.BuildDirectory = ver.BuildDirectory
}
ping := &core.ControlPingReply{}
if err := ch.SendRequest(&core.ControlPing{}).ReceiveReply(ping); err == nil {
info.PID = ping.VpePID
}
// Read VPP boot time from the stats segment.
c.mu.Lock()
sc := c.statsClient
c.mu.Unlock()
if sc != nil {
if entries, err := sc.DumpStats("/sys/boottime"); err == nil {
for _, e := range entries {
if s, ok := e.Data.(adapter.ScalarStat); ok && s != 0 {
info.BootTime = time.Unix(int64(s), 0)
}
}
}
}
return info
}
// safeDisconnectAPI disconnects an API connection, recovering from any panic
// that GoVPP may raise on a stale connection.
func safeDisconnectAPI(conn *core.Connection) {
if conn == nil {
return
}
defer func() { recover() }() //nolint:errcheck
conn.Disconnect()
}
// safeDisconnectStats disconnects a stats connection, recovering from panics.
func safeDisconnectStats(conn *core.StatsConnection) {
if conn == nil {
return
}
defer func() { recover() }() //nolint:errcheck
conn.Disconnect()
}
type vppError struct{ msg string }
func (e *vppError) Error() string { return e.msg }
var errNotConnected = &vppError{msg: "VPP API connection not established"}