Files
vpp-snmp-agent/vppstats.py
2023-01-08 13:05:42 +01:00

448 lines
14 KiB
Python

"""
VPPStats is a shared memory implementation that exposes the VPP statseg as
an associative array. Counters can be accessed in either dimension.
stat['/if/rx'] - returns 2D lists
stat['/if/rx'][0] - returns counters for all interfaces for thread 0
stat['/if/rx'][0][1] - returns counter for interface 1 on thread 0
stat['/if/rx'][0][1]['packets'] - returns the packet counter
for interface 1 on thread 0
stat['/if/rx'][:, 1] - returns the counters for interface 1 on all threads
stat['/if/rx'][:, 1].packets() - returns the packet counters for
interface 1 on all threads
stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for
interface 1 on all threads
stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for
interface 1 on all threads for simple counters
Usage:
stat = VPPStats()
stat.connect()
for x in range(10):
print('version ', stat.version)
print('epoch ', stat.epoch)
print('/if/names', stat['/if/names'])
print('/if/rx[1].packets', stat['/if/rx'][:, 1].sum_packets())
print('/if/rx[1].octets', stat['/if/rx'][:, 1].sum_octets())
print('/if/tx[1].packets', stat['/if/tx'][:, 1].sum_packets())
print('/if/tx[1].octets', stat['/if/tx'][:, 1].sum_octets())
print("")
time.sleep(10)
stat.disconnect()
"""
import os
import socket
import array
import mmap
from struct import Struct
import time
import re
def recv_fd(sock):
"""Get file descriptor for memory map"""
fds = array.array("i") # Array of ints
_, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4))
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
return list(fds)[0]
VEC_LEN_FMT = Struct("I")
def get_vec_len(stats, vector_offset):
"""Equivalent to VPP vec_len()"""
return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0]
def get_string(stats, ptr):
"""Get a string from a VPP vector"""
namevector = ptr - stats.base
namevectorlen = get_vec_len(stats, namevector)
if namevector + namevectorlen >= stats.size:
raise IOError("String overruns stats segment")
return stats.statseg[namevector : namevector + namevectorlen - 1].decode("ascii")
class StatsVector:
"""A class representing a VPP vector"""
def __init__(self, stats, ptr, fmt):
self.vec_start = ptr - stats.base
self.vec_len = get_vec_len(stats, ptr - stats.base)
self.struct = Struct(fmt)
self.fmtlen = len(fmt)
self.elementsize = self.struct.size
self.statseg = stats.statseg
self.stats = stats
if self.vec_start + self.vec_len * self.elementsize >= stats.size:
raise IOError("Vector overruns stats segment")
def __iter__(self):
with self.stats.lock:
return self.struct.iter_unpack(
self.statseg[
self.vec_start : self.vec_start + self.elementsize * self.vec_len
]
)
def __getitem__(self, index):
if index > self.vec_len:
raise IOError("Index beyond end of vector")
with self.stats.lock:
if self.fmtlen == 1:
return self.struct.unpack_from(
self.statseg, self.vec_start + (index * self.elementsize)
)[0]
return self.struct.unpack_from(
self.statseg, self.vec_start + (index * self.elementsize)
)
class VPPStats:
"""Main class implementing Python access to the VPP statistics segment"""
# pylint: disable=too-many-instance-attributes
shared_headerfmt = Struct("QPQQPP")
default_socketname = "/run/vpp/stats.sock"
def __init__(self, socketname=default_socketname, timeout=10):
self.socketname = socketname
self.timeout = timeout
self.directory = {}
self.lock = StatsLock(self)
self.connected = False
self.size = 0
self.last_epoch = 0
self.statseg = 0
def connect(self):
"""Connect to stats segment"""
if self.connected:
return
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(self.socketname)
mfd = recv_fd(sock)
sock.close()
stat_result = os.fstat(mfd)
self.statseg = mmap.mmap(
mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED
)
os.close(mfd)
self.size = stat_result.st_size
if self.version != 2:
raise Exception("Incompatbile stat segment version {}".format(self.version))
self.refresh()
self.connected = True
def disconnect(self):
"""Disconnect from stats segment"""
if self.connected:
self.statseg.close()
self.connected = False
@property
def version(self):
"""Get version of stats segment"""
return self.shared_headerfmt.unpack_from(self.statseg)[0]
@property
def base(self):
"""Get base pointer of stats segment"""
return self.shared_headerfmt.unpack_from(self.statseg)[1]
@property
def epoch(self):
"""Get current epoch value from stats segment"""
return self.shared_headerfmt.unpack_from(self.statseg)[2]
@property
def in_progress(self):
"""Get value of in_progress from stats segment"""
return self.shared_headerfmt.unpack_from(self.statseg)[3]
@property
def directory_vector(self):
"""Get pointer of directory vector"""
return self.shared_headerfmt.unpack_from(self.statseg)[4]
elementfmt = "IQ128s"
def refresh(self, blocking=True):
"""Refresh directory vector cache (epoch changed)"""
directory = {}
directory_by_idx = {}
while True:
try:
with self.lock:
self.last_epoch = self.epoch
for i, direntry in enumerate(
StatsVector(self, self.directory_vector, self.elementfmt)
):
path_raw = direntry[2].find(b"\x00")
path = direntry[2][:path_raw].decode("ascii")
directory[path] = StatsEntry(direntry[0], direntry[1])
directory_by_idx[i] = path
self.directory = directory
self.directory_by_idx = directory_by_idx
return
except IOError:
if not blocking:
raise
def __getitem__(self, item, blocking=True):
if not self.connected:
self.connect()
while True:
try:
if self.last_epoch != self.epoch:
self.refresh(blocking)
with self.lock:
return self.directory[item].get_counter(self)
except IOError:
if not blocking:
raise
def __iter__(self):
return iter(self.directory.items())
def set_errors(self, blocking=True):
"""Return dictionary of error counters > 0"""
if not self.connected:
self.connect()
errors = {k: v for k, v in self.directory.items() if k.startswith("/err/")}
result = {}
for k in errors:
try:
total = self[k].sum()
if total:
result[k] = total
except KeyError:
pass
return result
def set_errors_str(self, blocking=True):
"""Return all errors counters > 0 pretty printed"""
error_string = ["ERRORS:"]
error_counters = self.set_errors(blocking)
for k in sorted(error_counters):
error_string.append("{:<60}{:>10}".format(k, error_counters[k]))
return "%s\n" % "\n".join(error_string)
def get_counter(self, name, blocking=True):
"""Alternative call to __getitem__"""
return self.__getitem__(name, blocking)
def get_err_counter(self, name, blocking=True):
"""Alternative call to __getitem__"""
return self.__getitem__(name, blocking).sum()
def ls(self, patterns):
"""Returns list of counters matching pattern"""
# pylint: disable=invalid-name
if not self.connected:
self.connect()
if not isinstance(patterns, list):
patterns = [patterns]
regex = [re.compile(i) for i in patterns]
if self.last_epoch != self.epoch:
self.refresh()
return [
k
for k, v in self.directory.items()
if any(re.match(pattern, k) for pattern in regex)
]
def dump(self, counters, blocking=True):
"""Given a list of counters return a dictionary of results"""
if not self.connected:
self.connect()
result = {}
for cnt in counters:
result[cnt] = self.__getitem__(cnt, blocking)
return result
class StatsLock:
"""Stat segment optimistic locking"""
def __init__(self, stats):
self.stats = stats
self.epoch = 0
def __enter__(self):
acquired = self.acquire(blocking=True)
assert acquired, "Lock wasn't acquired, but blocking=True"
return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self.release()
def acquire(self, blocking=True, timeout=-1):
"""Acquire the lock. Await in progress to go false. Record epoch."""
self.epoch = self.stats.epoch
if timeout > 0:
start = time.monotonic()
while self.stats.in_progress:
if not blocking:
time.sleep(0.01)
if timeout > 0:
if start + time.monotonic() > timeout:
return False
return True
def release(self):
"""Check if data read while locked is valid"""
if self.stats.in_progress or self.stats.epoch != self.epoch:
raise IOError("Optimistic lock failed, retry")
def locked(self):
"""Not used"""
class StatsCombinedList(list):
"""Column slicing for Combined counters list"""
def __getitem__(self, item):
"""Supports partial numpy style 2d support. Slice by column [:,1]"""
if isinstance(item, int):
return list.__getitem__(self, item)
return CombinedList([row[item[1]] for row in self])
class CombinedList(list):
"""Combined Counters 2-dimensional by thread by index of packets/octets"""
def packets(self):
"""Return column (2nd dimension). Packets for all threads"""
return [pair[0] for pair in self]
def octets(self):
"""Return column (2nd dimension). Octets for all threads"""
return [pair[1] for pair in self]
def sum_packets(self):
"""Return column (2nd dimension). Sum of all packets for all threads"""
return sum(self.packets())
def sum_octets(self):
"""Return column (2nd dimension). Sum of all octets for all threads"""
return sum(self.octets())
class StatsTuple(tuple):
"""A Combined vector tuple (packets, octets)"""
def __init__(self, data):
self.dictionary = {"packets": data[0], "bytes": data[1]}
super().__init__()
def __repr__(self):
return dict.__repr__(self.dictionary)
def __getitem__(self, item):
if isinstance(item, int):
return tuple.__getitem__(self, item)
if item == "packets":
return tuple.__getitem__(self, 0)
return tuple.__getitem__(self, 1)
class StatsSimpleList(list):
"""Simple Counters 2-dimensional by thread by index of packets"""
def __getitem__(self, item):
"""Supports partial numpy style 2d support. Slice by column [:,1]"""
if isinstance(item, int):
return list.__getitem__(self, item)
return SimpleList([row[item[1]] for row in self])
class SimpleList(list):
"""Simple counter"""
def sum(self):
"""Sum the vector"""
return sum(self)
class StatsEntry:
"""An individual stats entry"""
# pylint: disable=unused-argument,no-self-use
def __init__(self, stattype, statvalue):
self.type = stattype
self.value = statvalue
if stattype == 1:
self.function = self.scalar
elif stattype == 2:
self.function = self.simple
elif stattype == 3:
self.function = self.combined
elif stattype == 4:
self.function = self.name
elif stattype == 6:
self.function = self.symlink
else:
self.function = self.illegal
def illegal(self, stats):
"""Invalid or unknown counter type"""
return None
def scalar(self, stats):
"""Scalar counter"""
return self.value
def simple(self, stats):
"""Simple counter"""
counter = StatsSimpleList()
for threads in StatsVector(stats, self.value, "P"):
clist = [v[0] for v in StatsVector(stats, threads[0], "Q")]
counter.append(clist)
return counter
def combined(self, stats):
"""Combined counter"""
counter = StatsCombinedList()
for threads in StatsVector(stats, self.value, "P"):
clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], "QQ")]
counter.append(clist)
return counter
def name(self, stats):
"""Name counter"""
counter = []
for name in StatsVector(stats, self.value, "P"):
if name[0]:
counter.append(get_string(stats, name[0]))
return counter
SYMLINK_FMT1 = Struct("II")
SYMLINK_FMT2 = Struct("Q")
def symlink(self, stats):
"""Symlink counter"""
b = self.SYMLINK_FMT2.pack(self.value)
index1, index2 = self.SYMLINK_FMT1.unpack(b)
name = stats.directory_by_idx[index1]
return stats[name][:, index2]
def get_counter(self, stats):
"""Return a list of counters"""
if stats:
return self.function(stats)