Replace the pyagentx threaded version with a much simpler, non-threaded version.

This commit is contained in:
Pim van Pelt
2021-09-11 12:19:38 +00:00
parent 842bce9d6e
commit 8c9c1e2b4a
13 changed files with 507 additions and 1089 deletions

View File

@ -8,27 +8,22 @@ from __future__ import (
import logging import logging
from pyagentx.updater import Updater from agentx.agent import Agent
from pyagentx.agent import Agent from agentx.dataset import DataSet
from pyagentx.sethandler import SetHandler, SetHandlerError
def setup_logging(debug=False): def setup_logging(debug=False):
if debug: if debug:
level = logging.DEBUG level = logging.DEBUG
else: else:
level = logging.INFO level = logging.INFO
logger = logging.getLogger('pyagentx') logger = logging.getLogger('agentx')
logger.setLevel(level) logger.setLevel(level)
# formatter = logging.Formatter('%(asctime)s - %(name)20s - %(levelname)s - %(message)s') formatter = logging.Formatter('[%(levelname)-8s] %(name)17s - %(funcName)-15s: %(message)s')
formatter = logging.Formatter(
'[%(levelname)-8s] %(name)17s - %(funcName)-15s: %(message)s')
ch = logging.StreamHandler() ch = logging.StreamHandler()
ch.setLevel(level) ch.setLevel(level)
ch.setFormatter(formatter) ch.setFormatter(formatter)
logger.addHandler(ch) logger.addHandler(ch)
AGENTX_EMPTY_PDU = 1 AGENTX_EMPTY_PDU = 1
AGENTX_OPEN_PDU = 1 AGENTX_OPEN_PDU = 1
AGENTX_CLOSE_PDU = 2 AGENTX_CLOSE_PDU = 2

89
agentx/agent.py Normal file
View File

@ -0,0 +1,89 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
# --------------------------------------------
import time
import agentx
from agentx.dataset import DataSet
from agentx.network import Network
class AgentError(Exception):
pass
class Agent(object):
def __init__(self, server_address='/var/agentx/master', freq=5):
self.logger = logging.getLogger('agentx.agent')
self.logger.addHandler(NullHandler())
self._servingset = DataSet()
self._workingset = DataSet()
self._lastupdate = 0
self._update_freq = freq
self._net = Network(server_address = server_address)
self._oid_list = []
def _update(self):
ds = self.update()
self._net.update(ds._data)
self._lastupdate = time.time()
def run(self):
self.logger.info('Calling setup')
self.setup()
self.logger.info('Initial update')
self._update()
while True:
if not self._net.is_connected():
self.logger.info('Opening AgentX connection')
self._net.start(self._oid_list)
if time.time() - self._lastupdate > self._update_freq:
self._update()
try:
self._net.run()
except Exception as e:
self.logger.error('An exception occurred: %s' % e)
time.sleep(1)
def stop(self):
self.logger.debug('Stopping')
self._net.disconnect()
pass
def setup(self):
# Override this
pass
def update(self):
# Override this
pass
def register(self, oid_list):
if not isinstance(oid_list, list):
oid_list = [oid_list]
for oid in oid_list:
if not oid in self._oid_list:
self.logger.debug('Adding %s to list' % oid)
self._oid_list.append(oid)

57
agentx/dataset.py Normal file
View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('agentx.dataset')
logger.addHandler(NullHandler())
# --------------------------------------------
import time
import agentx
class DataSet():
def __init__(self):
self._data = {}
def set(self, oid, oid_type, value):
if oid_type.startswith('int'):
t = agentx.TYPE_INTEGER
elif oid_type.startswith('str'):
t = agentx.TYPE_OCTETSTRING
elif oid_type.startswith('oid'):
t = agentx.TYPE_OBJECTIDENTIFIER
elif oid_type.startswith('ip'):
t = agentx.TYPE_IPADDRESS
elif oid_type == 'counter32' or oid_type == 'uint32' or oid_type == 'u32':
t = agentx.TYPE_COUNTER32
elif oid_type == 'gauge32':
t = agentx.TYPE_GAUGE32
elif oid_type.startswith('time') or oid_type.startswith('tick'):
t = agentx.TYPE_TIMETICKS
elif oid_type.startswith('opaque'):
t = agentx.TYPE_OPAQUE
elif oid_type == 'counter64' or oid_type == 'uint64' or oid_type == 'u64':
t = agentx.TYPE_COUNTER64
else:
logger.error('Invalid oid_type: %s' % (oid_type))
return
self._data[oid] = {
'name': oid,
'type': t,
'value': value
}

220
agentx/network.py Normal file
View File

@ -0,0 +1,220 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('agentx.network')
logger.addHandler(NullHandler())
class NetworkError(Exception):
pass
# --------------------------------------------
import socket
import time
import agentx
from agentx.pdu import PDU
class Network():
def __init__(self, server_address = '/var/agentx/master'):
self.session_id = 0
self.transaction_id = 0
self.debug = 1
# Data Related Variables
self.data = {}
self.data_idx = []
self._connected = False
self._server_address = server_address
self._timeout = 0.1 # Seconds
def connect(self):
if self._connected:
return
try:
logger.info("Connecting to %s" % self._server_address)
if self._server_address.startswith('/'):
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self._server_address)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self._server_address.split(':'))
self.socket.settimeout(self._timeout)
self._connected = True
except socket.error:
logger.error("Failed to connect to %s" % self._server_address)
self._connected = False
def disconnect(self):
if not self._connected:
return
logger.info("Disconnecting from %s" % self._server_address)
self.socket.close()
self.socket = None
self._connected = False
return
def update(self, newdata):
if len(self.data) == 0:
logger.debug("Setting initial serving dataset (%d OIDs)" % len(newdata))
else:
logger.debug("Replacing serving dataset (%d OIDs)" % len(newdata))
self.data = newdata
self.data_idx = sorted(self.data.keys(), key=lambda k: tuple(int(part) for part in k.split('.')))
def new_pdu(self, type):
pdu = PDU(type)
pdu.session_id = self.session_id
pdu.transaction_id = self.transaction_id
self.transaction_id += 1
return pdu
def response_pdu(self, org_pdu):
pdu = PDU(agentx.AGENTX_RESPONSE_PDU)
pdu.session_id = org_pdu.session_id
pdu.transaction_id = org_pdu.transaction_id
pdu.packet_id = org_pdu.packet_id
return pdu
def send_pdu(self, pdu):
if self.debug: pdu.dump()
self.socket.send(pdu.encode())
def recv_pdu(self):
buf = self.socket.recv(1024)
if not buf: return None
pdu = PDU()
pdu.decode(buf)
if self.debug: pdu.dump()
return pdu
# =========================================
def _get_next_oid(self, oid, endoid):
if oid in self.data:
# Exact match found
#logger.debug('get_next_oid, exact match of %s' % oid)
idx = self.data_idx.index(oid)
if idx == (len(self.data_idx) - 1):
# Last Item in MIB, No match!
return None
return self.data_idx[idx + 1]
else:
# No exact match, find prefix
#logger.debug('get_next_oid, no exact match of %s' % oid)
slist = oid.split('.')
elist = endoid.split('.')
for tmp_oid in self.data_idx:
tlist = tmp_oid.split('.')
for i in range(len(tlist)):
try:
sok = int(slist[i]) <= int(tlist[i])
eok = int(elist[i]) >= int(tlist[i])
if not (sok and eok):
break
except IndexError:
pass
if sok and eok:
return tmp_oid
return None # No match!
def start(self, oid_list):
self.connect()
if not self._connected:
return
logger.debug("==== Open PDU ====")
pdu = self.new_pdu(agentx.AGENTX_OPEN_PDU)
self.send_pdu(pdu)
pdu = self.recv_pdu()
self.session_id = pdu.session_id
logger.debug("==== Ping PDU ====")
pdu = self.new_pdu(agentx.AGENTX_PING_PDU)
self.send_pdu(pdu)
pdu = self.recv_pdu()
logger.debug("==== Register PDU ====")
for oid in oid_list:
logger.info("Registering: %s" % (oid))
pdu = self.new_pdu(agentx.AGENTX_REGISTER_PDU)
pdu.oid = oid
self.send_pdu(pdu)
pdu = self.recv_pdu()
return
def stop(self):
self.disconnect()
def is_connected(self):
return self._connected
def run(self, timeout = 0.1):
if not self._connected:
raise NetworkError('Not connected')
if timeout != self._timeout:
self.socket.settimeout(timeout)
self._timeout = timeout
try:
request = self.recv_pdu()
except socket.timeout:
return
if not request:
logger.error("Empty PDU, connection closed!")
self.disconnect()
raise NetworkError("Empty PDU, disconnecting")
response = self.response_pdu(request)
if request.type == agentx.AGENTX_GET_PDU:
logger.debug("Received GET PDU")
for rvalue in request.range_list:
oid = rvalue[0]
logger.debug("OID: %s" % (oid))
if oid in self.data:
logger.debug("OID Found")
response.values.append(self.data[oid])
else:
logger.debug("OID Not Found!")
response.values.append({
'type': agentx.TYPE_NOSUCHOBJECT,
'name': rvalue[0],
'value': 0
})
elif request.type == agentx.AGENTX_GETNEXT_PDU:
logger.debug("Received GET_NEXT PDU")
for rvalue in request.range_list:
oid = self._get_next_oid(rvalue[0], rvalue[1])
logger.debug("GET_NEXT: %s => %s" % (rvalue[0], oid))
if oid:
response.values.append(self.data[oid])
else:
response.values.append({
'type': agentx.TYPE_ENDOFMIBVIEW,
'name': rvalue[0],
'value': 0
})
else:
logger.warn("Received unsupported PDU %d" % request.type)
self.send_pdu(response)

View File

@ -15,14 +15,14 @@ class NullHandler(logging.Handler):
pass pass
logger = logging.getLogger('pyagentx.pdu') logger = logging.getLogger('agentx.pdu')
logger.addHandler(NullHandler()) logger.addHandler(NullHandler())
# -------------------------------------------- # --------------------------------------------
import struct import struct
import pprint import pprint
import pyagentx import agentx
class PDU(object): class PDU(object):
@ -31,14 +31,14 @@ class PDU(object):
self.session_id = 0 self.session_id = 0
self.transaction_id = 0 self.transaction_id = 0
self.packet_id = 0 self.packet_id = 0
self.error = pyagentx.ERROR_NOAGENTXERROR self.error = agentx.ERROR_NOAGENTXERROR
self.error_index = 0 self.error_index = 0
self.decode_buf = '' self.decode_buf = ''
self.state = {} self.state = {}
self.values = [] self.values = []
def dump(self): def dump(self):
name = pyagentx.PDU_TYPE_NAME[self.type] name = agentx.PDU_TYPE_NAME[self.type]
logger.debug('PDU DUMP: New PDU') logger.debug('PDU DUMP: New PDU')
logger.debug( logger.debug(
'PDU DUMP: Meta : [%s: %d %d %d]' % 'PDU DUMP: Meta : [%s: %d %d %d]' %
@ -85,25 +85,25 @@ class PDU(object):
def encode_value(self, type, name, value): def encode_value(self, type, name, value):
buf = struct.pack('!HH', type, 0) buf = struct.pack('!HH', type, 0)
buf += self.encode_oid(name) buf += self.encode_oid(name)
if type in [pyagentx.TYPE_INTEGER]: if type in [agentx.TYPE_INTEGER]:
buf += struct.pack('!l', value) buf += struct.pack('!l', value)
elif type in [ elif type in [
pyagentx.TYPE_COUNTER32, pyagentx.TYPE_GAUGE32, agentx.TYPE_COUNTER32, agentx.TYPE_GAUGE32,
pyagentx.TYPE_TIMETICKS agentx.TYPE_TIMETICKS
]: ]:
buf += struct.pack('!L', value) buf += struct.pack('!L', value)
elif type in [pyagentx.TYPE_COUNTER64]: elif type in [agentx.TYPE_COUNTER64]:
buf += struct.pack('!Q', value) buf += struct.pack('!Q', value)
elif type in [pyagentx.TYPE_OBJECTIDENTIFIER]: elif type in [agentx.TYPE_OBJECTIDENTIFIER]:
buf += self.encode_oid(value) buf += self.encode_oid(value)
elif type in [ elif type in [
pyagentx.TYPE_IPADDRESS, pyagentx.TYPE_OPAQUE, agentx.TYPE_IPADDRESS, agentx.TYPE_OPAQUE,
pyagentx.TYPE_OCTETSTRING agentx.TYPE_OCTETSTRING
]: ]:
buf += self.encode_octet(value) buf += self.encode_octet(value)
elif type in [ elif type in [
pyagentx.TYPE_NULL, pyagentx.TYPE_NOSUCHOBJECT, agentx.TYPE_NULL, agentx.TYPE_NOSUCHOBJECT,
pyagentx.TYPE_NOSUCHINSTANCE, pyagentx.TYPE_ENDOFMIBVIEW agentx.TYPE_NOSUCHINSTANCE, agentx.TYPE_ENDOFMIBVIEW
]: ]:
# No data # No data
pass pass
@ -122,7 +122,7 @@ class PDU(object):
def encode(self): def encode(self):
buf = b'' buf = b''
if self.type == pyagentx.AGENTX_OPEN_PDU: if self.type == agentx.AGENTX_OPEN_PDU:
# timeout # timeout
buf += struct.pack('!BBBB', 5, 0, 0, 0) buf += struct.pack('!BBBB', 5, 0, 0, 0)
# agent OID # agent OID
@ -130,11 +130,11 @@ class PDU(object):
# Agent Desc # Agent Desc
buf += self.encode_octet('MyAgent') buf += self.encode_octet('MyAgent')
elif self.type == pyagentx.AGENTX_PING_PDU: elif self.type == agentx.AGENTX_PING_PDU:
# No extra data # No extra data
pass pass
elif self.type == pyagentx.AGENTX_REGISTER_PDU: elif self.type == agentx.AGENTX_REGISTER_PDU:
range_subid = 0 range_subid = 0
timeout = 5 timeout = 5
priority = 127 priority = 127
@ -142,7 +142,7 @@ class PDU(object):
# Sub Tree # Sub Tree
buf += self.encode_oid(self.oid) buf += self.encode_oid(self.oid)
elif self.type == pyagentx.AGENTX_RESPONSE_PDU: elif self.type == agentx.AGENTX_RESPONSE_PDU:
buf += struct.pack('!LHH', 0, self.error, self.error_index) buf += struct.pack('!LHH', 0, self.error, self.error_index)
for value in self.values: for value in self.values:
buf += self.encode_value(value['type'], value['name'], buf += self.encode_value(value['type'], value['name'],
@ -217,26 +217,26 @@ class PDU(object):
logger.exception('Invalid packing value header') logger.exception('Invalid packing value header')
oid, _ = self.decode_oid() oid, _ = self.decode_oid()
if vtype in [ if vtype in [
pyagentx.TYPE_INTEGER, pyagentx.TYPE_COUNTER32, agentx.TYPE_INTEGER, agentx.TYPE_COUNTER32,
pyagentx.TYPE_GAUGE32, pyagentx.TYPE_TIMETICKS agentx.TYPE_GAUGE32, agentx.TYPE_TIMETICKS
]: ]:
data = struct.unpack('!L', self.decode_buf[:4]) data = struct.unpack('!L', self.decode_buf[:4])
data = data[0] data = data[0]
self.decode_buf = self.decode_buf[4:] self.decode_buf = self.decode_buf[4:]
elif vtype in [pyagentx.TYPE_COUNTER64]: elif vtype in [agentx.TYPE_COUNTER64]:
data = struct.unpack('!Q', self.decode_buf[:8]) data = struct.unpack('!Q', self.decode_buf[:8])
data = data[0] data = data[0]
self.decode_buf = self.decode_buf[8:] self.decode_buf = self.decode_buf[8:]
elif vtype in [pyagentx.TYPE_OBJECTIDENTIFIER]: elif vtype in [agentx.TYPE_OBJECTIDENTIFIER]:
data, _ = self.decode_oid() data, _ = self.decode_oid()
elif vtype in [ elif vtype in [
pyagentx.TYPE_IPADDRESS, pyagentx.TYPE_OPAQUE, agentx.TYPE_IPADDRESS, agentx.TYPE_OPAQUE,
pyagentx.TYPE_OCTETSTRING agentx.TYPE_OCTETSTRING
]: ]:
data = self.decode_octet() data = self.decode_octet()
elif vtype in [ elif vtype in [
pyagentx.TYPE_NULL, pyagentx.TYPE_NOSUCHOBJECT, agentx.TYPE_NULL, agentx.TYPE_NOSUCHOBJECT,
pyagentx.TYPE_NOSUCHINSTANCE, pyagentx.TYPE_ENDOFMIBVIEW agentx.TYPE_NOSUCHINSTANCE, agentx.TYPE_ENDOFMIBVIEW
]: ]:
# No data # No data
data = None data = None
@ -251,7 +251,7 @@ class PDU(object):
ret = { ret = {
'version': t[0], 'version': t[0],
'pdu_type': t[1], 'pdu_type': t[1],
'pdu_type_name': pyagentx.PDU_TYPE_NAME[t[1]], 'pdu_type_name': agentx.PDU_TYPE_NAME[t[1]],
'flags': t[2], 'flags': t[2],
'reserved': t[3], 'reserved': t[3],
'session_id': t[4], 'session_id': t[4],
@ -276,14 +276,14 @@ class PDU(object):
def decode(self, buf): def decode(self, buf):
self.set_decode_buf(buf) self.set_decode_buf(buf)
ret = self.decode_header() ret = self.decode_header()
if ret['pdu_type'] == pyagentx.AGENTX_RESPONSE_PDU: if ret['pdu_type'] == agentx.AGENTX_RESPONSE_PDU:
# Decode Response Header # Decode Response Header
t = struct.unpack('!LHH', self.decode_buf[:8]) t = struct.unpack('!LHH', self.decode_buf[:8])
self.decode_buf = self.decode_buf[8:] self.decode_buf = self.decode_buf[8:]
self.response = { self.response = {
'sysUpTime': t[0], 'sysUpTime': t[0],
'error': t[1], 'error': t[1],
'error_name': pyagentx.ERROR_NAMES[t[1]], 'error_name': agentx.ERROR_NAMES[t[1]],
'index': t[2], 'index': t[2],
} }
# Decode VarBindList # Decode VarBindList
@ -291,23 +291,23 @@ class PDU(object):
while len(self.decode_buf): while len(self.decode_buf):
self.values.append(self.decode_value()) self.values.append(self.decode_value())
elif ret['pdu_type'] == pyagentx.AGENTX_GET_PDU: elif ret['pdu_type'] == agentx.AGENTX_GET_PDU:
self.range_list = self.decode_search_range_list() self.range_list = self.decode_search_range_list()
elif ret['pdu_type'] == pyagentx.AGENTX_GETNEXT_PDU: elif ret['pdu_type'] == agentx.AGENTX_GETNEXT_PDU:
self.range_list = self.decode_search_range_list() self.range_list = self.decode_search_range_list()
elif ret['pdu_type'] == pyagentx.AGENTX_TESTSET_PDU: elif ret['pdu_type'] == agentx.AGENTX_TESTSET_PDU:
# Decode VarBindList # Decode VarBindList
self.values = [] self.values = []
while len(self.decode_buf): while len(self.decode_buf):
self.values.append(self.decode_value()) self.values.append(self.decode_value())
elif ret['pdu_type'] in [ elif ret['pdu_type'] in [
pyagentx.AGENTX_COMMITSET_PDU, pyagentx.AGENTX_UNDOSET_PDU, agentx.AGENTX_COMMITSET_PDU, agentx.AGENTX_UNDOSET_PDU,
pyagentx.AGENTX_CLEANUPSET_PDU agentx.AGENTX_CLEANUPSET_PDU
]: ]:
pass pass
else: else:
pdu_type_str = pyagentx.PDU_TYPE_NAME.get( pdu_type_str = agentx.PDU_TYPE_NAME.get(
ret['pdu_type'], 'Unknown:' + str(ret['pdu_type'])) ret['pdu_type'], 'Unknown:' + str(ret['pdu_type']))
logger.error('Unsupported PDU type:' + pdu_type_str) logger.error('Unsupported PDU type:' + pdu_type_str)

1
pyagentx/.gitignore vendored
View File

@ -1 +0,0 @@
__pycache__

View File

@ -1,97 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('pyagentx.agent')
logger.addHandler(NullHandler())
# --------------------------------------------
import time
import inspect
try:
import queue
except ImportError:
import Queue as queue
import pyagentx
from pyagentx.updater import Updater
from pyagentx.network import Network
class AgentError(Exception):
pass
class Agent(object):
def __init__(self, server_address='/var/agentx/master'):
self._updater_list = []
self._sethandlers = {}
self._threads = []
self._server_address = server_address
def register(self, oid, class_, freq=10):
if Updater not in inspect.getmro(class_):
raise AgentError('Class given isn\'t an updater')
# cleanup and test oid
try:
oid = oid.strip(' .')
[int(i) for i in oid.split('.')]
except ValueError:
raise AgentError('OID isn\'t valid')
self._updater_list.append({'oid': oid, 'class': class_, 'freq': freq})
def register_set(self, oid, class_):
if pyagentx.SetHandler not in class_.__bases__:
raise AgentError('Class given isn\'t a SetHandler')
# cleanup and test oid
try:
oid = oid.strip(' .')
[int(i) for i in oid.split('.')]
except ValueError:
raise AgentError('OID isn\'t valid')
self._sethandlers[oid] = class_()
def setup(self):
# Override this
pass
def start(self):
update_queue = queue.Queue(maxsize=20000) # 1000 interfaces, 20 variables each
self.setup()
# Start Updaters
for u in self._updater_list:
logger.debug('Starting updater [%s]' % u['oid'])
t = u['class']()
t.agent_setup(update_queue, u['oid'], u['freq'])
t.start()
self._threads.append(t)
# Start Network
oid_list = [u['oid'] for u in self._updater_list]
t = Network(update_queue, oid_list, self._sethandlers, self._server_address)
t.start()
self._threads.append(t)
# Do nothing ... just wait for someone to stop you
while True:
#logger.debug('Agent Sleeping ...')
time.sleep(1)
def stop(self):
logger.debug('Stop threads')
for t in self._threads:
t.stop.set()
logger.debug('Wait for updater')
for t in self._threads:
t.join()

View File

@ -1,272 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('pyagentx.network')
logger.addHandler(NullHandler())
# --------------------------------------------
import socket
import time
import threading
try:
import queue
except ImportError:
import Queue as queue
import pyagentx
from pyagentx.pdu import PDU
class Network(threading.Thread):
def __init__(self, update_queue, oid_list, sethandlers, server_address):
threading.Thread.__init__(self)
self.stop = threading.Event()
self._queue = update_queue
self._oid_list = oid_list
self._sethandlers = sethandlers
self._server_address = server_address
self.session_id = 0
self.transaction_id = 0
self.debug = 1
# Data Related Variables
self.data = {}
self.data_idx = []
def _connect(self):
while True:
try:
logger.info("Connecting to %s", self._server_address)
if self._server_address.startswith('/'):
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self._server_address)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self._server_address.split(':'))
self.socket.settimeout(0.1)
return
except socket.error:
logger.error("Failed to connect, sleeping and retrying later")
time.sleep(2)
def new_pdu(self, type):
pdu = PDU(type)
pdu.session_id = self.session_id
pdu.transaction_id = self.transaction_id
self.transaction_id += 1
return pdu
def response_pdu(self, org_pdu):
pdu = PDU(pyagentx.AGENTX_RESPONSE_PDU)
pdu.session_id = org_pdu.session_id
pdu.transaction_id = org_pdu.transaction_id
pdu.packet_id = org_pdu.packet_id
return pdu
def send_pdu(self, pdu):
if self.debug: pdu.dump()
self.socket.send(pdu.encode())
def recv_pdu(self):
buf = self.socket.recv(1024)
if not buf: return None
pdu = PDU()
pdu.decode(buf)
if self.debug: pdu.dump()
return pdu
# =========================================
def _get_updates(self):
while True:
try:
item = self._queue.get_nowait()
logger.debug('New update')
update_oid = item['oid']
update_data = item['data']
# clear values with prefix oid
for oid in list(self.data.keys()):
if oid.startswith(update_oid):
del (self.data[oid])
# insert updated value
for row in update_data.values():
oid = "%s.%s" % (update_oid, row['name'])
self.data[oid] = {
'name': oid,
'type': row['type'],
'value': row['value']
}
# recalculate reverse index if data changed
self.data_idx = sorted(
self.data.keys(),
key=lambda k: tuple(int(part) for part in k.split('.')))
except queue.Empty:
break
def _get_next_oid(self, oid, endoid):
if oid in self.data:
# Exact match found
#logger.debug('get_next_oid, exact match of %s' % oid)
idx = self.data_idx.index(oid)
if idx == (len(self.data_idx) - 1):
# Last Item in MIB, No match!
return None
return self.data_idx[idx + 1]
else:
# No exact match, find prefix
#logger.debug('get_next_oid, no exact match of %s' % oid)
slist = oid.split('.')
elist = endoid.split('.')
for tmp_oid in self.data_idx:
tlist = tmp_oid.split('.')
for i in range(len(tlist)):
try:
sok = int(slist[i]) <= int(tlist[i])
eok = int(elist[i]) >= int(tlist[i])
if not (sok and eok):
break
except IndexError:
pass
if sok and eok:
return tmp_oid
return None # No match!
def start(self):
while True:
try:
self._start_network()
except socket.error:
logger.error("Network error, master disconnect?!")
def _start_network(self):
self._connect()
logger.info("==== Open PDU ====")
pdu = self.new_pdu(pyagentx.AGENTX_OPEN_PDU)
self.send_pdu(pdu)
pdu = self.recv_pdu()
self.session_id = pdu.session_id
logger.info("==== Ping PDU ====")
pdu = self.new_pdu(pyagentx.AGENTX_PING_PDU)
self.send_pdu(pdu)
pdu = self.recv_pdu()
logger.info("==== Register PDU ====")
for oid in self._oid_list:
logger.info("Registering: %s" % (oid))
pdu = self.new_pdu(pyagentx.AGENTX_REGISTER_PDU)
pdu.oid = oid
self.send_pdu(pdu)
pdu = self.recv_pdu()
logger.info("==== Waiting for PDU ====")
while True:
try:
self._get_updates()
request = self.recv_pdu()
except socket.timeout:
continue
if not request:
logger.error("Empty PDU, connection closed!")
raise socket.error
response = self.response_pdu(request)
if request.type == pyagentx.AGENTX_GET_PDU:
logger.debug("Received GET PDU")
for rvalue in request.range_list:
oid = rvalue[0]
logger.debug("OID: %s" % (oid))
if oid in self.data:
logger.debug("OID Found")
response.values.append(self.data[oid])
else:
logger.debug("OID Not Found!")
response.values.append({
'type': pyagentx.TYPE_NOSUCHOBJECT,
'name': rvalue[0],
'value': 0
})
elif request.type == pyagentx.AGENTX_GETNEXT_PDU:
logger.debug("Received GET_NEXT PDU")
for rvalue in request.range_list:
oid = self._get_next_oid(rvalue[0], rvalue[1])
logger.debug("GET_NEXT: %s => %s" % (rvalue[0], oid))
if oid:
response.values.append(self.data[oid])
else:
response.values.append({
'type': pyagentx.TYPE_ENDOFMIBVIEW,
'name': rvalue[0],
'value': 0
})
elif request.type == pyagentx.AGENTX_TESTSET_PDU:
logger.info("Received TESTSET PDU")
idx = 0
for row in request.values:
idx += 1
oid = row['name']
type_ = pyagentx.TYPE_NAME.get(row['type'], 'Unknown type')
value = row['data']
logger.info("Name: [%s] Type: [%s] Value: [%s]" %
(oid, type_, value))
# Find matching sethandler
matching_oid = ''
for target_oid in self._sethandlers:
if oid.startswith(target_oid):
matching_oid = target_oid
break
if matching_oid == '':
logger.debug(
'TestSet request failed: not writeable #%s' % idx)
response.error = pyagentx.ERROR_NOTWRITABLE
response.error_index = idx
break
try:
self._sethandlers[matching_oid].network_test(
request.session_id, request.transaction_id, oid,
row['data'])
except pyagentx.SetHandlerError:
logger.debug(
'TestSet request failed: wrong value #%s' % idx)
response.error = pyagentx.ERROR_WRONGVALUE
response.error_index = idx
break
logger.debug('TestSet request passed')
elif request.type == pyagentx.AGENTX_COMMITSET_PDU:
for handler in self._sethandlers.values():
handler.network_commit(request.session_id,
request.transaction_id)
logger.info("Received COMMITSET PDU")
elif request.type == pyagentx.AGENTX_UNDOSET_PDU:
for handler in self._sethandlers.values():
handler.network_undo(request.session_id,
request.transaction_id)
logger.info("Received UNDOSET PDU")
elif request.type == pyagentx.AGENTX_CLEANUPSET_PDU:
for handler in self._sethandlers.values():
handler.network_cleanup(request.session_id,
request.transaction_id)
logger.info("Received CLEANUP PDU")
self.send_pdu(response)

View File

@ -1,67 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('pyagentx.sethandler')
logger.addHandler(NullHandler())
# --------------------------------------------
class SetHandlerError(Exception):
pass
class SetHandler(object):
def __init__(self):
self.transactions = {}
def network_test(self, session_id, transaction_id, oid, data):
tid = "%s_%s" % (session_id, transaction_id)
if tid in self.transactions:
del (self.transactions[tid])
try:
self.test(oid, data)
self.transactions[tid] = oid, data
except SetHandler as e:
logger.error('TestSet failed')
raise e
def network_commit(self, session_id, transaction_id):
tid = "%s_%s" % (session_id, transaction_id)
try:
oid, data = self.transactions[tid]
self.commit(oid, data)
if tid in self.transactions:
del (self.transactions[tid])
except:
logger.error('CommitSet failed')
def network_undo(self, session_id, transaction_id):
tid = "%s_%s" % (session_id, transaction_id)
if tid in self.transactions:
del (self.transactions[tid])
def network_cleanup(self, session_id, transaction_id):
tid = "%s_%s" % (session_id, transaction_id)
if tid in self.transactions:
del (self.transactions[tid])
# User override these
def test(self, oid, data):
pass
def commit(self, oid, data):
pass

View File

@ -1,137 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
# --------------------------------------------
import logging
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('pyagentx.updater')
logger.addHandler(NullHandler())
# --------------------------------------------
import time
import threading
try:
import queue
except ImportError:
import Queue as queue
import pyagentx
class Updater(threading.Thread):
def agent_setup(self, queue, oid, freq):
self.stop = threading.Event()
self._queue = queue
self._oid = oid
self._freq = freq
self._data = {}
def run(self):
start_time = 0
while True:
if self.stop.is_set(): break
now = time.time()
if now - start_time > self._freq:
logger.info('Updating : %s (%s)' %
(self.__class__.__name__, self._oid))
start_time = now
self._data = {}
try:
self.update()
self._queue.put_nowait({
'oid': self._oid,
'data': self._data
})
except queue.Full:
logger.error('Queue full')
except:
logger.exception('Unhandled update exception')
time.sleep(0.1)
logger.info('Updater stopping')
# Override this
def update(self):
pass
def set_INTEGER(self, oid, value):
logger.debug('Setting INTEGER %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_INTEGER,
'value': value
}
def set_OCTETSTRING(self, oid, value):
logger.debug('Setting OCTETSTRING %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_OCTETSTRING,
'value': value
}
def set_OBJECTIDENTIFIER(self, oid, value):
logger.debug('Setting OBJECTIDENTIFIER %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_OBJECTIDENTIFIER,
'value': value
}
def set_IPADDRESS(self, oid, value):
logger.debug('Setting IPADDRESS %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_IPADDRESS,
'value': value
}
def set_COUNTER32(self, oid, value):
logger.debug('Setting COUNTER32 %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_COUNTER32,
'value': value
}
def set_GAUGE32(self, oid, value):
logger.debug('Setting GAUGE32 %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_GAUGE32,
'value': value
}
def set_TIMETICKS(self, oid, value):
logger.debug('Setting TIMETICKS %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_TIMETICKS,
'value': value
}
def set_OPAQUE(self, oid, value):
logger.debug('Setting OPAQUE %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_OPAQUE,
'value': value
}
def set_COUNTER64(self, oid, value):
logger.debug('Setting COUNTER64 %s = %s' % (oid, value))
self._data[oid] = {
'name': oid,
'type': pyagentx.TYPE_COUNTER64,
'value': value
}

View File

@ -3,515 +3,151 @@
from vppstats import VPPStats from vppstats import VPPStats
from vppapi import VPPApi from vppapi import VPPApi
import time import agentx
import pyagentx
import logging class MyAgent(agentx.Agent):
import threading def setup(self):
global vppstat, vpp, logger
self.logger.info("Connecting to VPP Stats...")
vppstat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2)
if not vppstat.connect():
self.logger.error("Can't connect to VPP Stats API, bailing")
return False
vpp = VPPApi(clientname='vpp-snmp-agent')
if not vpp.connect():
logger.error("Can't connect to VPP API, bailing")
return False
self.register('1.3.6.1.2.1.2.2.1')
self.register('1.3.6.1.2.1.31.1.1.1')
return True
class NullHandler(logging.Handler):
def emit(self, record):
pass
logger = logging.getLogger('pyagentx.vppstats')
logger.addHandler(NullHandler())
class ifName(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_OCTETSTRING(str(i + 1), vppstat['/if/names'][i])
class ifIndex(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_INTEGER(str(i + 1), i + 1)
class ifType(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
t = 6 # ethermet-csmacd
if vppstat['/if/names'][i].startswith("loop"):
t = 24 # softwareLoopback
self.set_INTEGER(str(i + 1), t)
class ifMtu(pyagentx.Updater):
def update(self): def update(self):
global vppstat, vpp global vppstat, vpp
vppstat.connect() vppstat.connect()
vpp.connect() vpp.connect()
ds = agentx.DataSet()
ifaces = vpp.get_ifaces() ifaces = vpp.get_ifaces()
self.logger.debug("%d VPP interfaces retrieved" % len(ifaces))
self.logger.debug("%d VPP Stats interfaces retrieved" % len(vppstat['/if/names']))
for i in range(len(vppstat['/if/names'])): for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i] ifname = vppstat['/if/names'][i]
idx = 1000+i
ds.set('1.3.6.1.2.1.2.2.1.1.%u' % (idx), 'int', idx)
ds.set('1.3.6.1.2.1.2.2.1.2.%u' % (idx), 'str', ifname)
if ifname.startswith("loop"):
ds.set('1.3.6.1.2.1.2.2.1.3.%u' % (idx), 'int', 24) # softwareLoopback
else:
ds.set('1.3.6.1.2.1.2.2.1.3.%u' % (idx), 'int', 6) # ethermet-csmacd
mtu = 0 mtu = 0
if not ifname in ifaces: if not ifname in ifaces:
logger.warning("Could not get MTU for interface %s", ifname) self.logger.warning("Could not get MTU for interface %s", ifname)
else: else:
mtu = ifaces[ifname].mtu[0] mtu = ifaces[ifname].mtu[0]
self.set_INTEGER(str(i + 1), mtu) ds.set('1.3.6.1.2.1.2.2.1.4.%u' % (idx), 'int', mtu)
class ifSpeed(pyagentx.Updater):
def update(self):
global vppstat, vpp
vppstat.connect()
vpp.connect()
ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i]
speed = 0 speed = 0
if ifname.startswith("loop") or ifname.startswith("tap"): if ifname.startswith("loop") or ifname.startswith("tap"):
speed = 1000000000 speed = 1000000000
elif not ifname in ifaces: elif not ifname in ifaces:
logger.warning("Could not get link speed for interface %s", self.logger.warning("Could not get link speed for interface %s", ifname)
ifname)
else: else:
speed = ifaces[ifname].link_speed * 1000 speed = ifaces[ifname].link_speed * 1000
if speed >= 2**32: if speed >= 2**32:
speed = 2**32 - 1 speed = 2**32 - 1
self.set_GAUGE32(str(i + 1), speed) ds.set('1.3.6.1.2.1.2.2.1.5.%u' % (idx), 'gauge32', speed)
class ifAdminStatus(pyagentx.Updater):
def update(self):
global vppstat, vpp
vppstat.connect()
vpp.connect()
ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i]
state = 3 # testing
if not ifname in ifaces:
logger.warning("Could not get AdminStatus for interface %s",
ifname)
else:
if int(ifaces[ifname].flags) & 2:
state = 1 # up
else:
state = 2 # down
self.set_INTEGER(str(i + 1), state)
class ifOperStatus(pyagentx.Updater):
def update(self):
global vppstat, vpp
vppstat.connect()
vpp.connect()
ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i]
state = 3 # testing
if not ifname in ifaces:
logger.warning("Could not get OperStatus for interface %s",
ifname)
else:
if int(ifaces[ifname].flags) & 1:
state = 1 # up
else:
state = 2 # down
self.set_INTEGER(str(i + 1), state)
class ifPhysAddress(pyagentx.Updater):
def update(self):
global vppstat, vpp
vppstat.connect()
vpp.connect()
ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i]
mac = "00:00:00:00:00:00" mac = "00:00:00:00:00:00"
if not ifname in ifaces: if not ifname in ifaces:
logger.warning("Could not get PhysAddress for interface %s", self.logger.warning("Could not get PhysAddress for interface %s", ifname)
ifname)
else: else:
mac = str(ifaces[ifname].l2_address) mac = str(ifaces[ifname].l2_address)
self.set_OCTETSTRING(str(i + 1), mac) ds.set('1.3.6.1.2.1.2.2.1.6.%u' % (idx), 'str', mac)
admin_status = 3 # testing
if not ifname in ifaces:
self.logger.warning("Could not get AdminStatus for interface %s", ifname)
else:
if int(ifaces[ifname].flags) & 2:
admin_status = 1 # up
else:
admin_status = 2 # down
ds.set('1.3.6.1.2.1.2.2.1.7.%u' % (idx), 'int', admin_status)
class ifAlias(pyagentx.Updater): oper_status = 3 # testing
def update(self): if not ifname in ifaces:
global vppstat self.logger.warning("Could not get OperStatus for interface %s", ifname)
vppstat.connect() else:
if int(ifaces[ifname].flags) & 1:
oper_status = 1 # up
else:
oper_status = 2 # down
ds.set('1.3.6.1.2.1.2.2.1.8.%u' % (idx), 'int', oper_status)
for i in range(len(vppstat['/if/names'])): ds.set('1.3.6.1.2.1.2.2.1.9.%u' % (idx), 'ticks', 0)
self.set_OCTETSTRING(str(i + 1), vppstat['/if/names'][i]) ds.set('1.3.6.1.2.1.2.2.1.10.%u' % (idx), 'u32', vppstat['/if/rx'][:, i].sum_octets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.11.%u' % (idx), 'u32', vppstat['/if/rx'][:, i].sum_packets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.12.%u' % (idx), 'u32', vppstat['/if/rx-multicast'][:, i].sum_packets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.13.%u' % (idx), 'u32', vppstat['/if/rx-no-buf'][:, i].sum() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.14.%u' % (idx), 'u32', vppstat['/if/rx-error'][:, i].sum() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.16.%u' % (idx), 'u32', vppstat['/if/tx'][:, i].sum_octets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.17.%u' % (idx), 'u32', vppstat['/if/tx'][:, i].sum_packets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.18.%u' % (idx), 'u32', vppstat['/if/tx-multicast'][:, i].sum_packets() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.19.%u' % (idx), 'u32', vppstat['/if/drops'][:, i].sum() % 2**32)
ds.set('1.3.6.1.2.1.2.2.1.20.%u' % (idx), 'u32', vppstat['/if/tx-error'][:, i].sum() % 2**32)
class ifInMulticastPkts(pyagentx.Updater): ds.set('1.3.6.1.2.1.31.1.1.1.1.%u' % (idx), 'str', ifname)
def update(self): ds.set('1.3.6.1.2.1.31.1.1.1.2.%u' % (idx), 'u32', vppstat['/if/rx-multicast'][:, i].sum_packets() % 2**32)
global vppstat ds.set('1.3.6.1.2.1.31.1.1.1.3.%u' % (idx), 'u32', vppstat['/if/rx-broadcast'][:, i].sum_packets() % 2**32)
vppstat.connect() ds.set('1.3.6.1.2.1.31.1.1.1.4.%u' % (idx), 'u32', vppstat['/if/tx-multicast'][:, i].sum_packets() % 2**32)
ds.set('1.3.6.1.2.1.31.1.1.1.5.%u' % (idx), 'u32', vppstat['/if/tx-broadcast'][:, i].sum_packets() % 2**32)
for i in range(len(vppstat['/if/names'])): ds.set('1.3.6.1.2.1.31.1.1.1.6.%u' % (idx), 'u64', vppstat['/if/rx'][:, i].sum_octets())
self.set_COUNTER32( ds.set('1.3.6.1.2.1.31.1.1.1.7.%u' % (idx), 'u64', vppstat['/if/rx'][:, i].sum_packets())
str(i + 1), ds.set('1.3.6.1.2.1.31.1.1.1.8.%u' % (idx), 'u64', vppstat['/if/rx-multicast'][:, i].sum_packets())
vppstat['/if/rx-multicast'][:, i].sum_packets() % 2**32) ds.set('1.3.6.1.2.1.31.1.1.1.9.%u' % (idx), 'u64', vppstat['/if/rx-broadcast'][:, i].sum_packets())
ds.set('1.3.6.1.2.1.31.1.1.1.10.%u' % (idx), 'u64', vppstat['/if/tx'][:, i].sum_octets())
ds.set('1.3.6.1.2.1.31.1.1.1.11.%u' % (idx), 'u64', vppstat['/if/tx'][:, i].sum_packets())
ds.set('1.3.6.1.2.1.31.1.1.1.12.%u' % (idx), 'u64', vppstat['/if/tx-multicast'][:, i].sum_packets())
ds.set('1.3.6.1.2.1.31.1.1.1.13.%u' % (idx), 'u64', vppstat['/if/tx-broadcast'][:, i].sum_packets())
class ifInBroadcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(
str(i + 1),
vppstat['/if/rx-broadcast'][:, i].sum_packets() % 2**32)
class ifOutMulticastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(
str(i + 1),
vppstat['/if/tx-multicast'][:, i].sum_packets() % 2**32)
class ifOutBroadcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(
str(i + 1),
vppstat['/if/tx-broadcast'][:, i].sum_packets() % 2**32)
class ifHCInOctets(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1), vppstat['/if/rx'][:,
i].sum_octets())
class ifHCInUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1), vppstat['/if/rx'][:,
i].sum_packets())
class ifHCInMulticastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1),
vppstat['/if/rx-multicast'][:, i].sum_packets())
class ifHCInBroadcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1),
vppstat['/if/rx-broadcast'][:, i].sum_packets())
class ifHCOutOctets(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1), vppstat['/if/tx'][:,
i].sum_octets())
class ifHCOutUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1), vppstat['/if/tx'][:,
i].sum_packets())
class ifHCOutMulticastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1),
vppstat['/if/tx-multicast'][:, i].sum_packets())
class ifHCOutBroadcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER64(str(i + 1),
vppstat['/if/tx-broadcast'][:, i].sum_packets())
class ifHighSpeed(pyagentx.Updater):
def update(self):
global vppstat, vpp
vppstat.connect()
vpp.connect()
ifaces = vpp.get_ifaces()
for i in range(len(vppstat['/if/names'])):
ifname = vppstat['/if/names'][i]
speed = 0 speed = 0
if ifname.startswith("loop") or ifname.startswith("tap"): if ifname.startswith("loop") or ifname.startswith("tap"):
speed = 1000 speed = 1000
elif not ifname in ifaces: elif not ifname in ifaces:
logger.warning("Could not get link speed for interface %s", self.logger.warning("Could not get link speed for interface %s", ifname)
ifname)
else: else:
speed = int(ifaces[ifname].link_speed / 1000) speed = int(ifaces[ifname].link_speed / 1000)
self.set_GAUGE32(str(i + 1), speed) ds.set('1.3.6.1.2.1.31.1.1.1.15.%u' % (idx), 'gauge32', speed)
class ifPromiscuousMode(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
# Hardcode to false(2)
self.set_INTEGER(str(i + 1), 2)
class ifConnectorPresent(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
# Hardcode to true(1)
self.set_INTEGER(str(i + 1), 1)
class ifCounterDiscontinuityTime(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
# Hardcode to Timeticks: (0) 0:00:00.00
self.set_TIMETICKS(str(i + 1), 0)
class ifInOctets(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/rx'][:, i].sum_octets() % 2**32)
class ifInUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/rx'][:, i].sum_packets() % 2**32)
class ifInNUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(
str(i + 1),
vppstat['/if/rx-multicast'][:, i].sum_packets() % 2**32)
class ifInDiscards(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/rx-no-buf'][:, i].sum() % 2**32)
class ifInErrors(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/rx-error'][:, i].sum() % 2**32)
class ifOutOctets(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/tx'][:, i].sum_octets() % 2**32)
class ifOutUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/tx'][:, i].sum_packets() % 2**32)
class ifOutNUcastPkts(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(
str(i + 1),
vppstat['/if/tx-multicast'][:, i].sum_packets() % 2**32)
class ifOutDiscards(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/drops'][:, i].sum() % 2**32)
class ifOutErrors(pyagentx.Updater):
def update(self):
global vppstat
vppstat.connect()
for i in range(len(vppstat['/if/names'])):
self.set_COUNTER32(str(i + 1),
vppstat['/if/tx-error'][:, i].sum() % 2**32)
class MyAgent(pyagentx.Agent):
def setup(self):
# iso.org.dod.internet.mgmt.mib_2.interfaces.ifTable.ifEntry
self.register('1.3.6.1.2.1.2.2.1.1', ifIndex)
self.register('1.3.6.1.2.1.2.2.1.2', ifName)
self.register('1.3.6.1.2.1.2.2.1.3', ifType)
self.register('1.3.6.1.2.1.2.2.1.4', ifMtu)
self.register('1.3.6.1.2.1.2.2.1.5', ifSpeed)
self.register('1.3.6.1.2.1.2.2.1.6', ifPhysAddress)
self.register('1.3.6.1.2.1.2.2.1.7', ifAdminStatus)
self.register('1.3.6.1.2.1.2.2.1.8', ifOperStatus)
self.register('1.3.6.1.2.1.2.2.1.9', ifCounterDiscontinuityTime)
self.register('1.3.6.1.2.1.2.2.1.10', ifInOctets)
self.register('1.3.6.1.2.1.2.2.1.11', ifInUcastPkts)
self.register('1.3.6.1.2.1.2.2.1.12', ifInNUcastPkts)
self.register('1.3.6.1.2.1.2.2.1.13', ifInDiscards)
self.register('1.3.6.1.2.1.2.2.1.14', ifInErrors)
self.register('1.3.6.1.2.1.2.2.1.16', ifOutOctets)
self.register('1.3.6.1.2.1.2.2.1.17', ifOutUcastPkts)
self.register('1.3.6.1.2.1.2.2.1.18', ifOutNUcastPkts)
self.register('1.3.6.1.2.1.2.2.1.19', ifOutDiscards)
self.register('1.3.6.1.2.1.2.2.1.20', ifOutErrors)
# iso.org.dod.internet.mgmt.mib_2.ifMIB.ifMIBObjects.ifXTable.ifXEntry
self.register('1.3.6.1.2.1.31.1.1.1.1', ifName)
self.register('1.3.6.1.2.1.31.1.1.1.2', ifInMulticastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.3', ifInBroadcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.4', ifOutMulticastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.5', ifOutBroadcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.6', ifHCInOctets)
self.register('1.3.6.1.2.1.31.1.1.1.7', ifHCInUcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.8', ifHCInMulticastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.9', ifHCInBroadcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.10', ifHCOutOctets)
self.register('1.3.6.1.2.1.31.1.1.1.11', ifHCOutUcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.12', ifHCOutMulticastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.13', ifHCOutBroadcastPkts)
self.register('1.3.6.1.2.1.31.1.1.1.15', ifHighSpeed)
self.register('1.3.6.1.2.1.31.1.1.1.16', ifPromiscuousMode)
self.register('1.3.6.1.2.1.31.1.1.1.17', ifConnectorPresent)
self.register('1.3.6.1.2.1.31.1.1.1.18', ifAlias)
self.register('1.3.6.1.2.1.31.1.1.1.19', ifCounterDiscontinuityTime)
ds.set('1.3.6.1.2.1.31.1.1.1.16.%u' % (idx), 'int', 2) # Hardcode to false(2)
ds.set('1.3.6.1.2.1.31.1.1.1.17.%u' % (idx), 'int', 1) # Hardcode to true(1)
ds.set('1.3.6.1.2.1.31.1.1.1.18.%u' % (idx), 'str', ifname)
ds.set('1.3.6.1.2.1.31.1.1.1.19.%u' % (idx), 'ticks', 0) # Hardcode to Timeticks: (0) 0:00:00.00
return ds
def main(): def main():
global vppstat, vpp, logger agentx.setup_logging(debug=False)
pyagentx.setup_logging(debug=False)
vppstat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2)
vppstat.connect()
vpp = VPPApi()
if not vpp.connect():
logger.error("Can't connect to VPP API, bailing")
return
try: try:
a = MyAgent(server_address='/run/vpp/agentx.sock') a = MyAgent(server_address='/run/vpp/agentx.sock')
a.start() a.run()
except Exception as e: except Exception as e:
print("Unhandled exception:", e) print("Unhandled exception:", e)
a.stop() a.stop()
except KeyboardInterrupt: except KeyboardInterrupt:
a.stop() a.stop()
vppstat.disconnect()
vpp.disconnect()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -7,7 +7,6 @@ from vpp_papi import VPPApiClient
import os import os
import fnmatch import fnmatch
import logging import logging
import threading
class NullHandler(logging.Handler): class NullHandler(logging.Handler):
@ -16,18 +15,16 @@ class NullHandler(logging.Handler):
class VPPApi(): class VPPApi():
def __init__(self, address='/run/vpp/api.sock'): def __init__(self, address='/run/vpp/api.sock', clientname='vppapi-client'):
self.address = address self.address = address
self.lock = threading.Lock()
self.connected = False self.connected = False
self.logger = logging.getLogger('pyagentx.vppapi') self.logger = logging.getLogger('agentx.vppapi')
self.logger.addHandler(NullHandler()) self.logger.addHandler(NullHandler())
self.clientname = clientname
self.vpp = None self.vpp = None
def connect(self): def connect(self):
self.lock.acquire()
if self.connected: if self.connected:
self.lock.release()
return True return True
vpp_json_dir = '/usr/share/vpp/api/' vpp_json_dir = '/usr/share/vpp/api/'
@ -40,23 +37,20 @@ class VPPApi():
if not jsonfiles: if not jsonfiles:
self.logger.error('no json api files found') self.logger.error('no json api files found')
self.lock.release()
return False return False
self.vpp = VPPApiClient(apifiles=jsonfiles, self.vpp = VPPApiClient(apifiles=jsonfiles,
server_address=self.address) server_address=self.address)
try: try:
self.logger.info('Connecting to VPP') self.logger.info('Connecting to VPP')
self.vpp.connect('vpp-snmp-agent') self.vpp.connect(self.clientname)
except: except:
self.lock.release()
return False return False
v = self.vpp.api.show_version() v = self.vpp.api.show_version()
self.logger.info('VPP version is %s' % v.version) self.logger.info('VPP version is %s' % v.version)
self.connected = True self.connected = True
self.lock.release()
return True return True
def disconnect(self): def disconnect(self):
@ -71,23 +65,19 @@ class VPPApi():
if not self.connected: if not self.connected:
return ret return ret
self.lock.acquire()
try: try:
iface_list = self.vpp.api.sw_interface_dump() iface_list = self.vpp.api.sw_interface_dump()
except Exception as e: except Exception as e:
self.logger.error("VPP communication error, disconnecting", e) self.logger.error("VPP communication error, disconnecting", e)
self.vpp.disconnect() self.vpp.disconnect()
self.connected = False self.connected = False
self.lock.release()
return ret return ret
if not iface_list: if not iface_list:
self.logger.error("Can't get interface list") self.logger.error("Can't get interface list")
self.lock.release()
return ret return ret
for iface in iface_list: for iface in iface_list:
ret[iface.interface_name] = iface ret[iface.interface_name] = iface
self.lock.release()
return ret return ret

View File

@ -110,9 +110,12 @@ class VPPStats():
def connect(self): def connect(self):
'''Connect to stats segment''' '''Connect to stats segment'''
if self.connected: if self.connected:
return return True
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) try:
sock.connect(self.socketname) sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(self.socketname)
except:
return False
# Get file descriptor for memory map # Get file descriptor for memory map
fds = array.array("i") # Array of ints fds = array.array("i") # Array of ints
@ -133,9 +136,11 @@ class VPPStats():
if self.version != 2: if self.version != 2:
raise Exception('Incompatbile stat segment version {}'.format( raise Exception('Incompatbile stat segment version {}'.format(
self.version)) self.version))
return False
self.refresh() self.refresh()
self.connected = True self.connected = True
return True
def disconnect(self): def disconnect(self):
'''Disconnect from stats segment''' '''Disconnect from stats segment'''