Rename 'config' to 'cache' in the VPPApi.

This makes it clearer what its purpose is, in preparation of actual VPP
dataplane configuration changes. For example remove_lcp() refers
currently to the removal of the LCP from the cache, not the VPP
dataplane itself, so rename it and its siblings cache_remove_*()
instead.

In a future commit, the call remove_*() will refer to the removal of an
object _in VPP_.
This commit is contained in:
Pim van Pelt
2022-04-03 11:21:29 +00:00
parent 1a0daa48d1
commit 50581f7171
2 changed files with 152 additions and 152 deletions

View File

@ -18,8 +18,8 @@ class VPPApi():
self.connected = False
self.clientname = clientname
self.vpp = None
self.config = self.clearconfig()
self.config_read = False
self.cache = self.cache_clear()
self.cache_read = False
self.lcp_enabled = False
def connect(self):
@ -62,15 +62,15 @@ class VPPApi():
self.connected = False
return True
def clearconfig(self):
self.config_read = False
def cache_clear(self):
self.cache_read = False
return {"lcps": {}, "interface_names": {}, "interfaces": {}, "interface_addresses": {},
"bondethernets": {}, "bondethernet_members": {},
"bridgedomains": {}, "vxlan_tunnels": {}, "l2xcs": {}}
def remove_lcp(self, lcpname):
def cache_remove_lcp(self, lcpname):
""" Removes the LCP and TAP interface, identified by lcpname, from the config. """
for idx, lcp in self.config['lcps'].items():
for idx, lcp in self.cache['lcps'].items():
if lcp.host_if_name == lcpname:
found = True
break
@ -78,63 +78,63 @@ class VPPApi():
self.logger.warning("Trying to remove an LCP which is not in the config: %s" % lcpname)
return False
ifname = self.config['interfaces'][lcp.host_sw_if_index].interface_name
del self.config['interface_names'][ifname]
del self.config['interface_addresses'][lcp.host_sw_if_index]
del self.config['interfaces'][lcp.host_sw_if_index]
del self.config['lcps'][lcp.phy_sw_if_index]
ifname = self.cache['interfaces'][lcp.host_sw_if_index].interface_name
del self.cache['interface_names'][ifname]
del self.cache['interface_addresses'][lcp.host_sw_if_index]
del self.cache['interfaces'][lcp.host_sw_if_index]
del self.cache['lcps'][lcp.phy_sw_if_index]
return True
def remove_bondethernet_member(self, ifname):
def cache_remove_bondethernet_member(self, ifname):
""" Removes the bonderthernet member interface, identified by name, from the config. """
if not ifname in self.config['interface_names']:
if not ifname in self.cache['interface_names']:
self.logger.warning("Trying to remove a bondethernet member interface which is not in the config: %s" % ifname)
return False
iface = self.config['interface_names'][ifname]
for bond_idx, members in self.config['bondethernet_members'].items():
iface = self.cache['interface_names'][ifname]
for bond_idx, members in self.cache['bondethernet_members'].items():
if iface.sw_if_index in members:
self.config['bondethernet_members'][bond_idx].remove(iface.sw_if_index)
self.cache['bondethernet_members'][bond_idx].remove(iface.sw_if_index)
return True
def remove_l2xc(self, ifname):
if not ifname in self.config['interface_names']:
def cache_remove_l2xc(self, ifname):
if not ifname in self.cache['interface_names']:
self.logger.warning("Trying to remove an L2XC which is not in the config: %s" % ifname)
return False
iface = self.config['interface_names'][ifname]
self.config['l2xcs'].pop(iface.sw_if_index, None)
iface = self.cache['interface_names'][ifname]
self.cache['l2xcs'].pop(iface.sw_if_index, None)
return True
def remove_vxlan_tunnel(self, ifname):
if not ifname in self.config['interface_names']:
def cache_remove_vxlan_tunnel(self, ifname):
if not ifname in self.cache['interface_names']:
self.logger.warning("Trying to remove a VXLAN Tunnel which is not in the config: %s" % ifname)
return False
iface = self.config['interface_names'][ifname]
self.config['vxlan_tunnels'].pop(iface.sw_if_index, None)
iface = self.cache['interface_names'][ifname]
self.cache['vxlan_tunnels'].pop(iface.sw_if_index, None)
return True
def remove_interface(self, ifname):
def cache_remove_interface(self, ifname):
""" Removes the interface, identified by name, from the config. """
if not ifname in self.config['interface_names']:
if not ifname in self.cache['interface_names']:
self.logger.warning("Trying to remove an interface which is not in the config: %s" % ifname)
return False
iface = self.config['interface_names'][ifname]
del self.config['interfaces'][iface.sw_if_index]
if len(self.config['interface_addresses'][iface.sw_if_index]) > 0:
iface = self.cache['interface_names'][ifname]
del self.cache['interfaces'][iface.sw_if_index]
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
self.logger.warning("Not all addresses were removed on %s" % ifname)
del self.config['interface_addresses'][iface.sw_if_index]
del self.config['interface_names'][ifname]
del self.cache['interface_addresses'][iface.sw_if_index]
del self.cache['interface_names'][ifname]
## Use my_dict.pop('key', None), as it allows 'key' to be absent
if iface.sw_if_index in self.config['bondethernet_members']:
if len(self.config['bondethernet_members'][iface.sw_if_index]) != 0:
self.logger.warning("When removing BondEthernet %s, its members are not empty: %s" % (ifname, self.config['bondethernet_members'][iface.sw_if_index]))
if iface.sw_if_index in self.cache['bondethernet_members']:
if len(self.cache['bondethernet_members'][iface.sw_if_index]) != 0:
self.logger.warning("When removing BondEthernet %s, its members are not empty: %s" % (ifname, self.cache['bondethernet_members'][iface.sw_if_index]))
else:
del self.config['bondethernet_members'][iface.sw_if_index]
self.config['bondethernets'].pop(iface.sw_if_index, None)
del self.cache['bondethernet_members'][iface.sw_if_index]
self.cache['bondethernets'].pop(iface.sw_if_index, None)
return True
def readconfig(self):
@ -142,7 +142,7 @@ class VPPApi():
self.logger.error("Could not connect to VPP")
return False
self.config_read = False
self.cache_read = False
## Workaround LCPng and linux-cp, in order.
self.lcp_enabled = False
@ -152,7 +152,7 @@ class VPPApi():
r = self.vpp.api.lcpng_itf_pair_get()
if isinstance(r, tuple) and r[0].retval == 0:
for lcp in r[1]:
self.config['lcps'][lcp.phy_sw_if_index] = lcp
self.cache['lcps'][lcp.phy_sw_if_index] = lcp
self.lcp_enabled = True
except:
self.logger.warning("lcpng not found, trying linux-cp")
@ -162,7 +162,7 @@ class VPPApi():
r = self.vpp.api.lcp_itf_pair_get()
if isinstance(r, tuple) and r[0].retval == 0:
for lcp in r[1]:
self.config['lcps'][lcp.phy_sw_if_index] = lcp
self.cache['lcps'][lcp.phy_sw_if_index] = lcp
self.lcp_enabled = True
except:
pass
@ -173,43 +173,43 @@ class VPPApi():
self.logger.debug("Retrieving interfaces")
r = self.vpp.api.sw_interface_dump()
for iface in r:
self.config['interfaces'][iface.sw_if_index] = iface
self.config['interface_names'][iface.interface_name] = iface
self.config['interface_addresses'][iface.sw_if_index] = []
self.cache['interfaces'][iface.sw_if_index] = iface
self.cache['interface_names'][iface.interface_name] = iface
self.cache['interface_addresses'][iface.sw_if_index] = []
self.logger.debug("Retrieving IPv4 addresses for %s" % iface.interface_name)
ipr = self.vpp.api.ip_address_dump(sw_if_index=iface.sw_if_index, is_ipv6=False)
for ip in ipr:
self.config['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.cache['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.logger.debug("Retrieving IPv6 addresses for %s" % iface.interface_name)
ipr = self.vpp.api.ip_address_dump(sw_if_index=iface.sw_if_index, is_ipv6=True)
for ip in ipr:
self.config['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.cache['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.logger.debug("Retrieving bondethernets")
r = self.vpp.api.sw_bond_interface_dump()
for iface in r:
self.config['bondethernets'][iface.sw_if_index] = iface
self.config['bondethernet_members'][iface.sw_if_index] = []
self.cache['bondethernets'][iface.sw_if_index] = iface
self.cache['bondethernet_members'][iface.sw_if_index] = []
for member in self.vpp.api.sw_member_interface_dump(sw_if_index=iface.sw_if_index):
self.config['bondethernet_members'][iface.sw_if_index].append(member.sw_if_index)
self.cache['bondethernet_members'][iface.sw_if_index].append(member.sw_if_index)
self.logger.debug("Retrieving bridgedomains")
r = self.vpp.api.bridge_domain_dump()
for bridge in r:
self.config['bridgedomains'][bridge.bd_id] = bridge
self.cache['bridgedomains'][bridge.bd_id] = bridge
self.logger.debug("Retrieving vxlan_tunnels")
r = self.vpp.api.vxlan_tunnel_v2_dump()
for vxlan in r:
self.config['vxlan_tunnels'][vxlan.sw_if_index] = vxlan
self.cache['vxlan_tunnels'][vxlan.sw_if_index] = vxlan
self.logger.debug("Retrieving L2 Cross Connects")
r = self.vpp.api.l2_xconnect_dump()
for l2xc in r:
self.config['l2xcs'][l2xc.rx_sw_if_index] = l2xc
self.cache['l2xcs'][l2xc.rx_sw_if_index] = l2xc
self.config_read = True
return self.config_read
self.cache_read = True
return self.cache_read
def get_encapsulation(self, iface):
""" Return a string with the encapsulation of a subint """
@ -228,7 +228,7 @@ class VPPApi():
in VPP. Return False otherwise."""
ret = True
for ifname in ifname_list:
if not ifname in self.config['interface_names']:
if not ifname in self.cache['interface_names']:
self.logger.warning("Interface %s does not exist in VPP" % ifname)
ret = False
return ret
@ -240,35 +240,35 @@ class VPPApi():
self.dump_subints()
def get_sub_interfaces(self):
subints = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].sub_id>0 and self.config['interfaces'][x].sub_number_of_tags > 0]
subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_number_of_tags > 0]
return subints
def get_qinx_interfaces(self):
qinx_subints = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].sub_id>0 and self.config['interfaces'][x].sub_inner_vlan_id>0]
qinx_subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_inner_vlan_id>0]
return qinx_subints
def get_dot1x_interfaces(self):
dot1x_subints = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].sub_id>0 and self.config['interfaces'][x].sub_inner_vlan_id==0]
dot1x_subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_inner_vlan_id==0]
return dot1x_subints
def get_loopbacks(self):
loopbacks = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].interface_dev_type=='Loopback']
loopbacks = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].interface_dev_type=='Loopback']
return loopbacks
def get_phys(self):
phys = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].sw_if_index == self.config['interfaces'][x].sup_sw_if_index and self.config['interfaces'][x].interface_dev_type not in ['virtio', 'BVI', 'Loopback', 'VXLAN', 'local', 'bond']]
phys = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sw_if_index == self.cache['interfaces'][x].sup_sw_if_index and self.cache['interfaces'][x].interface_dev_type not in ['virtio', 'BVI', 'Loopback', 'VXLAN', 'local', 'bond']]
return phys
def get_bondethernets(self):
bonds = [self.config['bondethernets'][x].interface_name for x in self.config['bondethernets']]
bonds = [self.cache['bondethernets'][x].interface_name for x in self.cache['bondethernets']]
return bonds
def get_vxlan_tunnels(self):
vxlan_tunnels = [self.config['interfaces'][x].interface_name for x in self.config['interfaces'] if self.config['interfaces'][x].interface_dev_type in ['VXLAN']]
vxlan_tunnels = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].interface_dev_type in ['VXLAN']]
return vxlan_tunnels
def get_lcp_by_interface(self, sw_if_index):
for idx, lcp in self.config['lcps'].items():
for idx, lcp in self.cache['lcps'].items():
if lcp.phy_sw_if_index == sw_if_index:
return lcp
return None
@ -276,65 +276,65 @@ class VPPApi():
def dump_phys(self):
phys = self.get_phys()
for ifname in phys:
iface = self.config['interface_names'][ifname]
iface = self.cache['interface_names'][ifname]
self.logger.info("%s idx=%d" % (iface.interface_name, iface.sw_if_index))
def dump_subints(self):
self.logger.info("*** QinX ***")
subints = self.get_qinx_interfaces()
for ifname in subints:
iface = self.config['interface_names'][ifname]
iface = self.cache['interface_names'][ifname]
self.logger.info("%s idx=%d encap=%s" % (iface.interface_name, iface.sw_if_index, self.get_encapsulation(iface)))
self.logger.info("*** .1q/.1ad ***")
subints = self.get_dot1x_interfaces()
for ifname in subints:
iface = self.config['interface_names'][ifname]
iface = self.cache['interface_names'][ifname]
self.logger.info("%s idx=%d encap=%s" % (iface.interface_name, iface.sw_if_index, self.get_encapsulation(iface)))
def dump_bridgedomains(self):
for bd_id, bridge in self.config['bridgedomains'].items():
for bd_id, bridge in self.cache['bridgedomains'].items():
self.logger.info("BridgeDomain%d" % (bridge.bd_id))
if bridge.bvi_sw_if_index > 0 and bridge.bvi_sw_if_index < 2**32-1 :
self.logger.info(" BVI: " + self.config['interfaces'][bridge.bvi_sw_if_index].interface_name)
self.logger.info(" BVI: " + self.cache['interfaces'][bridge.bvi_sw_if_index].interface_name)
members = []
for member in bridge.sw_if_details:
members.append(self.config['interfaces'][member.sw_if_index].interface_name)
members.append(self.cache['interfaces'][member.sw_if_index].interface_name)
if len(members) > 0:
self.logger.info(" Members: " + ' '.join(members))
def dump_interfaces(self):
for idx, iface in self.config['interfaces'].items():
for idx, iface in self.cache['interfaces'].items():
self.logger.info("%s idx=%d type=%s mac=%s mtu=%d flags=%d" % (iface.interface_name,
iface.sw_if_index, iface.interface_dev_type, iface.l2_address,
iface.mtu[0], iface.flags))
if iface.interface_dev_type=='bond' and iface.sub_id == 0 and iface.sw_if_index in self.config['bondethernet_members']:
members = [self.config['interfaces'][x].interface_name for x in self.config['bondethernet_members'][iface.sw_if_index]]
if iface.interface_dev_type=='bond' and iface.sub_id == 0 and iface.sw_if_index in self.cache['bondethernet_members']:
members = [self.cache['interfaces'][x].interface_name for x in self.cache['bondethernet_members'][iface.sw_if_index]]
self.logger.info(" Members: %s" % ' '.join(members))
if iface.interface_dev_type=="VXLAN":
vxlan = self.config['vxlan_tunnels'][iface.sw_if_index]
vxlan = self.cache['vxlan_tunnels'][iface.sw_if_index]
self.logger.info(" VXLAN: %s:%d -> %s:%d VNI %d" % (vxlan.src_address, vxlan.src_port,
vxlan.dst_address, vxlan.dst_port, vxlan.vni))
if iface.sub_id > 0:
self.logger.info(" Encapsulation: %s" % (self.get_encapsulation(iface)))
if iface.sw_if_index in self.config['lcps']:
lcp = self.config['lcps'][iface.sw_if_index]
tap_name = self.config['interfaces'][lcp.host_sw_if_index].interface_name
if iface.sw_if_index in self.cache['lcps']:
lcp = self.cache['lcps'][iface.sw_if_index]
tap_name = self.cache['interfaces'][lcp.host_sw_if_index].interface_name
tap_idx = lcp.host_sw_if_index
self.logger.info(" TAP: %s (tap=%s idx=%d)" % (lcp.host_if_name, tap_name, tap_idx))
if len(self.config['interface_addresses'][iface.sw_if_index])>0:
self.logger.info(" L3: %s" % ' '.join(self.config['interface_addresses'][iface.sw_if_index]))
if len(self.cache['interface_addresses'][iface.sw_if_index])>0:
self.logger.info(" L3: %s" % ' '.join(self.cache['interface_addresses'][iface.sw_if_index]))
if iface.sw_if_index in self.config['l2xcs']:
l2xc = self.config['l2xcs'][iface.sw_if_index]
self.logger.info(" L2XC: %s" % self.config['interfaces'][l2xc.tx_sw_if_index].interface_name)
if iface.sw_if_index in self.cache['l2xcs']:
l2xc = self.cache['l2xcs'][iface.sw_if_index]
self.logger.info(" L2XC: %s" % self.cache['interfaces'][l2xc.tx_sw_if_index].interface_name)
for bd_id, bridge in self.config['bridgedomains'].items():
for bd_id, bridge in self.cache['bridgedomains'].items():
if bridge.bvi_sw_if_index == iface.sw_if_index:
self.logger.info(" BVI: BridgeDomain%d" % (bd_id))