Make statsmanager and interfacemanager independent. Add reconnect logic for EventMonitoring
This commit is contained in:
@ -48,7 +48,7 @@ func main() {
|
||||
// Create VPP client and managers
|
||||
vppClient := vpp.NewVPPClient()
|
||||
interfaceManager := vpp.NewInterfaceManager(vppClient)
|
||||
statsManager := vpp.NewStatsManager(vppClient, interfaceManager)
|
||||
statsManager := vpp.NewStatsManager(vppClient)
|
||||
|
||||
// Set up interface event callback to update interface details
|
||||
interfaceManager.SetEventCallback(interfaceMIB.UpdateInterfaceDetails)
|
||||
@ -59,6 +59,9 @@ func main() {
|
||||
// Start VPP stats routine
|
||||
statsManager.StartStatsRoutine()
|
||||
|
||||
// Start interface event monitoring (handles reconnections automatically)
|
||||
interfaceManager.StartEventMonitoring()
|
||||
|
||||
// Set up signal handling for graceful shutdown
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
@ -67,8 +70,9 @@ func main() {
|
||||
<-sigChan
|
||||
logger.Printf("Shutting down...")
|
||||
|
||||
// Stop stats routine and disconnect
|
||||
// Stop stats routine and interface monitoring, then disconnect
|
||||
statsManager.StopStatsRoutine()
|
||||
interfaceManager.StopEventMonitoring()
|
||||
vppClient.Disconnect()
|
||||
|
||||
// Flush any buffered log entries
|
||||
|
@ -4,6 +4,7 @@ package vpp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.fd.io/govpp/api"
|
||||
interfaces "go.fd.io/govpp/binapi/interface"
|
||||
@ -30,6 +31,8 @@ type InterfaceEventCallback func(details []InterfaceDetails)
|
||||
type InterfaceManager struct {
|
||||
client *VPPClient
|
||||
eventCallback InterfaceEventCallback
|
||||
running bool
|
||||
watchingEvents bool
|
||||
}
|
||||
|
||||
// NewInterfaceManager creates a new interface manager
|
||||
@ -44,6 +47,82 @@ func (im *InterfaceManager) SetEventCallback(callback InterfaceEventCallback) {
|
||||
im.eventCallback = callback
|
||||
}
|
||||
|
||||
// InitializeEventWatching starts event watching and retrieves initial interface details
|
||||
func (im *InterfaceManager) InitializeEventWatching() error {
|
||||
if !im.client.IsConnected() {
|
||||
return &VPPError{Message: "VPP client not connected"}
|
||||
}
|
||||
|
||||
// Start watching interface events
|
||||
if err := im.StartEventWatcher(); err != nil {
|
||||
logger.Debugf("Failed to start interface event watching: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Debugf("Interface event watching started")
|
||||
|
||||
// Get initial interface details
|
||||
if details, err := im.GetAllInterfaceDetails(); err != nil {
|
||||
logger.Debugf("Failed to get initial interface details: %v", err)
|
||||
return err
|
||||
} else {
|
||||
logger.Debugf("Retrieved initial interface details for %d interfaces", len(details))
|
||||
if im.eventCallback != nil {
|
||||
im.eventCallback(details)
|
||||
}
|
||||
}
|
||||
|
||||
im.watchingEvents = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartEventMonitoring starts continuous monitoring for VPP connection and restarts event watching as needed
|
||||
func (im *InterfaceManager) StartEventMonitoring() {
|
||||
if im.running {
|
||||
logger.Debugf("Interface event monitoring already running")
|
||||
return
|
||||
}
|
||||
|
||||
im.running = true
|
||||
go im.eventMonitoringRoutine()
|
||||
}
|
||||
|
||||
// StopEventMonitoring stops the event monitoring routine
|
||||
func (im *InterfaceManager) StopEventMonitoring() {
|
||||
im.running = false
|
||||
}
|
||||
|
||||
// eventMonitoringRoutine continuously monitors VPP connection and manages event watching
|
||||
func (im *InterfaceManager) eventMonitoringRoutine() {
|
||||
logger.Debugf("Starting interface event monitoring routine")
|
||||
|
||||
for {
|
||||
if !im.running {
|
||||
logger.Debugf("Interface event monitoring routine stopping")
|
||||
break
|
||||
}
|
||||
|
||||
if im.client.IsConnected() {
|
||||
if !im.watchingEvents {
|
||||
if err := im.InitializeEventWatching(); err != nil {
|
||||
logger.Debugf("Failed to initialize interface event watching: %v", err)
|
||||
} else {
|
||||
logger.Debugf("Interface event watching restarted after reconnection")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if im.watchingEvents {
|
||||
logger.Debugf("VPP connection lost, interface event watching will restart on reconnection")
|
||||
im.watchingEvents = false
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
logger.Debugf("Interface event monitoring routine ended")
|
||||
}
|
||||
|
||||
// GetAllInterfaceDetails retrieves detailed information for all interfaces
|
||||
func (im *InterfaceManager) GetAllInterfaceDetails() ([]InterfaceDetails, error) {
|
||||
if !im.client.IsConnected() {
|
||||
|
@ -3,8 +3,10 @@
|
||||
package vpp
|
||||
|
||||
import (
|
||||
"go.fd.io/govpp/binapi/interface_types"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.fd.io/govpp/binapi/interface_types"
|
||||
)
|
||||
|
||||
func TestNewInterfaceManager(t *testing.T) {
|
||||
@ -22,6 +24,14 @@ func TestNewInterfaceManager(t *testing.T) {
|
||||
if manager.eventCallback != nil {
|
||||
t.Error("InterfaceManager should have nil callback initially")
|
||||
}
|
||||
|
||||
if manager.running {
|
||||
t.Error("InterfaceManager should not be running initially")
|
||||
}
|
||||
|
||||
if manager.watchingEvents {
|
||||
t.Error("InterfaceManager should not be watching events initially")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterfaceManagerSetEventCallback(t *testing.T) {
|
||||
@ -116,6 +126,25 @@ func TestInterfaceManagerHandleInterfaceEventWithoutCallback(t *testing.T) {
|
||||
manager.handleInterfaceEvent()
|
||||
}
|
||||
|
||||
func TestInterfaceManagerInitializeEventWatchingWithoutConnection(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
manager := NewInterfaceManager(client)
|
||||
|
||||
err := manager.InitializeEventWatching()
|
||||
if err == nil {
|
||||
t.Error("InitializeEventWatching() should return error when not connected")
|
||||
}
|
||||
|
||||
vppErr, ok := err.(*VPPError)
|
||||
if !ok {
|
||||
t.Errorf("Expected VPPError, got %T", err)
|
||||
}
|
||||
|
||||
if vppErr.Message != "VPP client not connected" {
|
||||
t.Errorf("Expected specific error message, got: %s", vppErr.Message)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterfaceDetails(t *testing.T) {
|
||||
details := InterfaceDetails{
|
||||
SwIfIndex: interface_types.InterfaceIndex(42),
|
||||
@ -188,3 +217,68 @@ func TestInterfaceEventCallback(t *testing.T) {
|
||||
t.Errorf("Expected second interface 'test2', got %q", callbackDetails[1].InterfaceName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterfaceManagerStartStopEventMonitoring(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
manager := NewInterfaceManager(client)
|
||||
|
||||
if manager.running {
|
||||
t.Error("InterfaceManager should not be running initially")
|
||||
}
|
||||
|
||||
manager.StartEventMonitoring()
|
||||
|
||||
if !manager.running {
|
||||
t.Error("InterfaceManager should be running after StartEventMonitoring()")
|
||||
}
|
||||
|
||||
// Test starting again (should be safe)
|
||||
manager.StartEventMonitoring()
|
||||
|
||||
if !manager.running {
|
||||
t.Error("InterfaceManager should still be running after second StartEventMonitoring()")
|
||||
}
|
||||
|
||||
manager.StopEventMonitoring()
|
||||
|
||||
if manager.running {
|
||||
t.Error("InterfaceManager should not be running after StopEventMonitoring()")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInterfaceManagerEventMonitoringWithConnectionChanges(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
manager := NewInterfaceManager(client)
|
||||
|
||||
// Set a callback to track calls
|
||||
var callbackCount int
|
||||
manager.SetEventCallback(func(details []InterfaceDetails) {
|
||||
callbackCount++
|
||||
})
|
||||
|
||||
manager.StartEventMonitoring()
|
||||
|
||||
// Let it run briefly
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Simulate VPP connection and disconnection by checking state changes
|
||||
initialWatchingState := manager.watchingEvents
|
||||
|
||||
// Stop monitoring
|
||||
manager.StopEventMonitoring()
|
||||
|
||||
// Verify it stopped
|
||||
if manager.running {
|
||||
t.Error("Event monitoring should have stopped")
|
||||
}
|
||||
|
||||
// The watching state should reflect the connection state
|
||||
if !client.IsConnected() && manager.watchingEvents {
|
||||
t.Error("Should not be watching events when disconnected")
|
||||
}
|
||||
|
||||
// Initial state should be false since we're not connected to VPP in tests
|
||||
if initialWatchingState {
|
||||
t.Error("Should not be watching events initially when VPP is not connected")
|
||||
}
|
||||
}
|
||||
|
@ -16,17 +16,15 @@ type StatsCallback func(*api.InterfaceStats)
|
||||
// StatsManager handles VPP statistics operations
|
||||
type StatsManager struct {
|
||||
client *VPPClient
|
||||
interfaceManager *InterfaceManager
|
||||
statsCallback StatsCallback
|
||||
period time.Duration
|
||||
running bool
|
||||
}
|
||||
|
||||
// NewStatsManager creates a new stats manager
|
||||
func NewStatsManager(client *VPPClient, interfaceManager *InterfaceManager) *StatsManager {
|
||||
func NewStatsManager(client *VPPClient) *StatsManager {
|
||||
return &StatsManager{
|
||||
client: client,
|
||||
interfaceManager: interfaceManager,
|
||||
period: time.Duration(*Period) * time.Second,
|
||||
}
|
||||
}
|
||||
@ -108,25 +106,6 @@ func (sm *StatsManager) statsRoutine() {
|
||||
|
||||
logger.Printf("VPP connection established")
|
||||
wasConnected = true
|
||||
|
||||
// Initialize interface event watching
|
||||
if sm.interfaceManager != nil {
|
||||
if err := sm.interfaceManager.StartEventWatcher(); err != nil {
|
||||
logger.Debugf("Failed to start interface event watching: %v", err)
|
||||
} else {
|
||||
logger.Debugf("Interface event watching started")
|
||||
|
||||
// Get initial interface details
|
||||
if details, err := sm.interfaceManager.GetAllInterfaceDetails(); err != nil {
|
||||
logger.Debugf("Failed to get initial interface details: %v", err)
|
||||
} else {
|
||||
logger.Debugf("Retrieved initial interface details for %d interfaces", len(details))
|
||||
if sm.interfaceManager.eventCallback != nil {
|
||||
sm.interfaceManager.eventCallback(details)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Query stats if connected
|
||||
|
@ -11,8 +11,7 @@ import (
|
||||
|
||||
func TestNewStatsManager(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
if manager == nil {
|
||||
t.Fatal("NewStatsManager() returned nil")
|
||||
@ -22,10 +21,6 @@ func TestNewStatsManager(t *testing.T) {
|
||||
t.Error("StatsManager should store the provided client")
|
||||
}
|
||||
|
||||
if manager.interfaceManager != interfaceManager {
|
||||
t.Error("StatsManager should store the provided interface manager")
|
||||
}
|
||||
|
||||
if manager.period != time.Duration(*Period)*time.Second {
|
||||
t.Errorf("Expected period %v, got %v", time.Duration(*Period)*time.Second, manager.period)
|
||||
}
|
||||
@ -41,8 +36,7 @@ func TestNewStatsManager(t *testing.T) {
|
||||
|
||||
func TestStatsManagerSetStatsCallback(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
var callbackCalled bool
|
||||
var receivedStats *api.InterfaceStats
|
||||
@ -91,8 +85,7 @@ func TestStatsManagerSetStatsCallback(t *testing.T) {
|
||||
|
||||
func TestStatsManagerSetPeriod(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
newPeriod := 5 * time.Second
|
||||
manager.SetPeriod(newPeriod)
|
||||
@ -104,8 +97,7 @@ func TestStatsManagerSetPeriod(t *testing.T) {
|
||||
|
||||
func TestStatsManagerStartStopStatsRoutine(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
if manager.running {
|
||||
t.Error("StatsManager should not be running initially")
|
||||
@ -133,8 +125,7 @@ func TestStatsManagerStartStopStatsRoutine(t *testing.T) {
|
||||
|
||||
func TestStatsManagerGetInterfaceStatsWithoutConnection(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
_, err := manager.GetInterfaceStats()
|
||||
if err == nil {
|
||||
@ -215,8 +206,7 @@ func TestStatsCallback(t *testing.T) {
|
||||
|
||||
func TestStatsManagerQueryAndReportStatsWithoutConnection(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
// Should return false when not connected
|
||||
if manager.queryAndReportStats() {
|
||||
@ -226,8 +216,7 @@ func TestStatsManagerQueryAndReportStatsWithoutConnection(t *testing.T) {
|
||||
|
||||
func TestStatsManagerWithShortPeriod(t *testing.T) {
|
||||
client := NewVPPClient()
|
||||
interfaceManager := NewInterfaceManager(client)
|
||||
manager := NewStatsManager(client, interfaceManager)
|
||||
manager := NewStatsManager(client)
|
||||
|
||||
// Set a very short period for testing
|
||||
manager.SetPeriod(10 * time.Millisecond)
|
||||
|
Reference in New Issue
Block a user