Add GoVPP integration and GetVPPInfo gRPC call
VPP client (internal/vpp/) - New package managing connections to both VPP API and stats sockets, treated as a unit: if either drops, both are torn down and re-established together. - Run() loop: connect, fetch version via vpe.ShowVersion, read /sys/boottime from the stats segment, log vpp-connect, then monitor with control_ping every 10s. On failure, disconnect both, retry after 5s. - Registers as client name "vpp-maglev" (visible in VPP's "show api clients"). - Flags: --vpp-api-addr (default /run/vpp/api.sock) and --vpp-stats-addr (default /run/vpp/stats.sock). Empty api addr disables VPP integration entirely. gRPC / proto - Add GetVPPInfo RPC returning VPPInfo: version, build_date, build_directory, pid, boottime_ns, connecttime_ns. Both times are unix timestamps in nanoseconds — the client computes durations locally for display. - Returns codes.Unavailable if VPP is disabled or not connected. maglevc - Add 'show vpp info' command displaying version, build-date, build-dir, vpp-pid, vpp-boottime (with duration), and connected time (with duration).
This commit is contained in:
265
internal/vpp/client.go
Normal file
265
internal/vpp/client.go
Normal file
@@ -0,0 +1,265 @@
|
||||
// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
|
||||
|
||||
// Package vpp manages the connection to a local VPP instance over its
|
||||
// binary API and stats sockets. The Client reconnects automatically when
|
||||
// VPP restarts.
|
||||
package vpp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.fd.io/govpp/adapter"
|
||||
"go.fd.io/govpp/adapter/socketclient"
|
||||
"go.fd.io/govpp/adapter/statsclient"
|
||||
"go.fd.io/govpp/api"
|
||||
"go.fd.io/govpp/binapi/vpe"
|
||||
"go.fd.io/govpp/core"
|
||||
)
|
||||
|
||||
const retryInterval = 5 * time.Second
|
||||
const pingInterval = 10 * time.Second
|
||||
|
||||
// Info holds VPP version and connection metadata, populated on connect.
|
||||
type Info struct {
|
||||
Version string
|
||||
BuildDate string
|
||||
BuildDirectory string
|
||||
PID uint32
|
||||
BootTime time.Time // when VPP started (from /sys/boottime stats counter)
|
||||
ConnectedSince time.Time // when maglevd connected to VPP
|
||||
}
|
||||
|
||||
// Client manages connections to both the VPP API and stats sockets.
|
||||
// Both connections are treated as a unit: if either drops, both are
|
||||
// torn down and re-established together.
|
||||
type Client struct {
|
||||
apiAddr string
|
||||
statsAddr string
|
||||
|
||||
mu sync.Mutex
|
||||
apiConn *core.Connection
|
||||
statsConn *core.StatsConnection
|
||||
statsClient adapter.StatsAPI // raw adapter for DumpStats
|
||||
info Info // populated on successful connect
|
||||
}
|
||||
|
||||
// New creates a Client for the given socket paths.
|
||||
func New(apiAddr, statsAddr string) *Client {
|
||||
return &Client{apiAddr: apiAddr, statsAddr: statsAddr}
|
||||
}
|
||||
|
||||
// Run connects to VPP and maintains the connection until ctx is cancelled.
|
||||
// If VPP is unavailable or restarts, Run reconnects automatically.
|
||||
func (c *Client) Run(ctx context.Context) {
|
||||
for {
|
||||
if err := c.connect(); err != nil {
|
||||
slog.Debug("vpp-connect-failed", "err", err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(retryInterval):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch version info and record connect time.
|
||||
// fetchInfo uses NewAPIChannel and statsClient which both take c.mu,
|
||||
// so we must not hold c.mu here.
|
||||
info := c.fetchInfo()
|
||||
c.mu.Lock()
|
||||
c.info = info
|
||||
c.mu.Unlock()
|
||||
slog.Info("vpp-connect", "version", c.info.Version,
|
||||
"build-date", c.info.BuildDate,
|
||||
"pid", c.info.PID,
|
||||
"api", c.apiAddr, "stats", c.statsAddr)
|
||||
|
||||
// Hold the connection, pinging periodically to detect VPP restarts.
|
||||
c.monitor(ctx)
|
||||
|
||||
// If ctx is done we're shutting down; otherwise VPP dropped and we retry.
|
||||
c.disconnect()
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
slog.Warn("vpp-disconnect", "msg", "connection lost, reconnecting")
|
||||
}
|
||||
}
|
||||
|
||||
// IsConnected returns true if both API and stats connections are active.
|
||||
func (c *Client) IsConnected() bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.apiConn != nil && c.statsConn != nil
|
||||
}
|
||||
|
||||
// NewAPIChannel creates a new API channel for sending VPP binary API requests.
|
||||
// Returns an error if the API connection is not established.
|
||||
func (c *Client) NewAPIChannel() (api.Channel, error) {
|
||||
c.mu.Lock()
|
||||
conn := c.apiConn
|
||||
c.mu.Unlock()
|
||||
if conn == nil {
|
||||
return nil, errNotConnected
|
||||
}
|
||||
return conn.NewAPIChannel()
|
||||
}
|
||||
|
||||
// StatsConnection returns the stats connection, or nil if not connected.
|
||||
func (c *Client) StatsConnection() *core.StatsConnection {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.statsConn
|
||||
}
|
||||
|
||||
// GetInfo returns the VPP version and connection metadata, or an error
|
||||
// if VPP is not connected.
|
||||
func (c *Client) GetInfo() (Info, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.apiConn == nil {
|
||||
return Info{}, errNotConnected
|
||||
}
|
||||
return c.info, nil
|
||||
}
|
||||
|
||||
// connect establishes both API and stats connections. If either fails,
|
||||
// both are torn down.
|
||||
func (c *Client) connect() error {
|
||||
sc := socketclient.NewVppClient(c.apiAddr)
|
||||
sc.SetClientName("vpp-maglev")
|
||||
apiConn, err := core.Connect(sc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stc := statsclient.NewStatsClient(c.statsAddr)
|
||||
statsConn, err := core.ConnectStats(stc)
|
||||
if err != nil {
|
||||
safeDisconnectAPI(apiConn)
|
||||
return err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.apiConn = apiConn
|
||||
c.statsConn = statsConn
|
||||
c.statsClient = stc
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// disconnect tears down both connections.
|
||||
func (c *Client) disconnect() {
|
||||
c.mu.Lock()
|
||||
apiConn := c.apiConn
|
||||
statsConn := c.statsConn
|
||||
c.apiConn = nil
|
||||
c.statsConn = nil
|
||||
c.statsClient = nil
|
||||
c.info = Info{}
|
||||
c.mu.Unlock()
|
||||
|
||||
safeDisconnectAPI(apiConn)
|
||||
safeDisconnectStats(statsConn)
|
||||
}
|
||||
|
||||
// monitor blocks until the context is cancelled or a liveness ping fails.
|
||||
func (c *Client) monitor(ctx context.Context) {
|
||||
ticker := time.NewTicker(pingInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if !c.ping() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ping sends a control_ping to VPP and returns true if it succeeds.
|
||||
func (c *Client) ping() bool {
|
||||
ch, err := c.NewAPIChannel()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
req := &core.ControlPing{}
|
||||
reply := &core.ControlPingReply{}
|
||||
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
|
||||
slog.Debug("vpp-ping-failed", "err", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// fetchInfo queries VPP for version info, PID, and boot time.
|
||||
// Must be called after connect succeeds (apiConn and statsClient are set).
|
||||
func (c *Client) fetchInfo() Info {
|
||||
info := Info{ConnectedSince: time.Now()}
|
||||
|
||||
ch, err := c.NewAPIChannel()
|
||||
if err != nil {
|
||||
return info
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
ver := &vpe.ShowVersionReply{}
|
||||
if err := ch.SendRequest(&vpe.ShowVersion{}).ReceiveReply(ver); err == nil {
|
||||
info.Version = ver.Version
|
||||
info.BuildDate = ver.BuildDate
|
||||
info.BuildDirectory = ver.BuildDirectory
|
||||
}
|
||||
|
||||
ping := &core.ControlPingReply{}
|
||||
if err := ch.SendRequest(&core.ControlPing{}).ReceiveReply(ping); err == nil {
|
||||
info.PID = ping.VpePID
|
||||
}
|
||||
|
||||
// Read VPP boot time from the stats segment.
|
||||
c.mu.Lock()
|
||||
sc := c.statsClient
|
||||
c.mu.Unlock()
|
||||
if sc != nil {
|
||||
if entries, err := sc.DumpStats("/sys/boottime"); err == nil {
|
||||
for _, e := range entries {
|
||||
if s, ok := e.Data.(adapter.ScalarStat); ok && s != 0 {
|
||||
info.BootTime = time.Unix(int64(s), 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
// safeDisconnectAPI disconnects an API connection, recovering from any panic
|
||||
// that GoVPP may raise on a stale connection.
|
||||
func safeDisconnectAPI(conn *core.Connection) {
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
defer func() { recover() }() //nolint:errcheck
|
||||
conn.Disconnect()
|
||||
}
|
||||
|
||||
// safeDisconnectStats disconnects a stats connection, recovering from panics.
|
||||
func safeDisconnectStats(conn *core.StatsConnection) {
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
defer func() { recover() }() //nolint:errcheck
|
||||
conn.Disconnect()
|
||||
}
|
||||
|
||||
type vppError struct{ msg string }
|
||||
|
||||
func (e *vppError) Error() string { return e.msg }
|
||||
|
||||
var errNotConnected = &vppError{msg: "VPP API connection not established"}
|
||||
Reference in New Issue
Block a user