Refactor vpp.go to have the connection mgmt and vpp_*.go to have one Manager each

This commit is contained in:
Pim van Pelt
2025-06-24 07:00:52 +02:00
parent 96b9dd501d
commit bdaa2e366b
4 changed files with 364 additions and 206 deletions

View File

@ -45,11 +45,19 @@ func main() {
log.Fatalf("Failed to start AgentX: %v", err)
}
// Set up interface event callback to update interface details
vpp.SetInterfaceEventCallback(interfaceMIB.UpdateInterfaceDetails)
// Create VPP client and managers
vppClient := vpp.NewVPPClient()
interfaceManager := vpp.NewInterfaceManager(vppClient)
statsManager := vpp.NewStatsManager(vppClient, interfaceManager)
// Start VPP stats routine with callback to update MIB
vpp.StartStatsRoutine(interfaceMIB.UpdateStats)
// Set up interface event callback to update interface details
interfaceManager.SetEventCallback(interfaceMIB.UpdateInterfaceDetails)
// Set up stats callback to update MIB
statsManager.SetStatsCallback(interfaceMIB.UpdateStats)
// Start VPP stats routine
statsManager.StartStatsRoutine()
// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
@ -59,6 +67,10 @@ func main() {
<-sigChan
logger.Printf("Shutting down...")
// Stop stats routine and disconnect
statsManager.StopStatsRoutine()
vppClient.Disconnect()
// Flush any buffered log entries
logger.Sync()
}

178
src/vpp/vpp.go Normal file
View File

@ -0,0 +1,178 @@
// Copyright 2025, IPng Networks GmbH, Pim van Pelt <pim@ipng.ch>
package vpp
import (
"flag"
"go.fd.io/govpp/adapter/socketclient"
"go.fd.io/govpp/adapter/statsclient"
"go.fd.io/govpp/api"
"go.fd.io/govpp/binapi/vpe"
"go.fd.io/govpp/core"
"govpp-snmp-agentx/logger"
)
var (
// Flags for VPP configuration
ApiAddr = flag.String("vppstats.api.addr", "/var/run/vpp/api.sock", "VPP API socket path")
StatsAddr = flag.String("vppstats.stats.addr", "/var/run/vpp/stats.sock", "VPP stats socket path")
IfIndexOffset = flag.Int("vppstats.ifindex-offset", 1000, "Offset to add to VPP interface indices for SNMP")
Period = flag.Int("vppstats.period", 10, "Interval in seconds for querying VPP interface stats")
)
// VPPClient manages VPP connections and provides a unified interface
type VPPClient struct {
apiConn *core.Connection
statsConn *core.StatsConnection
connected bool
}
// NewVPPClient creates a new VPP client instance
func NewVPPClient() *VPPClient {
return &VPPClient{}
}
// Connect establishes connections to both VPP API and Stats sockets
func (c *VPPClient) Connect() error {
logger.Debugf("Connecting to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr)
// Connect to API socket
apiConn, err := core.Connect(socketclient.NewVppClient(*ApiAddr))
if err != nil {
return err
}
// Connect to stats socket
statsClient := statsclient.NewStatsClient(*StatsAddr)
statsConn, err := core.ConnectStats(statsClient)
if err != nil {
// Clean up API connection on stats failure
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from API disconnect during stats error: %v", r)
}
}()
apiConn.Disconnect()
}()
return err
}
c.apiConn = apiConn
c.statsConn = statsConn
c.connected = true
logger.Printf("Connected to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr)
return nil
}
// Disconnect closes all VPP connections safely
func (c *VPPClient) Disconnect() {
if c.apiConn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from API disconnect panic: %v", r)
}
}()
c.apiConn.Disconnect()
}()
c.apiConn = nil
}
if c.statsConn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from stats disconnect panic: %v", r)
}
}()
c.statsConn.Disconnect()
}()
c.statsConn = nil
}
c.connected = false
}
// IsConnected returns true if both API and Stats connections are active
func (c *VPPClient) IsConnected() bool {
return c.connected && c.apiConn != nil && c.statsConn != nil
}
// GetAPIConnection returns the API connection for direct use
func (c *VPPClient) GetAPIConnection() *core.Connection {
return c.apiConn
}
// GetStatsConnection returns the stats connection for direct use
func (c *VPPClient) GetStatsConnection() *core.StatsConnection {
return c.statsConn
}
// NewAPIChannel creates a new API channel from the connection
func (c *VPPClient) NewAPIChannel() (api.Channel, error) {
if c.apiConn == nil {
return nil, &VPPError{Message: "API connection not established"}
}
return c.apiConn.NewAPIChannel()
}
// CheckLiveness performs a VPP liveness check using ShowVersion API call
func (c *VPPClient) CheckLiveness() bool {
if !c.IsConnected() {
return false
}
ch, err := c.NewAPIChannel()
if err != nil {
logger.Debugf("Failed to create API channel for liveness check: %v", err)
return false
}
var channelClosed bool
defer func() {
if !channelClosed {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from channel close panic: %v", r)
}
}()
ch.Close()
}
}()
req := &vpe.ShowVersion{}
reply := &vpe.ShowVersionReply{}
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
logger.Debugf("VPP ShowVersion failed: %v", err)
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Channel already closed during error handling")
}
}()
ch.Close()
channelClosed = true
}()
return false
}
ch.Close()
channelClosed = true
logger.Debugf("VPP liveness check passed (version: %s)", string(reply.Version))
return true
}
// VPPError represents a VPP-specific error
type VPPError struct {
Message string
}
func (e *VPPError) Error() string {
return e.Message
}

View File

@ -26,8 +26,68 @@ type InterfaceDetails struct {
// InterfaceEventCallback is called when interface events occur
type InterfaceEventCallback func(details []InterfaceDetails)
// InterfaceManager handles interface-related VPP operations
type InterfaceManager struct {
client *VPPClient
eventCallback InterfaceEventCallback
}
// NewInterfaceManager creates a new interface manager
func NewInterfaceManager(client *VPPClient) *InterfaceManager {
return &InterfaceManager{
client: client,
}
}
// SetEventCallback sets the callback for interface events
func (im *InterfaceManager) SetEventCallback(callback InterfaceEventCallback) {
im.eventCallback = callback
}
// GetAllInterfaceDetails retrieves detailed information for all interfaces
func GetAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) {
func (im *InterfaceManager) GetAllInterfaceDetails() ([]InterfaceDetails, error) {
if !im.client.IsConnected() {
return nil, &VPPError{Message: "VPP client not connected"}
}
ch, err := im.client.NewAPIChannel()
if err != nil {
return nil, err
}
defer ch.Close()
return getAllInterfaceDetails(ch)
}
// StartEventWatcher starts watching for interface events
func (im *InterfaceManager) StartEventWatcher() error {
if !im.client.IsConnected() {
return &VPPError{Message: "VPP client not connected"}
}
ch, err := im.client.NewAPIChannel()
if err != nil {
return err
}
return watchInterfaceEvents(ch, im.handleInterfaceEvent)
}
// handleInterfaceEvent handles interface events and calls the callback
func (im *InterfaceManager) handleInterfaceEvent() {
if im.eventCallback != nil {
details, err := im.GetAllInterfaceDetails()
if err != nil {
logger.Debugf("Failed to retrieve interface details after event: %v", err)
} else {
logger.Debugf("Calling interface event callback with %d interfaces", len(details))
im.eventCallback(details)
}
}
}
// getAllInterfaceDetails retrieves detailed information for all interfaces (internal function)
func getAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) {
logger.Debugf("Retrieving all interface details from VPP")
// Get all interfaces
@ -73,7 +133,8 @@ func GetAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) {
return details, nil
}
func WatchInterfaceEvents(ch api.Channel, callback InterfaceEventCallback) error {
// watchInterfaceEvents watches for VPP interface events (internal function)
func watchInterfaceEvents(ch api.Channel, callback func()) error {
logger.Debugf("WatchInterfaceEvents() called - starting interface event monitoring")
notifChan := make(chan api.Message, 100)
@ -127,15 +188,9 @@ func WatchInterfaceEvents(ch api.Channel, callback InterfaceEventCallback) error
logger.Debugf("interface event: SwIfIndex=%d, Flags=%d, Deleted=%t",
e.SwIfIndex, e.Flags, e.Deleted)
// When an interface event occurs, retrieve all interface details and call callback
// When an interface event occurs, call the callback
if callback != nil {
details, err := GetAllInterfaceDetails(ch)
if err != nil {
logger.Debugf("Failed to retrieve interface details after event: %v", err)
} else {
logger.Debugf("Calling interface event callback with %d interfaces", len(details))
callback(details)
}
callback()
}
}
logger.Debugf("Interface event listener goroutine ended")

View File

@ -3,164 +3,126 @@
package vpp
import (
"flag"
"time"
"go.fd.io/govpp/adapter/socketclient"
"go.fd.io/govpp/adapter/statsclient"
"go.fd.io/govpp/api"
"go.fd.io/govpp/binapi/vpe"
"go.fd.io/govpp/core"
"govpp-snmp-agentx/logger"
)
// StatsCallback is called when interface stats are retrieved
type StatsCallback func(*api.InterfaceStats)
// Global callback for interface events
var interfaceEventCallback InterfaceEventCallback
var (
// Flags for VPP stats configuration
ApiAddr = flag.String("vppstats.api.addr", "/var/run/vpp/api.sock", "VPP API socket path")
StatsAddr = flag.String("vppstats.stats.addr", "/var/run/vpp/stats.sock", "VPP stats socket path")
IfIndexOffset = flag.Int("vppstats.ifindex-offset", 1000, "Offset to add to VPP interface indices for SNMP")
Period = flag.Int("vppstats.period", 10, "Interval in seconds for querying VPP interface stats")
)
// SetInterfaceEventCallback sets the callback for interface events
func SetInterfaceEventCallback(callback InterfaceEventCallback) {
interfaceEventCallback = callback
// StatsManager handles VPP statistics operations
type StatsManager struct {
client *VPPClient
interfaceManager *InterfaceManager
statsCallback StatsCallback
period time.Duration
running bool
}
// StartStatsRoutine starts a goroutine that queries VPP interface stats at the configured interval
func StartStatsRoutine(callback StatsCallback) {
period := time.Duration(*Period) * time.Second
go statsRoutine(period, callback)
// NewStatsManager creates a new stats manager
func NewStatsManager(client *VPPClient, interfaceManager *InterfaceManager) *StatsManager {
return &StatsManager{
client: client,
interfaceManager: interfaceManager,
period: time.Duration(*Period) * time.Second,
}
}
func statsRoutine(period time.Duration, callback StatsCallback) {
logger.Debugf("Starting VPP stats routine with API: %s, Stats: %s, period: %v", *ApiAddr, *StatsAddr, period)
// SetStatsCallback sets the callback for stats updates
func (sm *StatsManager) SetStatsCallback(callback StatsCallback) {
sm.statsCallback = callback
}
var conn *core.Connection
var statsConn *core.StatsConnection
var connected = false
var wasConnected = false
// SetPeriod sets the polling period for stats
func (sm *StatsManager) SetPeriod(period time.Duration) {
sm.period = period
}
ticker := time.NewTicker(period)
// StartStatsRoutine starts the stats polling routine
func (sm *StatsManager) StartStatsRoutine() {
if sm.running {
logger.Debugf("Stats routine already running")
return
}
sm.running = true
go sm.statsRoutine()
}
// StopStatsRoutine stops the stats polling routine
func (sm *StatsManager) StopStatsRoutine() {
sm.running = false
}
// GetInterfaceStats retrieves current interface statistics
func (sm *StatsManager) GetInterfaceStats() (*api.InterfaceStats, error) {
if !sm.client.IsConnected() {
return nil, &VPPError{Message: "VPP client not connected"}
}
statsConn := sm.client.GetStatsConnection()
if statsConn == nil {
return nil, &VPPError{Message: "Stats connection not available"}
}
stats := new(api.InterfaceStats)
if err := statsConn.GetInterfaceStats(stats); err != nil {
return nil, err
}
return stats, nil
}
// statsRoutine is the main stats polling loop
func (sm *StatsManager) statsRoutine() {
logger.Debugf("Starting VPP stats routine with period: %v", sm.period)
ticker := time.NewTicker(sm.period)
defer ticker.Stop()
defer func() {
// Safely disconnect connections with panic recovery
if conn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from conn.Disconnect panic: %v", r)
}
}()
conn.Disconnect()
}()
}
if statsConn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from statsConn.Disconnect panic: %v", r)
}
}()
statsConn.Disconnect()
}()
}
}()
var wasConnected = false
for {
if !sm.running {
logger.Debugf("Stats routine stopping")
break
}
// Check if we need to connect/reconnect
if !connected {
// Clean up existing connections
if conn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from conn.Disconnect during reconnect: %v", r)
}
}()
conn.Disconnect()
}()
conn = nil
}
if statsConn != nil {
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from statsConn.Disconnect during reconnect: %v", r)
}
}()
statsConn.Disconnect()
}()
statsConn = nil
if !sm.client.IsConnected() {
if wasConnected {
logger.Printf("VPP connection lost, attempting reconnect...")
wasConnected = false
} else {
logger.Debugf("VPP not connected, attempting connection...")
}
// Create API connection first - only proceed if this succeeds
var err error
conn, err = core.Connect(socketclient.NewVppClient(*ApiAddr))
if err != nil {
if wasConnected {
logger.Printf("VPP API connection lost: %v", err)
wasConnected = false
} else {
logger.Debugf("Failed to connect to VPP API: %v", err)
}
connected = false
if err := sm.client.Connect(); err != nil {
logger.Debugf("Failed to connect to VPP: %v", err)
time.Sleep(time.Second)
continue
}
// Only try stats connection if API connection succeeded
statsClient := statsclient.NewStatsClient(*StatsAddr)
statsConn, err = core.ConnectStats(statsClient)
if err != nil {
logger.Printf("VPP stats connection failed: %v", err)
// Close the API connection since we can't get stats
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from conn.Disconnect during stats error: %v", r)
}
}()
conn.Disconnect()
}()
conn = nil
connected = false
time.Sleep(time.Second)
continue
}
logger.Printf("Connected to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr)
connected = true
logger.Printf("VPP connection established")
wasConnected = true
// Start watching interface events
logger.Debugf("Creating API channel for interface events...")
ch, err := conn.NewAPIChannel()
if err != nil {
logger.Debugf("Failed to create API channel for interface events: %v", err)
} else {
logger.Debugf("API channel created successfully, calling WatchInterfaceEvents...")
if err := WatchInterfaceEvents(ch, interfaceEventCallback); err != nil {
// Initialize interface event watching
if sm.interfaceManager != nil {
if err := sm.interfaceManager.StartEventWatcher(); err != nil {
logger.Debugf("Failed to start interface event watching: %v", err)
ch.Close()
} else {
logger.Printf("Interface event watching started successfully")
logger.Debugf("Interface event watching started")
// Do initial retrieval of interface details
if interfaceEventCallback != nil {
details, err := GetAllInterfaceDetails(ch)
if err != nil {
logger.Debugf("Failed to get initial interface details: %v", err)
} else {
logger.Debugf("Retrieved initial interface details for %d interfaces", len(details))
interfaceEventCallback(details)
// Get initial interface details
if details, err := sm.interfaceManager.GetAllInterfaceDetails(); err != nil {
logger.Debugf("Failed to get initial interface details: %v", err)
} else {
logger.Debugf("Retrieved initial interface details for %d interfaces", len(details))
if sm.interfaceManager.eventCallback != nil {
sm.interfaceManager.eventCallback(details)
}
}
}
@ -168,9 +130,10 @@ func statsRoutine(period time.Duration, callback StatsCallback) {
}
// Query stats if connected
if connected {
if !queryInterfaceStats(conn, statsConn, callback) {
connected = false
if sm.client.IsConnected() {
if !sm.queryAndReportStats() {
logger.Printf("Stats query failed, marking connection as lost")
sm.client.Disconnect()
continue
}
}
@ -178,20 +141,21 @@ func statsRoutine(period time.Duration, callback StatsCallback) {
// Wait for next tick
<-ticker.C
}
logger.Debugf("Stats routine ended")
}
func queryInterfaceStats(conn *core.Connection, statsConn *core.StatsConnection, callback StatsCallback) bool {
// Check VPP liveness using API call
if !checkVPPLiveness(conn) {
logger.Printf("VPP liveness check failed")
// queryAndReportStats queries stats and calls the callback
func (sm *StatsManager) queryAndReportStats() bool {
// Check VPP liveness first
if !sm.client.CheckLiveness() {
logger.Debugf("VPP liveness check failed")
return false
}
// Create the proper struct for interface stats
stats := new(api.InterfaceStats)
// Use the GetInterfaceStats method - this is the correct approach
if err := statsConn.GetInterfaceStats(stats); err != nil {
// Get interface stats
stats, err := sm.GetInterfaceStats()
if err != nil {
logger.Printf("Failed to get interface stats: %v", err)
return false
}
@ -208,60 +172,9 @@ func queryInterfaceStats(conn *core.Connection, statsConn *core.StatsConnection,
}
// Call the callback to update the MIB
if callback != nil {
callback(stats)
if sm.statsCallback != nil {
sm.statsCallback(stats)
}
return true
}
func checkVPPLiveness(conn *core.Connection) bool {
// Create a channel for the API call
ch, err := conn.NewAPIChannel()
if err != nil {
logger.Debugf("Failed to create API channel: %v", err)
return false
}
// Use a flag to track if channel was closed successfully
var channelClosed bool
defer func() {
if !channelClosed {
// Recover from potential panic when closing already closed channel
defer func() {
if r := recover(); r != nil {
logger.Debugf("Recovered from channel close panic: %v", r)
}
}()
ch.Close()
}
}()
// Create ShowVersion request
req := &vpe.ShowVersion{}
reply := &vpe.ShowVersionReply{}
// Send the request with timeout
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
logger.Debugf("VPP ShowVersion failed: %v", err)
// Try to close the channel properly on error
func() {
defer func() {
if r := recover(); r != nil {
logger.Debugf("Channel already closed during error handling")
}
}()
ch.Close()
channelClosed = true
}()
return false
}
// Close channel successfully
ch.Close()
channelClosed = true
// If we got here, VPP is responsive
logger.Debugf("VPP liveness check passed (version: %s)", string(reply.Version))
return true
}