From bdaa2e366bcedae30164fe97a715b61d45b37038 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Tue, 24 Jun 2025 07:00:52 +0200 Subject: [PATCH] Refactor vpp.go to have the connection mgmt and vpp_*.go to have one Manager each --- src/main.go | 20 ++- src/vpp/vpp.go | 178 ++++++++++++++++++++++++++ src/vpp/vpp_iface.go | 75 +++++++++-- src/vpp/vpp_stats.go | 297 +++++++++++++++---------------------------- 4 files changed, 364 insertions(+), 206 deletions(-) create mode 100644 src/vpp/vpp.go diff --git a/src/main.go b/src/main.go index 445c9d7..68d4183 100644 --- a/src/main.go +++ b/src/main.go @@ -45,11 +45,19 @@ func main() { log.Fatalf("Failed to start AgentX: %v", err) } - // Set up interface event callback to update interface details - vpp.SetInterfaceEventCallback(interfaceMIB.UpdateInterfaceDetails) + // Create VPP client and managers + vppClient := vpp.NewVPPClient() + interfaceManager := vpp.NewInterfaceManager(vppClient) + statsManager := vpp.NewStatsManager(vppClient, interfaceManager) - // Start VPP stats routine with callback to update MIB - vpp.StartStatsRoutine(interfaceMIB.UpdateStats) + // Set up interface event callback to update interface details + interfaceManager.SetEventCallback(interfaceMIB.UpdateInterfaceDetails) + + // Set up stats callback to update MIB + statsManager.SetStatsCallback(interfaceMIB.UpdateStats) + + // Start VPP stats routine + statsManager.StartStatsRoutine() // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) @@ -59,6 +67,10 @@ func main() { <-sigChan logger.Printf("Shutting down...") + // Stop stats routine and disconnect + statsManager.StopStatsRoutine() + vppClient.Disconnect() + // Flush any buffered log entries logger.Sync() } diff --git a/src/vpp/vpp.go b/src/vpp/vpp.go new file mode 100644 index 0000000..cc9b1af --- /dev/null +++ b/src/vpp/vpp.go @@ -0,0 +1,178 @@ +// Copyright 2025, IPng Networks GmbH, Pim van Pelt + +package vpp + +import ( + "flag" + + "go.fd.io/govpp/adapter/socketclient" + "go.fd.io/govpp/adapter/statsclient" + "go.fd.io/govpp/api" + "go.fd.io/govpp/binapi/vpe" + "go.fd.io/govpp/core" + + "govpp-snmp-agentx/logger" +) + +var ( + // Flags for VPP configuration + ApiAddr = flag.String("vppstats.api.addr", "/var/run/vpp/api.sock", "VPP API socket path") + StatsAddr = flag.String("vppstats.stats.addr", "/var/run/vpp/stats.sock", "VPP stats socket path") + IfIndexOffset = flag.Int("vppstats.ifindex-offset", 1000, "Offset to add to VPP interface indices for SNMP") + Period = flag.Int("vppstats.period", 10, "Interval in seconds for querying VPP interface stats") +) + +// VPPClient manages VPP connections and provides a unified interface +type VPPClient struct { + apiConn *core.Connection + statsConn *core.StatsConnection + connected bool +} + +// NewVPPClient creates a new VPP client instance +func NewVPPClient() *VPPClient { + return &VPPClient{} +} + +// Connect establishes connections to both VPP API and Stats sockets +func (c *VPPClient) Connect() error { + logger.Debugf("Connecting to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr) + + // Connect to API socket + apiConn, err := core.Connect(socketclient.NewVppClient(*ApiAddr)) + if err != nil { + return err + } + + // Connect to stats socket + statsClient := statsclient.NewStatsClient(*StatsAddr) + statsConn, err := core.ConnectStats(statsClient) + if err != nil { + // Clean up API connection on stats failure + func() { + defer func() { + if r := recover(); r != nil { + logger.Debugf("Recovered from API disconnect during stats error: %v", r) + } + }() + apiConn.Disconnect() + }() + return err + } + + c.apiConn = apiConn + c.statsConn = statsConn + c.connected = true + + logger.Printf("Connected to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr) + return nil +} + +// Disconnect closes all VPP connections safely +func (c *VPPClient) Disconnect() { + if c.apiConn != nil { + func() { + defer func() { + if r := recover(); r != nil { + logger.Debugf("Recovered from API disconnect panic: %v", r) + } + }() + c.apiConn.Disconnect() + }() + c.apiConn = nil + } + + if c.statsConn != nil { + func() { + defer func() { + if r := recover(); r != nil { + logger.Debugf("Recovered from stats disconnect panic: %v", r) + } + }() + c.statsConn.Disconnect() + }() + c.statsConn = nil + } + + c.connected = false +} + +// IsConnected returns true if both API and Stats connections are active +func (c *VPPClient) IsConnected() bool { + return c.connected && c.apiConn != nil && c.statsConn != nil +} + +// GetAPIConnection returns the API connection for direct use +func (c *VPPClient) GetAPIConnection() *core.Connection { + return c.apiConn +} + +// GetStatsConnection returns the stats connection for direct use +func (c *VPPClient) GetStatsConnection() *core.StatsConnection { + return c.statsConn +} + +// NewAPIChannel creates a new API channel from the connection +func (c *VPPClient) NewAPIChannel() (api.Channel, error) { + if c.apiConn == nil { + return nil, &VPPError{Message: "API connection not established"} + } + return c.apiConn.NewAPIChannel() +} + +// CheckLiveness performs a VPP liveness check using ShowVersion API call +func (c *VPPClient) CheckLiveness() bool { + if !c.IsConnected() { + return false + } + + ch, err := c.NewAPIChannel() + if err != nil { + logger.Debugf("Failed to create API channel for liveness check: %v", err) + return false + } + + var channelClosed bool + defer func() { + if !channelClosed { + defer func() { + if r := recover(); r != nil { + logger.Debugf("Recovered from channel close panic: %v", r) + } + }() + ch.Close() + } + }() + + req := &vpe.ShowVersion{} + reply := &vpe.ShowVersionReply{} + + if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { + logger.Debugf("VPP ShowVersion failed: %v", err) + func() { + defer func() { + if r := recover(); r != nil { + logger.Debugf("Channel already closed during error handling") + } + }() + ch.Close() + channelClosed = true + }() + return false + } + + ch.Close() + channelClosed = true + + logger.Debugf("VPP liveness check passed (version: %s)", string(reply.Version)) + return true +} + +// VPPError represents a VPP-specific error +type VPPError struct { + Message string +} + +func (e *VPPError) Error() string { + return e.Message +} diff --git a/src/vpp/vpp_iface.go b/src/vpp/vpp_iface.go index 1fc2f6e..132babd 100644 --- a/src/vpp/vpp_iface.go +++ b/src/vpp/vpp_iface.go @@ -26,8 +26,68 @@ type InterfaceDetails struct { // InterfaceEventCallback is called when interface events occur type InterfaceEventCallback func(details []InterfaceDetails) +// InterfaceManager handles interface-related VPP operations +type InterfaceManager struct { + client *VPPClient + eventCallback InterfaceEventCallback +} + +// NewInterfaceManager creates a new interface manager +func NewInterfaceManager(client *VPPClient) *InterfaceManager { + return &InterfaceManager{ + client: client, + } +} + +// SetEventCallback sets the callback for interface events +func (im *InterfaceManager) SetEventCallback(callback InterfaceEventCallback) { + im.eventCallback = callback +} + // GetAllInterfaceDetails retrieves detailed information for all interfaces -func GetAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) { +func (im *InterfaceManager) GetAllInterfaceDetails() ([]InterfaceDetails, error) { + if !im.client.IsConnected() { + return nil, &VPPError{Message: "VPP client not connected"} + } + + ch, err := im.client.NewAPIChannel() + if err != nil { + return nil, err + } + defer ch.Close() + + return getAllInterfaceDetails(ch) +} + +// StartEventWatcher starts watching for interface events +func (im *InterfaceManager) StartEventWatcher() error { + if !im.client.IsConnected() { + return &VPPError{Message: "VPP client not connected"} + } + + ch, err := im.client.NewAPIChannel() + if err != nil { + return err + } + + return watchInterfaceEvents(ch, im.handleInterfaceEvent) +} + +// handleInterfaceEvent handles interface events and calls the callback +func (im *InterfaceManager) handleInterfaceEvent() { + if im.eventCallback != nil { + details, err := im.GetAllInterfaceDetails() + if err != nil { + logger.Debugf("Failed to retrieve interface details after event: %v", err) + } else { + logger.Debugf("Calling interface event callback with %d interfaces", len(details)) + im.eventCallback(details) + } + } +} + +// getAllInterfaceDetails retrieves detailed information for all interfaces (internal function) +func getAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) { logger.Debugf("Retrieving all interface details from VPP") // Get all interfaces @@ -73,7 +133,8 @@ func GetAllInterfaceDetails(ch api.Channel) ([]InterfaceDetails, error) { return details, nil } -func WatchInterfaceEvents(ch api.Channel, callback InterfaceEventCallback) error { +// watchInterfaceEvents watches for VPP interface events (internal function) +func watchInterfaceEvents(ch api.Channel, callback func()) error { logger.Debugf("WatchInterfaceEvents() called - starting interface event monitoring") notifChan := make(chan api.Message, 100) @@ -127,15 +188,9 @@ func WatchInterfaceEvents(ch api.Channel, callback InterfaceEventCallback) error logger.Debugf("interface event: SwIfIndex=%d, Flags=%d, Deleted=%t", e.SwIfIndex, e.Flags, e.Deleted) - // When an interface event occurs, retrieve all interface details and call callback + // When an interface event occurs, call the callback if callback != nil { - details, err := GetAllInterfaceDetails(ch) - if err != nil { - logger.Debugf("Failed to retrieve interface details after event: %v", err) - } else { - logger.Debugf("Calling interface event callback with %d interfaces", len(details)) - callback(details) - } + callback() } } logger.Debugf("Interface event listener goroutine ended") diff --git a/src/vpp/vpp_stats.go b/src/vpp/vpp_stats.go index fed5df7..f331c00 100644 --- a/src/vpp/vpp_stats.go +++ b/src/vpp/vpp_stats.go @@ -3,164 +3,126 @@ package vpp import ( - "flag" "time" - "go.fd.io/govpp/adapter/socketclient" - "go.fd.io/govpp/adapter/statsclient" "go.fd.io/govpp/api" - "go.fd.io/govpp/binapi/vpe" - "go.fd.io/govpp/core" "govpp-snmp-agentx/logger" ) +// StatsCallback is called when interface stats are retrieved type StatsCallback func(*api.InterfaceStats) -// Global callback for interface events -var interfaceEventCallback InterfaceEventCallback - -var ( - // Flags for VPP stats configuration - ApiAddr = flag.String("vppstats.api.addr", "/var/run/vpp/api.sock", "VPP API socket path") - StatsAddr = flag.String("vppstats.stats.addr", "/var/run/vpp/stats.sock", "VPP stats socket path") - IfIndexOffset = flag.Int("vppstats.ifindex-offset", 1000, "Offset to add to VPP interface indices for SNMP") - Period = flag.Int("vppstats.period", 10, "Interval in seconds for querying VPP interface stats") -) - -// SetInterfaceEventCallback sets the callback for interface events -func SetInterfaceEventCallback(callback InterfaceEventCallback) { - interfaceEventCallback = callback +// StatsManager handles VPP statistics operations +type StatsManager struct { + client *VPPClient + interfaceManager *InterfaceManager + statsCallback StatsCallback + period time.Duration + running bool } -// StartStatsRoutine starts a goroutine that queries VPP interface stats at the configured interval -func StartStatsRoutine(callback StatsCallback) { - period := time.Duration(*Period) * time.Second - go statsRoutine(period, callback) +// NewStatsManager creates a new stats manager +func NewStatsManager(client *VPPClient, interfaceManager *InterfaceManager) *StatsManager { + return &StatsManager{ + client: client, + interfaceManager: interfaceManager, + period: time.Duration(*Period) * time.Second, + } } -func statsRoutine(period time.Duration, callback StatsCallback) { - logger.Debugf("Starting VPP stats routine with API: %s, Stats: %s, period: %v", *ApiAddr, *StatsAddr, period) +// SetStatsCallback sets the callback for stats updates +func (sm *StatsManager) SetStatsCallback(callback StatsCallback) { + sm.statsCallback = callback +} - var conn *core.Connection - var statsConn *core.StatsConnection - var connected = false - var wasConnected = false +// SetPeriod sets the polling period for stats +func (sm *StatsManager) SetPeriod(period time.Duration) { + sm.period = period +} - ticker := time.NewTicker(period) +// StartStatsRoutine starts the stats polling routine +func (sm *StatsManager) StartStatsRoutine() { + if sm.running { + logger.Debugf("Stats routine already running") + return + } + + sm.running = true + go sm.statsRoutine() +} + +// StopStatsRoutine stops the stats polling routine +func (sm *StatsManager) StopStatsRoutine() { + sm.running = false +} + +// GetInterfaceStats retrieves current interface statistics +func (sm *StatsManager) GetInterfaceStats() (*api.InterfaceStats, error) { + if !sm.client.IsConnected() { + return nil, &VPPError{Message: "VPP client not connected"} + } + + statsConn := sm.client.GetStatsConnection() + if statsConn == nil { + return nil, &VPPError{Message: "Stats connection not available"} + } + + stats := new(api.InterfaceStats) + if err := statsConn.GetInterfaceStats(stats); err != nil { + return nil, err + } + + return stats, nil +} + +// statsRoutine is the main stats polling loop +func (sm *StatsManager) statsRoutine() { + logger.Debugf("Starting VPP stats routine with period: %v", sm.period) + + ticker := time.NewTicker(sm.period) defer ticker.Stop() - defer func() { - // Safely disconnect connections with panic recovery - if conn != nil { - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from conn.Disconnect panic: %v", r) - } - }() - conn.Disconnect() - }() - } - if statsConn != nil { - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from statsConn.Disconnect panic: %v", r) - } - }() - statsConn.Disconnect() - }() - } - }() + var wasConnected = false for { + if !sm.running { + logger.Debugf("Stats routine stopping") + break + } + // Check if we need to connect/reconnect - if !connected { - // Clean up existing connections - if conn != nil { - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from conn.Disconnect during reconnect: %v", r) - } - }() - conn.Disconnect() - }() - conn = nil - } - if statsConn != nil { - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from statsConn.Disconnect during reconnect: %v", r) - } - }() - statsConn.Disconnect() - }() - statsConn = nil + if !sm.client.IsConnected() { + if wasConnected { + logger.Printf("VPP connection lost, attempting reconnect...") + wasConnected = false + } else { + logger.Debugf("VPP not connected, attempting connection...") } - // Create API connection first - only proceed if this succeeds - var err error - conn, err = core.Connect(socketclient.NewVppClient(*ApiAddr)) - if err != nil { - if wasConnected { - logger.Printf("VPP API connection lost: %v", err) - wasConnected = false - } else { - logger.Debugf("Failed to connect to VPP API: %v", err) - } - connected = false + if err := sm.client.Connect(); err != nil { + logger.Debugf("Failed to connect to VPP: %v", err) time.Sleep(time.Second) continue } - // Only try stats connection if API connection succeeded - statsClient := statsclient.NewStatsClient(*StatsAddr) - statsConn, err = core.ConnectStats(statsClient) - if err != nil { - logger.Printf("VPP stats connection failed: %v", err) - // Close the API connection since we can't get stats - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from conn.Disconnect during stats error: %v", r) - } - }() - conn.Disconnect() - }() - conn = nil - connected = false - time.Sleep(time.Second) - continue - } - - logger.Printf("Connected to VPP (API: %s, Stats: %s)", *ApiAddr, *StatsAddr) - connected = true + logger.Printf("VPP connection established") wasConnected = true - // Start watching interface events - logger.Debugf("Creating API channel for interface events...") - ch, err := conn.NewAPIChannel() - if err != nil { - logger.Debugf("Failed to create API channel for interface events: %v", err) - } else { - logger.Debugf("API channel created successfully, calling WatchInterfaceEvents...") - if err := WatchInterfaceEvents(ch, interfaceEventCallback); err != nil { + // Initialize interface event watching + if sm.interfaceManager != nil { + if err := sm.interfaceManager.StartEventWatcher(); err != nil { logger.Debugf("Failed to start interface event watching: %v", err) - ch.Close() } else { - logger.Printf("Interface event watching started successfully") + logger.Debugf("Interface event watching started") - // Do initial retrieval of interface details - if interfaceEventCallback != nil { - details, err := GetAllInterfaceDetails(ch) - if err != nil { - logger.Debugf("Failed to get initial interface details: %v", err) - } else { - logger.Debugf("Retrieved initial interface details for %d interfaces", len(details)) - interfaceEventCallback(details) + // Get initial interface details + if details, err := sm.interfaceManager.GetAllInterfaceDetails(); err != nil { + logger.Debugf("Failed to get initial interface details: %v", err) + } else { + logger.Debugf("Retrieved initial interface details for %d interfaces", len(details)) + if sm.interfaceManager.eventCallback != nil { + sm.interfaceManager.eventCallback(details) } } } @@ -168,9 +130,10 @@ func statsRoutine(period time.Duration, callback StatsCallback) { } // Query stats if connected - if connected { - if !queryInterfaceStats(conn, statsConn, callback) { - connected = false + if sm.client.IsConnected() { + if !sm.queryAndReportStats() { + logger.Printf("Stats query failed, marking connection as lost") + sm.client.Disconnect() continue } } @@ -178,20 +141,21 @@ func statsRoutine(period time.Duration, callback StatsCallback) { // Wait for next tick <-ticker.C } + + logger.Debugf("Stats routine ended") } -func queryInterfaceStats(conn *core.Connection, statsConn *core.StatsConnection, callback StatsCallback) bool { - // Check VPP liveness using API call - if !checkVPPLiveness(conn) { - logger.Printf("VPP liveness check failed") +// queryAndReportStats queries stats and calls the callback +func (sm *StatsManager) queryAndReportStats() bool { + // Check VPP liveness first + if !sm.client.CheckLiveness() { + logger.Debugf("VPP liveness check failed") return false } - // Create the proper struct for interface stats - stats := new(api.InterfaceStats) - - // Use the GetInterfaceStats method - this is the correct approach - if err := statsConn.GetInterfaceStats(stats); err != nil { + // Get interface stats + stats, err := sm.GetInterfaceStats() + if err != nil { logger.Printf("Failed to get interface stats: %v", err) return false } @@ -208,60 +172,9 @@ func queryInterfaceStats(conn *core.Connection, statsConn *core.StatsConnection, } // Call the callback to update the MIB - if callback != nil { - callback(stats) + if sm.statsCallback != nil { + sm.statsCallback(stats) } return true } - -func checkVPPLiveness(conn *core.Connection) bool { - // Create a channel for the API call - ch, err := conn.NewAPIChannel() - if err != nil { - logger.Debugf("Failed to create API channel: %v", err) - return false - } - - // Use a flag to track if channel was closed successfully - var channelClosed bool - defer func() { - if !channelClosed { - // Recover from potential panic when closing already closed channel - defer func() { - if r := recover(); r != nil { - logger.Debugf("Recovered from channel close panic: %v", r) - } - }() - ch.Close() - } - }() - - // Create ShowVersion request - req := &vpe.ShowVersion{} - reply := &vpe.ShowVersionReply{} - - // Send the request with timeout - if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { - logger.Debugf("VPP ShowVersion failed: %v", err) - // Try to close the channel properly on error - func() { - defer func() { - if r := recover(); r != nil { - logger.Debugf("Channel already closed during error handling") - } - }() - ch.Close() - channelClosed = true - }() - return false - } - - // Close channel successfully - ch.Close() - channelClosed = true - - // If we got here, VPP is responsive - logger.Debugf("VPP liveness check passed (version: %s)", string(reply.Version)) - return true -}