Turn VPPApi into a threadsafe object

It now is tolerant to VPP restarts. Upon initialization, we connect(),
blocking all but the first thread from trying. The rest will see
self.connected=True and move on.

Then, on each/any error, call vpp.disconect() and set connected=False
which will make any subsequent AgentX updater run force a reconnect.
This commit is contained in:
Pim van Pelt
2021-09-05 20:02:11 +00:00
parent e1cddc8c26
commit 7dec1329d2
2 changed files with 78 additions and 63 deletions

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from vppstats import VPPStats from vppstats import VPPStats
import vppapi from vppapi import VPPApi
import time import time
import pyagentx import pyagentx
import logging import logging
@ -52,8 +52,9 @@ class ifMtu(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -69,8 +70,9 @@ class ifSpeed(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -91,8 +93,9 @@ class ifAdminStatus(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -112,8 +115,9 @@ class ifOperStatus(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -133,8 +137,9 @@ class ifPhysAddress(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -280,8 +285,9 @@ class ifHighSpeed(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect()
ifaces = vppapi.get_ifaces(vpp) ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
@ -480,8 +486,8 @@ def main():
vppstat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2) vppstat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2)
vppstat.connect() vppstat.connect()
vpp = vppapi.vpp_connect() vpp = VPPApi()
if not vpp: if not vpp.connect():
logger.error("Can't connect to VPP API, bailing") logger.error("Can't connect to VPP API, bailing")
return return
@ -495,6 +501,7 @@ def main():
a.stop() a.stop()
vppstat.disconnect() vppstat.disconnect()
vpp.disconnect()
if __name__ == "__main__": if __name__ == "__main__":

116
vppapi.py
View File

@ -15,71 +15,79 @@ class NullHandler(logging.Handler):
pass pass
logger = logging.getLogger('pyagentx.vppapi') class VPPApi():
logger.addHandler(NullHandler()) def __init__(self, address='/run/vpp/api.sock'):
self.address = address
self.lock = threading.Lock()
self.connected = False
self.logger = logging.getLogger('pyagentx.vppapi')
self.logger.addHandler(NullHandler())
self.vpp = None
vpp_lock = threading.Lock() def connect(self):
self.lock.acquire()
if self.connected:
self.lock.release()
return True
vpp_json_dir = '/usr/share/vpp/api/'
def vpp_connect(): # construct a list of all the json api files
global logger jsonfiles = []
for root, dirnames, filenames in os.walk(vpp_json_dir):
for filename in fnmatch.filter(filenames, '*.api.json'):
jsonfiles.append(os.path.join(root, filename))
vpp_json_dir = '/usr/share/vpp/api/' if not jsonfiles:
self.logger.error('no json api files found')
self.lock.release()
return False
# construct a list of all the json api files self.vpp = VPPApiClient(apifiles=jsonfiles,
jsonfiles = [] server_address=self.address)
for root, dirnames, filenames in os.walk(vpp_json_dir): try:
for filename in fnmatch.filter(filenames, '*.api.json'): self.logger.info('Connecting to VPP')
jsonfiles.append(os.path.join(root, filename)) self.vpp.connect('vpp-snmp-agent')
except:
self.lock.release()
return False
if not jsonfiles: v = self.vpp.api.show_version()
logger.error('no json api files found') self.logger.info('VPP version is %s' % v.version)
return False
vpp = VPPApiClient(apifiles=jsonfiles, server_address='/run/vpp/api.sock') self.connected = True
try: self.lock.release()
vpp.connect('vpp-snmp-agent') return True
except:
return False
v = vpp.api.show_version() def disconnect(self):
logger.info('VPP version is %s' % v.version) if not self.connected:
return True
self.vpp.disconnect()
self.connected = False
return True
return vpp def get_ifaces(self):
ret = {}
if not self.connected:
return ret
self.lock.acquire()
try:
iface_list = self.vpp.api.sw_interface_dump()
except Exception as e:
self.logger.error("VPP communication error, disconnecting", e)
self.vpp.disconnect()
self.connected = False
self.lock.release()
return ret
def get_iface(vpp, ifname): if not iface_list:
global logger self.logger.error("Can't get interface list")
self.lock.release()
return ret
vpp_lock.acquire() for iface in iface_list:
iface_list = vpp.api.sw_interface_dump(name_filter=ifname, ret[iface.interface_name] = iface
name_filter_valid=True)
if not iface_list:
logger.error("Can't get interface %s" % ifname)
vpp_lock.release()
return None
for iface in iface_list: self.lock.release()
if iface.interface_name == ifname:
vpp_lock.release()
return iface
vpp_lock.release()
return None
def get_ifaces(vpp):
global logger
vpp_lock.acquire()
ret = {}
iface_list = vpp.api.sw_interface_dump()
if not iface_list:
logger.error("Can't get interface list")
vpp_lock.release()
return ret return ret
for iface in iface_list:
ret[iface.interface_name] = iface
vpp_lock.release()
return ret