From c9233749bcd8674038c94308aa726da5acf6a223 Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Fri, 1 Apr 2022 13:10:17 +0000 Subject: [PATCH] Pulled in latest vpp_stats.py from upstream after https://gerrit.fd.io/r/c/vpp/+/35640 --- vpp-snmp-agent.py | 4 +- vppstats.py | 176 ++++++++++++++-------------------------------- 2 files changed, 52 insertions(+), 128 deletions(-) diff --git a/vpp-snmp-agent.py b/vpp-snmp-agent.py index dd82399..2b7640e 100755 --- a/vpp-snmp-agent.py +++ b/vpp-snmp-agent.py @@ -49,9 +49,7 @@ class MyAgent(agentx.Agent): self.logger.info("Connecting to VPP Stats Segment") vppstat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2) - if not vppstat.connect(): - self.logger.error("Can't connect to VPP Stats API, bailing") - return False + vppstat.connect() vpp = VPPApi(clientname='vpp-snmp-agent') if not vpp.connect(): diff --git a/vppstats.py b/vppstats.py index b1afbd0..8ed142b 100644 --- a/vppstats.py +++ b/vppstats.py @@ -40,26 +40,32 @@ from struct import Struct import time import re +def recv_fd(sock): + '''Get file descriptor for memory map''' + fds = array.array("i") # Array of ints + _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4)) + for cmsg_level, cmsg_type, cmsg_data in ancdata: + if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: + fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + return list(fds)[0] + VEC_LEN_FMT = Struct('I') - - def get_vec_len(stats, vector_offset): '''Equivalent to VPP vec_len()''' return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0] - def get_string(stats, ptr): '''Get a string from a VPP vector''' namevector = ptr - stats.base namevectorlen = get_vec_len(stats, namevector) if namevector + namevectorlen >= stats.size: raise IOError('String overruns stats segment') - return stats.statseg[namevector:namevector + namevectorlen - - 1].decode('ascii') + return stats.statseg[namevector:namevector+namevectorlen-1].decode('ascii') class StatsVector: '''A class representing a VPP vector''' + def __init__(self, stats, ptr, fmt): self.vec_start = ptr - stats.base self.vec_len = get_vec_len(stats, ptr - stats.base) @@ -74,21 +80,18 @@ class StatsVector: def __iter__(self): with self.stats.lock: - return self.struct.iter_unpack( - self.statseg[self.vec_start:self.vec_start + - self.elementsize * self.vec_len]) + return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start + + self.elementsize*self.vec_len]) def __getitem__(self, index): if index > self.vec_len: raise IOError('Index beyond end of vector') with self.stats.lock: if self.fmtlen == 1: - return self.struct.unpack_from( - self.statseg, - self.vec_start + (index * self.elementsize))[0] - return self.struct.unpack_from( - self.statseg, self.vec_start + (index * self.elementsize)) - + return self.struct.unpack_from(self.statseg, self.vec_start + + (index * self.elementsize))[0] + return self.struct.unpack_from(self.statseg, self.vec_start + + (index * self.elementsize)) class VPPStats(): '''Main class implementing Python access to the VPP statistics segment''' @@ -104,43 +107,29 @@ class VPPStats(): self.connected = False self.size = 0 self.last_epoch = 0 - self.error_vectors = 0 self.statseg = 0 def connect(self): '''Connect to stats segment''' if self.connected: - return True - try: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) - sock.connect(self.socketname) - except: - return False + return + sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + sock.connect(self.socketname) - # Get file descriptor for memory map - fds = array.array("i") # Array of ints - _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4)) - for cmsg_level, cmsg_type, cmsg_data in ancdata: - if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: - fds.frombytes(cmsg_data[:len(cmsg_data) - - (len(cmsg_data) % fds.itemsize)]) - mfd = list(fds)[0] + mfd = recv_fd(sock) sock.close() stat_result = os.fstat(mfd) - self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, - mmap.MAP_SHARED) + self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED) os.close(mfd) self.size = stat_result.st_size if self.version != 2: - raise Exception('Incompatbile stat segment version {}'.format( - self.version)) - return False + raise Exception('Incompatbile stat segment version {}' + .format(self.version)) self.refresh() self.connected = True - return True def disconnect(self): '''Disconnect from stats segment''' @@ -173,11 +162,6 @@ class VPPStats(): '''Get pointer of directory vector''' return self.shared_headerfmt.unpack_from(self.statseg)[4] - @property - def error_vector(self): - '''Get pointer of error vector''' - return self.shared_headerfmt.unpack_from(self.statseg)[5] - elementfmt = 'IQ128s' def refresh(self, blocking=True): @@ -188,21 +172,13 @@ class VPPStats(): try: with self.lock: self.last_epoch = self.epoch - for i, direntry in enumerate( - StatsVector(self, self.directory_vector, - self.elementfmt)): + for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)): path_raw = direntry[2].find(b'\x00') path = direntry[2][:path_raw].decode('ascii') directory[path] = StatsEntry(direntry[0], direntry[1]) directory_by_idx[i] = path self.directory = directory self.directory_by_idx = directory_by_idx - - # Cache the error index vectors - self.error_vectors = [] - for threads in StatsVector(self, self.error_vector, 'P'): - self.error_vectors.append( - StatsVector(self, threads[0], 'Q')) return except IOError: if not blocking: @@ -224,32 +200,23 @@ class VPPStats(): def __iter__(self): return iter(self.directory.items()) + def set_errors(self, blocking=True): '''Return dictionary of error counters > 0''' if not self.connected: self.connect() - errors = { - k: v - for k, v in self.directory.items() if k.startswith("/err/") - } + errors = {k: v for k, v in self.directory.items() + if k.startswith("/err/")} result = {} - while True: + for k in errors: try: - if self.last_epoch != self.epoch: - self.refresh(blocking) - with self.lock: - for k, entry in errors.items(): - total = 0 - i = entry.value - for per_thread in self.error_vectors: - total += per_thread[i] - if total: - result[k] = total - return result - except IOError: - if not blocking: - raise + total = self[k].sum() + if total: + result[k] = total + except KeyError: + pass + return result def set_errors_str(self, blocking=True): '''Return all errors counters > 0 pretty printed''' @@ -264,19 +231,8 @@ class VPPStats(): return self.__getitem__(name, blocking) def get_err_counter(self, name, blocking=True): - '''Return a single value (sum of all threads)''' - if not self.connected: - self.connect() - if name.startswith("/err/"): - while True: - try: - if self.last_epoch != self.epoch: - self.refresh(blocking) - with self.lock: - return sum(self.directory[name].get_counter(self)) - except IOError: - if not blocking: - raise + '''Alternative call to __getitem__''' + return self.__getitem__(name, blocking).sum() def ls(self, patterns): '''Returns list of counters matching pattern''' @@ -286,10 +242,11 @@ class VPPStats(): if not isinstance(patterns, list): patterns = [patterns] regex = [re.compile(i) for i in patterns] - return [ - k for k, v in self.directory.items() if any( - re.match(pattern, k) for pattern in regex) - ] + if self.last_epoch != self.epoch: + self.refresh() + + return [k for k, v in self.directory.items() + if any(re.match(pattern, k) for pattern in regex)] def dump(self, counters, blocking=True): '''Given a list of counters return a dictionary of results''' @@ -297,12 +254,12 @@ class VPPStats(): self.connect() result = {} for cnt in counters: - result[cnt] = self.__getitem__(cnt, blocking) + result[cnt] = self.__getitem__(cnt,blocking) return result - class StatsLock(): '''Stat segment optimistic locking''' + def __init__(self, stats): self.stats = stats self.epoch = 0 @@ -339,15 +296,16 @@ class StatsLock(): class StatsCombinedList(list): '''Column slicing for Combined counters list''' + def __getitem__(self, item): '''Supports partial numpy style 2d support. Slice by column [:,1]''' if isinstance(item, int): return list.__getitem__(self, item) return CombinedList([row[item[1]] for row in self]) - class CombinedList(list): '''Combined Counters 2-dimensional by thread by index of packets/octets''' + def packets(self): '''Return column (2nd dimension). Packets for all threads''' return [pair[0] for pair in self] @@ -364,7 +322,6 @@ class CombinedList(list): '''Return column (2nd dimension). Sum of all octets for all threads''' return sum(self.octets()) - class StatsTuple(tuple): '''A Combined vector tuple (packets, octets)''' def __init__(self, data): @@ -381,26 +338,24 @@ class StatsTuple(tuple): return tuple.__getitem__(self, 0) return tuple.__getitem__(self, 1) - class StatsSimpleList(list): '''Simple Counters 2-dimensional by thread by index of packets''' + def __getitem__(self, item): '''Supports partial numpy style 2d support. Slice by column [:,1]''' if isinstance(item, int): return list.__getitem__(self, item) return SimpleList([row[item[1]] for row in self]) - class SimpleList(list): '''Simple counter''' + def sum(self): '''Sum the vector''' return sum(self) - class StatsEntry(): '''An individual stats entry''' - # pylint: disable=unused-argument,no-self-use def __init__(self, stattype, statvalue): @@ -414,10 +369,8 @@ class StatsEntry(): elif stattype == 3: self.function = self.combined elif stattype == 4: - self.function = self.error - elif stattype == 5: self.function = self.name - elif stattype == 7: + elif stattype == 6: self.function = self.symlink else: self.function = self.illegal @@ -442,19 +395,10 @@ class StatsEntry(): '''Combined counter''' counter = StatsCombinedList() for threads in StatsVector(stats, self.value, 'P'): - clist = [ - StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ') - ] + clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')] counter.append(clist) return counter - def error(self, stats): - '''Error counter''' - counter = SimpleList() - for clist in stats.error_vectors: - counter.append(clist[self.value]) - return counter - def name(self, stats): '''Name counter''' counter = [] @@ -465,32 +409,14 @@ class StatsEntry(): SYMLINK_FMT1 = Struct('II') SYMLINK_FMT2 = Struct('Q') - def symlink(self, stats): '''Symlink counter''' b = self.SYMLINK_FMT2.pack(self.value) index1, index2 = self.SYMLINK_FMT1.unpack(b) name = stats.directory_by_idx[index1] - return stats[name][:, index2] + return stats[name][:,index2] def get_counter(self, stats): '''Return a list of counters''' if stats: return self.function(stats) - - -# -# stat = VPPStats(socketname='/run/vpp/stats.sock', timeout=2) -# stat.connect() -# print('version ', stat.version) -# print('epoch ', stat.epoch) -# print('/if/names', stat['/if/names']) -# -# for x in range(10): -# idx=2 -# print('/if/rx[%s] packets %u octets %u' % (stat['/if/names'][idx], stat['/if/rx'][:, idx].sum_packets(), stat['/if/rx'][:, idx].sum_octets())) -# print('/if/tx[%s] packets %u octets %u' % (stat['/if/names'][idx], stat['/if/tx'][:, idx].sum_packets(), stat['/if/tx'][:, idx].sum_octets())) -# print("") -# time.sleep(10) -# -# stat.disconnect()