Add a VPP API liveness check before reading the stats segment.
This commit is contained in:
@ -2,11 +2,12 @@ package vppstats
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.fd.io/govpp/adapter/socketclient"
|
||||||
"go.fd.io/govpp/adapter/statsclient"
|
"go.fd.io/govpp/adapter/statsclient"
|
||||||
"go.fd.io/govpp/api"
|
"go.fd.io/govpp/api"
|
||||||
|
"go.fd.io/govpp/binapi/vpe"
|
||||||
"go.fd.io/govpp/core"
|
"go.fd.io/govpp/core"
|
||||||
|
|
||||||
"govpp-snmp-agentx/logger"
|
"govpp-snmp-agentx/logger"
|
||||||
@ -30,31 +31,208 @@ func StartStatsRoutine(callback StatsCallback) {
|
|||||||
func statsRoutine(period time.Duration, callback StatsCallback) {
|
func statsRoutine(period time.Duration, callback StatsCallback) {
|
||||||
logger.Debugf("Starting VPP stats routine with socket: %s, period: %v", *StatsAddr, period)
|
logger.Debugf("Starting VPP stats routine with socket: %s, period: %v", *StatsAddr, period)
|
||||||
|
|
||||||
// Create stats client
|
var conn *core.Connection
|
||||||
client := statsclient.NewStatsClient(*StatsAddr)
|
var statsConn *core.StatsConnection
|
||||||
|
var connected = false
|
||||||
// Connect using core.ConnectStats (proper way)
|
var wasConnected = false
|
||||||
c, err := core.ConnectStats(client)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to connect to VPP stats: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer c.Disconnect()
|
|
||||||
|
|
||||||
// Query stats immediately on startup
|
|
||||||
queryInterfaceStats(c, callback)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(period)
|
ticker := time.NewTicker(period)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// Safely disconnect connections with panic recovery
|
||||||
|
if conn != nil {
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from conn.Disconnect panic: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
conn.Disconnect()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
if statsConn != nil {
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from statsConn.Disconnect panic: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
statsConn.Disconnect()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
// Check if we need to connect/reconnect
|
||||||
|
if !connected {
|
||||||
|
// Clean up existing connections
|
||||||
|
if conn != nil {
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from conn.Disconnect during reconnect: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
conn.Disconnect()
|
||||||
|
}()
|
||||||
|
conn = nil
|
||||||
|
}
|
||||||
|
if statsConn != nil {
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from statsConn.Disconnect during reconnect: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
statsConn.Disconnect()
|
||||||
|
}()
|
||||||
|
statsConn = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create API connection first - only proceed if this succeeds
|
||||||
|
var err error
|
||||||
|
conn, err = core.Connect(socketclient.NewVppClient("/var/run/vpp/api.sock"))
|
||||||
|
if err != nil {
|
||||||
|
if wasConnected {
|
||||||
|
logger.Printf("VPP API connection lost: %v", err)
|
||||||
|
wasConnected = false
|
||||||
|
} else {
|
||||||
|
logger.Debugf("Failed to connect to VPP API: %v", err)
|
||||||
|
}
|
||||||
|
connected = false
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only try stats connection if API connection succeeded
|
||||||
|
statsClient := statsclient.NewStatsClient(*StatsAddr)
|
||||||
|
statsConn, err = core.ConnectStats(statsClient)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("VPP stats connection failed: %v", err)
|
||||||
|
// Close the API connection since we can't get stats
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from conn.Disconnect during stats error: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
conn.Disconnect()
|
||||||
|
}()
|
||||||
|
conn = nil
|
||||||
|
connected = false
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Printf("Connected to VPP (API: /var/run/vpp/api.sock, Stats: %s)", *StatsAddr)
|
||||||
|
connected = true
|
||||||
|
wasConnected = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query stats if connected
|
||||||
|
if connected {
|
||||||
|
if !queryInterfaceStatsWithLivenessCheck(conn, statsConn, callback) {
|
||||||
|
connected = false
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for next tick
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
queryInterfaceStats(c, callback)
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func queryInterfaceStatsWithLivenessCheck(conn *core.Connection, statsConn *core.StatsConnection, callback StatsCallback) bool {
|
||||||
|
// Check VPP liveness using API call
|
||||||
|
if !checkVPPLiveness(conn) {
|
||||||
|
logger.Printf("VPP liveness check failed")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the proper struct for interface stats
|
||||||
|
stats := new(api.InterfaceStats)
|
||||||
|
|
||||||
|
// Use the GetInterfaceStats method - this is the correct approach
|
||||||
|
if err := statsConn.GetInterfaceStats(stats); err != nil {
|
||||||
|
logger.Printf("Failed to get interface stats: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Always log basic info
|
||||||
|
logger.Printf("Retrieved stats for %d interfaces", len(stats.Interfaces))
|
||||||
|
|
||||||
|
// Debug logging for individual interfaces
|
||||||
|
for _, iface := range stats.Interfaces {
|
||||||
|
logger.Debugf("Interface %d (%s): RX %d pkts/%d bytes, TX %d pkts/%d bytes",
|
||||||
|
iface.InterfaceIndex, iface.InterfaceName,
|
||||||
|
iface.Rx.Packets, iface.Rx.Bytes,
|
||||||
|
iface.Tx.Packets, iface.Tx.Bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the callback to update the MIB
|
||||||
|
if callback != nil {
|
||||||
|
callback(stats)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkVPPLiveness(conn *core.Connection) bool {
|
||||||
|
// Create a channel for the API call
|
||||||
|
ch, err := conn.NewAPIChannel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Debugf("Failed to create API channel: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a flag to track if channel was closed successfully
|
||||||
|
var channelClosed bool
|
||||||
|
defer func() {
|
||||||
|
if !channelClosed {
|
||||||
|
// Recover from potential panic when closing already closed channel
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Recovered from channel close panic: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
ch.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Create ShowVersion request
|
||||||
|
req := &vpe.ShowVersion{}
|
||||||
|
reply := &vpe.ShowVersionReply{}
|
||||||
|
|
||||||
|
// Send the request with timeout
|
||||||
|
if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
|
||||||
|
logger.Debugf("VPP ShowVersion failed: %v", err)
|
||||||
|
// Try to close the channel properly on error
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logger.Debugf("Channel already closed during error handling")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
ch.Close()
|
||||||
|
channelClosed = true
|
||||||
|
}()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close channel successfully
|
||||||
|
ch.Close()
|
||||||
|
channelClosed = true
|
||||||
|
|
||||||
|
// If we got here, VPP is responsive
|
||||||
|
logger.Debugf("VPP liveness check passed (version: %s)", string(reply.Version))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep the old function for compatibility
|
||||||
func queryInterfaceStats(c *core.StatsConnection, callback StatsCallback) {
|
func queryInterfaceStats(c *core.StatsConnection, callback StatsCallback) {
|
||||||
// Create the proper struct for interface stats
|
// Create the proper struct for interface stats
|
||||||
stats := new(api.InterfaceStats)
|
stats := new(api.InterfaceStats)
|
||||||
|
Reference in New Issue
Block a user