Initial formatting run with Black. Integration tests and unit tests pass before and after this change.

This commit is contained in:
Pim van Pelt
2022-04-22 13:05:55 +00:00
parent b375ddb433
commit baaaaa67b5
22 changed files with 1757 additions and 1178 deletions

View File

@ -23,6 +23,7 @@ import logging
import ipaddress
import os.path
import sys
try:
import yamale
except ImportError:
@ -37,13 +38,15 @@ from config.tap import validate_taps
from yamale.validators import DefaultValidators, Validator
class IPInterfaceWithPrefixLength(Validator):
""" Custom IPAddress config - takes IP/prefixlen as input:
"""Custom IPAddress config - takes IP/prefixlen as input:
192.0.2.1/29 or 2001:db8::1/64 are correct. The PrefixLength
is required, and must be a number (0-32 for IPv4 and 0-128 for
IPv6).
"""
tag = 'ip_interface'
tag = "ip_interface"
def _is_valid(self, value):
try:
@ -52,9 +55,9 @@ class IPInterfaceWithPrefixLength(Validator):
return False
if not isinstance(value, str):
return False
if not '/' in value:
if not "/" in value:
return False
e = value.split('/')
e = value.split("/")
if not len(e) == 2:
return False
if not e[1].isnumeric():
@ -62,10 +65,9 @@ class IPInterfaceWithPrefixLength(Validator):
return True
class Validator(object):
def __init__(self, schema):
self.logger = logging.getLogger('vppcfg.config')
self.logger = logging.getLogger("vppcfg.config")
self.logger.addHandler(logging.NullHandler())
self.schema = schema
@ -75,7 +77,8 @@ class Validator(object):
validate_loopbacks,
validate_bridgedomains,
validate_vxlan_tunnels,
validate_taps ]
validate_taps,
]
def validate(self, yaml):
ret_rv = True
@ -109,7 +112,7 @@ class Validator(object):
ret_rv = False
for result in e.results:
for error in result.errors:
ret_msgs.extend([f'yamale: {error}'])
ret_msgs.extend([f"yamale: {error}"])
return ret_rv, ret_msgs
self.logger.debug("Validating Semantics...")
@ -126,7 +129,7 @@ class Validator(object):
return ret_rv, ret_msgs
def valid_config(self, yaml):
""" Validate the given YAML configuration in 'yaml' against syntax
"""Validate the given YAML configuration in 'yaml' against syntax
validation given in the yamale 'schema', and all semantic configs.
Returns True if the configuration is valid, False otherwise.
@ -142,7 +145,7 @@ class Validator(object):
return True
def add_validator(self, func):
""" Add a validator function, which strictly takes the prototype
"""Add a validator function, which strictly takes the prototype
rv, msgs = func(yaml)
returning a Boolean success value in rv and a List of strings
in msgs. The function will be passed the configuration YAML and

View File

@ -15,49 +15,51 @@ import logging
import config.interface as interface
import ipaddress
def get_all_addresses_except_ifname(yaml, except_ifname):
""" Return a list of all ipaddress.ip_interface() instances in the entire config,
"""Return a list of all ipaddress.ip_interface() instances in the entire config,
except for those that belong to 'ifname'.
"""
ret = []
if 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if "interfaces" in yaml:
for ifname, iface in yaml["interfaces"].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
if "addresses" in iface:
for a in iface["addresses"]:
ret.append(ipaddress.ip_interface(a))
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if "sub-interfaces" in iface:
for subid, sub_iface in iface["sub-interfaces"].items():
sub_ifname = f"{ifname}.{int(subid)}"
if sub_ifname == except_ifname:
continue
if 'addresses' in sub_iface:
for a in sub_iface['addresses']:
if "addresses" in sub_iface:
for a in sub_iface["addresses"]:
ret.append(ipaddress.ip_interface(a))
if 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
if "loopbacks" in yaml:
for ifname, iface in yaml["loopbacks"].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
if "addresses" in iface:
for a in iface["addresses"]:
ret.append(ipaddress.ip_interface(a))
if 'bridgedomains' in yaml:
for ifname, iface in yaml['bridgedomains'].items():
if "bridgedomains" in yaml:
for ifname, iface in yaml["bridgedomains"].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
if "addresses" in iface:
for a in iface["addresses"]:
ret.append(ipaddress.ip_interface(a))
return ret
def is_allowed(yaml, ifname, iface_addresses, ip_interface):
""" Returns True if there is at most one occurence of the ip_interface (an IPv4/IPv6 prefix+len)
"""Returns True if there is at most one occurence of the ip_interface (an IPv4/IPv6 prefix+len)
in the entire config. That said, we need the 'iface_addresses' because VPP is a bit fickle in
this regard.

View File

@ -15,46 +15,47 @@ import logging
import config.interface as interface
import config.mac as mac
def get_bondethernets(yaml):
""" Return a list of all bondethernets. """
"""Return a list of all bondethernets."""
ret = []
if 'bondethernets' in yaml:
for ifname, iface in yaml['bondethernets'].items():
if "bondethernets" in yaml:
for ifname, iface in yaml["bondethernets"].items():
ret.append(ifname)
return ret
def get_by_name(yaml, ifname):
""" Return the BondEthernet by name, if it exists. Return None,None otherwise. """
"""Return the BondEthernet by name, if it exists. Return None,None otherwise."""
try:
if ifname in yaml['bondethernets']:
return ifname, yaml['bondethernets'][ifname]
if ifname in yaml["bondethernets"]:
return ifname, yaml["bondethernets"][ifname]
except:
pass
return None, None
def is_bondethernet(yaml, ifname):
""" Returns True if the interface name is an existing BondEthernet. """
"""Returns True if the interface name is an existing BondEthernet."""
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def is_bond_member(yaml, ifname):
""" Returns True if this interface is a member of a BondEthernet. """
if not 'bondethernets' in yaml:
"""Returns True if this interface is a member of a BondEthernet."""
if not "bondethernets" in yaml:
return False
for bond, iface in yaml['bondethernets'].items():
if not 'interfaces' in iface:
for bond, iface in yaml["bondethernets"].items():
if not "interfaces" in iface:
continue
if ifname in iface['interfaces']:
if ifname in iface["interfaces"]:
return True
return False
def get_mode(yaml, ifname):
""" Return the mode of the BondEthernet as a string, defaulting to 'lacp'
"""Return the mode of the BondEthernet as a string, defaulting to 'lacp'
if no mode is given. Return None if the bond interface doesn't exist.
Return values: 'round-robin','active-backup','broadcast','lacp','xor'
@ -63,18 +64,18 @@ def get_mode(yaml, ifname):
if not iface:
return None
if not 'mode' in iface:
return 'lacp'
return iface['mode']
if not "mode" in iface:
return "lacp"
return iface["mode"]
def mode_to_int(mode):
""" Returns the integer representation in VPP of a given bondethernet mode,
"""Returns the integer representation in VPP of a given bondethernet mode,
or -1 if 'mode' is not a valid string.
See src/vnet/bonding/bond.api and schema.yaml for valid pairs. """
See src/vnet/bonding/bond.api and schema.yaml for valid pairs."""
ret = { 'round-robin': 1, 'active-backup': 2, 'xor': 3, 'broadcast': 4, 'lacp': 5 }
ret = {"round-robin": 1, "active-backup": 2, "xor": 3, "broadcast": 4, "lacp": 5}
try:
return ret[mode]
except:
@ -83,12 +84,12 @@ def mode_to_int(mode):
def int_to_mode(mode):
""" Returns the string representation in VPP of a given bondethernet mode,
"""Returns the string representation in VPP of a given bondethernet mode,
or "" if 'mode' is not a valid id.
See src/vnet/bonding/bond.api and schema.yaml for valid pairs. """
See src/vnet/bonding/bond.api and schema.yaml for valid pairs."""
ret = { 1: 'round-robin', 2: 'active-backup', 3: 'xor', 4: 'broadcast', 5: 'lacp' }
ret = {1: "round-robin", 2: "active-backup", 3: "xor", 4: "broadcast", 5: "lacp"}
try:
return ret[mode]
except:
@ -97,7 +98,7 @@ def int_to_mode(mode):
def get_lb(yaml, ifname):
""" Return the loadbalance strategy of the BondEthernet as a string. Only
"""Return the loadbalance strategy of the BondEthernet as a string. Only
'xor' and 'lacp' modes have loadbalance strategies, so return None if
those modes are not used.
@ -108,22 +109,29 @@ def get_lb(yaml, ifname):
if not iface:
return None
mode = get_mode(yaml, ifname)
if not mode in ['xor','lacp']:
if not mode in ["xor", "lacp"]:
return None
if not 'load-balance' in iface:
return 'l34'
return iface['load-balance']
if not "load-balance" in iface:
return "l34"
return iface["load-balance"]
def lb_to_int(lb):
""" Returns the integer representation in VPP of a given load-balance strategy,
"""Returns the integer representation in VPP of a given load-balance strategy,
or -1 if 'lb' is not a valid string.
See src/vnet/bonding/bond.api and schema.yaml for valid pairs, although
bond.api defined more than we use in vppcfg. """
bond.api defined more than we use in vppcfg."""
ret = { 'l2': 0, 'l34': 1, 'l23': 2, 'round-robin': 3, 'broadcast': 4, 'active-backup': 5 }
ret = {
"l2": 0,
"l34": 1,
"l23": 2,
"round-robin": 3,
"broadcast": 4,
"active-backup": 5,
}
try:
return ret[lb]
except:
@ -132,13 +140,20 @@ def lb_to_int(lb):
def int_to_lb(lb):
""" Returns the string representation in VPP of a given load-balance strategy,
"""Returns the string representation in VPP of a given load-balance strategy,
or "" if 'lb' is not a valid int.
See src/vnet/bonding/bond.api and schema.yaml for valid pairs, although
bond.api defined more than we use in vppcfg. """
bond.api defined more than we use in vppcfg."""
ret = { 0: 'l2', 1: 'l34', 2: 'l23', 3: 'round-robin', 4: 'broadcast', 5: 'active-backup' }
ret = {
0: "l2",
1: "l34",
2: "l23",
3: "round-robin",
4: "broadcast",
5: "active-backup",
}
try:
return ret[lb]
except:
@ -149,13 +164,13 @@ def int_to_lb(lb):
def validate_bondethernets(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'bondethernets' in yaml:
if not "bondethernets" in yaml:
return result, msgs
for ifname, iface in yaml['bondethernets'].items():
for ifname, iface in yaml["bondethernets"].items():
logger.debug(f"bondethernet {ifname}: {iface}")
bond_ifname, bond_iface = interface.get_by_name(yaml, ifname)
bond_mtu = 1500
@ -166,26 +181,37 @@ def validate_bondethernets(yaml):
bond_mtu = interface.get_mtu(yaml, bond_ifname)
instance = int(ifname[12:])
if instance > 4294967294:
msgs.append(f"bondethernet {ifname} has instance {int(instance)} which is too large")
msgs.append(
f"bondethernet {ifname} has instance {int(instance)} which is too large"
)
result = False
if not get_mode(yaml, bond_ifname) in ['xor','lacp'] and 'load-balance' in iface:
msgs.append(f"bondethernet {ifname} can only have load-balance if in mode XOR or LACP")
if (
not get_mode(yaml, bond_ifname) in ["xor", "lacp"]
and "load-balance" in iface
):
msgs.append(
f"bondethernet {ifname} can only have load-balance if in mode XOR or LACP"
)
result = False
if 'mac' in iface and mac.is_multicast(iface['mac']):
msgs.append(f"bondethernet {ifname} MAC address {iface['mac']} cannot be multicast")
if "mac" in iface and mac.is_multicast(iface["mac"]):
msgs.append(
f"bondethernet {ifname} MAC address {iface['mac']} cannot be multicast"
)
result = False
if not 'interfaces' in iface:
if not "interfaces" in iface:
continue
for member in iface['interfaces']:
for member in iface["interfaces"]:
if (None, None) == interface.get_by_name(yaml, member):
msgs.append(f"bondethernet {ifname} member {member} does not exist")
result = False
continue
if interface.has_sub(yaml, member):
msgs.append(f"bondethernet {ifname} member {member} has sub-interface(s)")
msgs.append(
f"bondethernet {ifname} member {member} has sub-interface(s)"
)
result = False
if interface.has_lcp(yaml, member):
msgs.append(f"bondethernet {ifname} member {member} has an LCP")
@ -195,6 +221,8 @@ def validate_bondethernets(yaml):
result = False
member_mtu = interface.get_mtu(yaml, member)
if member_mtu != bond_mtu:
msgs.append(f"bondethernet {ifname} member {member} MTU {int(member_mtu)} does not match BondEthernet MTU {int(bond_mtu)}")
msgs.append(
f"bondethernet {ifname} member {member} MTU {int(member_mtu)} does not match BondEthernet MTU {int(bond_mtu)}"
)
result = False
return result, msgs

View File

@ -19,66 +19,67 @@ import config.address as address
def get_bridgedomains(yaml):
""" Return a list of all bridgedomains. """
"""Return a list of all bridgedomains."""
ret = []
if not 'bridgedomains' in yaml:
if not "bridgedomains" in yaml:
return ret
for ifname, iface in yaml['bridgedomains'].items():
for ifname, iface in yaml["bridgedomains"].items():
ret.append(ifname)
return ret
def get_by_name(yaml, ifname):
""" Return the BridgeDomain by name (bd*), if it exists. Return None,None otherwise. """
"""Return the BridgeDomain by name (bd*), if it exists. Return None,None otherwise."""
try:
if ifname in yaml['bridgedomains']:
return ifname, yaml['bridgedomains'][ifname]
if ifname in yaml["bridgedomains"]:
return ifname, yaml["bridgedomains"][ifname]
except:
pass
return None, None
def is_bridgedomain(yaml, ifname):
""" Returns True if the name (bd*) is an existing bridgedomain. """
"""Returns True if the name (bd*) is an existing bridgedomain."""
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def get_bridge_interfaces(yaml):
""" Returns a list of all interfaces that are bridgedomain members """
"""Returns a list of all interfaces that are bridgedomain members"""
ret = []
if not 'bridgedomains' in yaml:
if not "bridgedomains" in yaml:
return ret
for ifname, iface in yaml['bridgedomains'].items():
if 'interfaces' in iface:
ret.extend(iface['interfaces'])
for ifname, iface in yaml["bridgedomains"].items():
if "interfaces" in iface:
ret.extend(iface["interfaces"])
return ret
def is_bridge_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced in bridgedomains zero or one times """
"""Returns True if this interface is referenced in bridgedomains zero or one times"""
ifs = get_bridge_interfaces(yaml)
return ifs.count(ifname) < 2
def is_bridge_interface(yaml, ifname):
""" Returns True if this interface is a member of a BridgeDomain """
"""Returns True if this interface is a member of a BridgeDomain"""
return ifname in get_bridge_interfaces(yaml)
def bvi_unique(yaml, bviname):
""" Returns True if the BVI identified by bviname is unique among all BridgeDomains. """
if not 'bridgedomains' in yaml:
"""Returns True if the BVI identified by bviname is unique among all BridgeDomains."""
if not "bridgedomains" in yaml:
return True
n = 0
for ifname, iface in yaml['bridgedomains'].items():
if 'bvi' in iface and iface['bvi'] == bviname:
for ifname, iface in yaml["bridgedomains"].items():
if "bvi" in iface and iface["bvi"] == bviname:
n += 1
return n<2
return n < 2
def get_settings(yaml, ifname):
@ -87,57 +88,61 @@ def get_settings(yaml, ifname):
return None
settings = {
'learn': True,
'unicast-flood': True,
'unknown-unicast-flood': True,
'unicast-forward': True,
'arp-termination': False,
'arp-unicast-forward': False,
'mac-age-minutes': 0, ## 0 means disabled
"learn": True,
"unicast-flood": True,
"unknown-unicast-flood": True,
"unicast-forward": True,
"arp-termination": False,
"arp-unicast-forward": False,
"mac-age-minutes": 0, ## 0 means disabled
}
if 'settings' in iface:
if 'learn' in iface['settings']:
settings['learn'] = iface['settings']['learn']
if 'unicast-flood' in iface['settings']:
settings['unicast-flood'] = iface['settings']['unicast-flood']
if 'unknown-unicast-flood' in iface['settings']:
settings['unknown-unicast-flood'] = iface['settings']['unknown-unicast-flood']
if 'unicast-forward' in iface['settings']:
settings['unicast-forward'] = iface['settings']['unicast-forward']
if 'arp-termination' in iface['settings']:
settings['arp-termination'] = iface['settings']['arp-termination']
if 'arp-unicast-forward' in iface['settings']:
settings['arp-unicast-forward'] = iface['settings']['arp-unicast-forward']
if 'mac-age-minutes' in iface['settings']:
settings['mac-age-minutes'] = int(iface['settings']['mac-age-minutes'])
if "settings" in iface:
if "learn" in iface["settings"]:
settings["learn"] = iface["settings"]["learn"]
if "unicast-flood" in iface["settings"]:
settings["unicast-flood"] = iface["settings"]["unicast-flood"]
if "unknown-unicast-flood" in iface["settings"]:
settings["unknown-unicast-flood"] = iface["settings"][
"unknown-unicast-flood"
]
if "unicast-forward" in iface["settings"]:
settings["unicast-forward"] = iface["settings"]["unicast-forward"]
if "arp-termination" in iface["settings"]:
settings["arp-termination"] = iface["settings"]["arp-termination"]
if "arp-unicast-forward" in iface["settings"]:
settings["arp-unicast-forward"] = iface["settings"]["arp-unicast-forward"]
if "mac-age-minutes" in iface["settings"]:
settings["mac-age-minutes"] = int(iface["settings"]["mac-age-minutes"])
return settings
def validate_bridgedomains(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'bridgedomains' in yaml:
if not "bridgedomains" in yaml:
return result, msgs
for ifname, iface in yaml['bridgedomains'].items():
for ifname, iface in yaml["bridgedomains"].items():
logger.debug(f"bridgedomain {iface}")
bd_mtu = 1500
if 'mtu' in iface:
bd_mtu = iface['mtu']
if "mtu" in iface:
bd_mtu = iface["mtu"]
instance = int(ifname[2:])
if instance == 0:
msgs.append(f"bridgedomain {ifname} is reserved")
result = False
elif instance > 16777215:
msgs.append(f"bridgedomain {ifname} has instance {int(instance)} which is too large")
msgs.append(
f"bridgedomain {ifname} has instance {int(instance)} which is too large"
)
result = False
if 'bvi' in iface:
bviname = iface['bvi']
bvi_ifname, bvi_iface = loopback.get_by_name(yaml,iface['bvi'])
if "bvi" in iface:
bviname = iface["bvi"]
bvi_ifname, bvi_iface = loopback.get_by_name(yaml, iface["bvi"])
if not bvi_unique(yaml, bvi_ifname):
msgs.append(f"bridgedomain {ifname} BVI {bvi_ifname} is not unique")
result = False
@ -147,14 +152,16 @@ def validate_bridgedomains(yaml):
continue
bvi_mtu = 1500
if 'mtu' in bvi_iface:
bvi_mtu = bvi_iface['mtu']
if "mtu" in bvi_iface:
bvi_mtu = bvi_iface["mtu"]
if bvi_mtu != bd_mtu:
msgs.append(f"bridgedomain {ifname} BVI {bvi_ifname} has MTU {int(bvi_mtu)}, while bridge has {int(bd_mtu)}")
msgs.append(
f"bridgedomain {ifname} BVI {bvi_ifname} has MTU {int(bvi_mtu)}, while bridge has {int(bd_mtu)}"
)
result = False
if 'interfaces' in iface:
for member in iface['interfaces']:
if "interfaces" in iface:
for member in iface["interfaces"]:
if (None, None) == interface.get_by_name(yaml, member):
msgs.append(f"bridgedomain {ifname} member {member} does not exist")
result = False
@ -171,8 +178,9 @@ def validate_bridgedomains(yaml):
result = False
member_mtu = interface.get_mtu(yaml, member)
if member_mtu != bd_mtu:
msgs.append(f"bridgedomain {ifname} member {member} has MTU {int(member_mtu)}, while bridge has {int(bd_mtu)}")
msgs.append(
f"bridgedomain {ifname} member {member} has MTU {int(member_mtu)}, while bridge has {int(bd_mtu)}"
)
result = False
return result, msgs

View File

@ -21,77 +21,78 @@ import config.address as address
import config.mac as mac
import config.tap as tap
def get_qinx_parent_by_name(yaml, ifname):
""" Returns the sub-interface which matches a QinAD or QinQ outer tag, or None,None
if that sub-interface doesn't exist. """
"""Returns the sub-interface which matches a QinAD or QinQ outer tag, or None,None
if that sub-interface doesn't exist."""
if not is_qinx(yaml, ifname):
return None, None
qinx_ifname, qinx_iface = get_by_name(yaml, ifname)
if not qinx_iface:
return None,None
return None, None
qinx_encap = get_encapsulation(yaml, ifname)
if not qinx_encap:
return None,None
return None, None
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
if not parent_iface:
return None,None
return None, None
for subid, sub_iface in parent_iface['sub-interfaces'].items():
for subid, sub_iface in parent_iface["sub-interfaces"].items():
sub_ifname = f"{parent_ifname}.{int(subid)}"
sub_encap = get_encapsulation(yaml, sub_ifname)
if not sub_encap:
continue
if qinx_encap['dot1q'] > 0 and sub_encap['dot1q'] == qinx_encap['dot1q']:
if qinx_encap["dot1q"] > 0 and sub_encap["dot1q"] == qinx_encap["dot1q"]:
return sub_ifname, sub_iface
if qinx_encap['dot1ad'] > 0 and sub_encap['dot1ad'] == qinx_encap['dot1ad']:
if qinx_encap["dot1ad"] > 0 and sub_encap["dot1ad"] == qinx_encap["dot1ad"]:
return sub_ifname, sub_iface
return None,None
return None, None
def get_parent_by_name(yaml, ifname):
""" Returns the sub-interface's parent, or None,None if the sub-int doesn't exist. """
"""Returns the sub-interface's parent, or None,None if the sub-int doesn't exist."""
try:
parent_ifname, subid = ifname.split('.')
parent_ifname, subid = ifname.split(".")
subid = int(subid)
iface = yaml['interfaces'][parent_ifname]
iface = yaml["interfaces"][parent_ifname]
return parent_ifname, iface
except:
pass
return None,None
return None, None
def get_by_lcp_name(yaml, lcpname):
""" Returns the interface or sub-interface by a given lcp name, or None,None if it does not exist """
if not 'interfaces' in yaml:
return None,None
for ifname, iface in yaml['interfaces'].items():
if 'lcp' in iface and iface['lcp'] == lcpname:
"""Returns the interface or sub-interface by a given lcp name, or None,None if it does not exist"""
if not "interfaces" in yaml:
return None, None
for ifname, iface in yaml["interfaces"].items():
if "lcp" in iface and iface["lcp"] == lcpname:
return ifname, iface
if not 'sub-interfaces' in iface:
if not "sub-interfaces" in iface:
continue
for subid, sub_iface in yaml['interfaces'][ifname]['sub-interfaces'].items():
for subid, sub_iface in yaml["interfaces"][ifname]["sub-interfaces"].items():
sub_ifname = f"{ifname}.{int(subid)}"
if 'lcp' in sub_iface and sub_iface['lcp'] == lcpname:
if "lcp" in sub_iface and sub_iface["lcp"] == lcpname:
return sub_ifname, sub_iface
return None,None
return None, None
def get_by_name(yaml, ifname):
""" Returns the interface or sub-interface by a given name, or None,None if it does not exist """
if '.' in ifname:
"""Returns the interface or sub-interface by a given name, or None,None if it does not exist"""
if "." in ifname:
try:
phy_ifname, subid = ifname.split('.')
phy_ifname, subid = ifname.split(".")
subid = int(subid)
iface = yaml['interfaces'][phy_ifname]['sub-interfaces'][subid]
iface = yaml["interfaces"][phy_ifname]["sub-interfaces"][subid]
return ifname, iface
except:
return None, None
try:
iface = yaml['interfaces'][ifname]
iface = yaml["interfaces"][ifname]
return ifname, iface
except:
pass
@ -99,114 +100,118 @@ def get_by_name(yaml, ifname):
def is_sub(yaml, ifname):
""" Returns True if this interface is a sub-interface """
"""Returns True if this interface is a sub-interface"""
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
return isinstance(parent_iface, dict)
def has_sub(yaml, ifname):
""" Returns True if this interface has sub-interfaces """
if not 'interfaces' in yaml:
"""Returns True if this interface has sub-interfaces"""
if not "interfaces" in yaml:
return False
if ifname in yaml['interfaces']:
iface = yaml['interfaces'][ifname]
if 'sub-interfaces' in iface and len(iface['sub-interfaces']) > 0:
if ifname in yaml["interfaces"]:
iface = yaml["interfaces"][ifname]
if "sub-interfaces" in iface and len(iface["sub-interfaces"]) > 0:
return True
return False
def has_address(yaml, ifname):
""" Returns True if this interface or sub-interface has one or more addresses"""
"""Returns True if this interface or sub-interface has one or more addresses"""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return False
return 'addresses' in iface
return "addresses" in iface
def get_l2xc_interfaces(yaml):
""" Returns a list of all interfaces that have an L2 CrossConnect """
"""Returns a list of all interfaces that have an L2 CrossConnect"""
ret = []
if not 'interfaces' in yaml:
if not "interfaces" in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
for ifname, iface in yaml["interfaces"].items():
if "l2xc" in iface:
ret.append(ifname)
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if "sub-interfaces" in iface:
for subid, sub_iface in iface["sub-interfaces"].items():
sub_ifname = f"{ifname}.{int(subid)}"
if 'l2xc' in sub_iface:
if "l2xc" in sub_iface:
ret.append(sub_ifname)
return ret
def is_l2xc_interface(yaml, ifname):
""" Returns True if this interface has an L2 CrossConnect """
"""Returns True if this interface has an L2 CrossConnect"""
return ifname in get_l2xc_interfaces(yaml)
def get_l2xc_target_interfaces(yaml):
""" Returns a list of all interfaces that are the target of an L2 CrossConnect """
"""Returns a list of all interfaces that are the target of an L2 CrossConnect"""
ret = []
if 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
ret.append(iface['l2xc'])
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if 'l2xc' in sub_iface:
ret.append(sub_iface['l2xc'])
if "interfaces" in yaml:
for ifname, iface in yaml["interfaces"].items():
if "l2xc" in iface:
ret.append(iface["l2xc"])
if "sub-interfaces" in iface:
for subid, sub_iface in iface["sub-interfaces"].items():
if "l2xc" in sub_iface:
ret.append(sub_iface["l2xc"])
return ret
def is_l2xc_target_interface(yaml, ifname):
""" Returns True if this interface is the target of an L2 CrossConnect """
"""Returns True if this interface is the target of an L2 CrossConnect"""
return ifname in get_l2xc_target_interfaces(yaml)
def is_l2xc_target_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced as an l2xc target zero or one times """
"""Returns True if this interface is referenced as an l2xc target zero or one times"""
ifs = get_l2xc_target_interfaces(yaml)
return ifs.count(ifname) < 2
def has_lcp(yaml, ifname):
""" Returns True if this interface or sub-interface has an LCP """
"""Returns True if this interface or sub-interface has an LCP"""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return False
return 'lcp' in iface
return "lcp" in iface
def valid_encapsulation(yaml, ifname):
""" Returns True if the sub interface has a valid encapsulation, or
none at all """
"""Returns True if the sub interface has a valid encapsulation, or
none at all"""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return True
if not 'encapsulation' in iface:
if not "encapsulation" in iface:
return True
encap = iface['encapsulation']
if 'dot1ad' in encap and 'dot1q' in encap:
encap = iface["encapsulation"]
if "dot1ad" in encap and "dot1q" in encap:
return False
if 'inner-dot1q' in encap and not ('dot1ad' in encap or 'dot1q' in encap):
if "inner-dot1q" in encap and not ("dot1ad" in encap or "dot1q" in encap):
return False
if 'exact-match' in encap and encap['exact-match'] == False and has_lcp(yaml, ifname):
if (
"exact-match" in encap
and encap["exact-match"] == False
and has_lcp(yaml, ifname)
):
return False
return True
def get_encapsulation(yaml, ifname):
""" Returns the encapsulation of an interface name as a fully formed dictionary:
"""Returns the encapsulation of an interface name as a fully formed dictionary:
dot1q: int (default 0)
dot1ad: int (default 0)
@ -225,49 +230,49 @@ def get_encapsulation(yaml, ifname):
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
if not iface or not parent_iface:
return None
parent_ifname, subid = ifname.split('.')
parent_ifname, subid = ifname.split(".")
dot1q = 0
dot1ad = 0
inner_dot1q = 0
exact_match = False
if not 'encapsulation' in iface:
if not "encapsulation" in iface:
dot1q = int(subid)
exact_match = True
else:
if 'dot1q' in iface['encapsulation']:
dot1q = iface['encapsulation']['dot1q']
elif 'dot1ad' in iface['encapsulation']:
dot1ad = iface['encapsulation']['dot1ad']
if 'inner-dot1q' in iface['encapsulation']:
inner_dot1q = iface['encapsulation']['inner-dot1q']
if 'exact-match' in iface['encapsulation']:
exact_match = iface['encapsulation']['exact-match']
if "dot1q" in iface["encapsulation"]:
dot1q = iface["encapsulation"]["dot1q"]
elif "dot1ad" in iface["encapsulation"]:
dot1ad = iface["encapsulation"]["dot1ad"]
if "inner-dot1q" in iface["encapsulation"]:
inner_dot1q = iface["encapsulation"]["inner-dot1q"]
if "exact-match" in iface["encapsulation"]:
exact_match = iface["encapsulation"]["exact-match"]
return {
"dot1q": int(dot1q),
"dot1ad": int(dot1ad),
"inner-dot1q": int(inner_dot1q),
"exact-match": bool(exact_match)
"exact-match": bool(exact_match),
}
def get_phys(yaml):
""" Return a list of all toplevel (ie. non-sub) interfaces which are
"""Return a list of all toplevel (ie. non-sub) interfaces which are
assumed to be physical network cards, eg TenGigabitEthernet1/0/0. Note
that derived/created interfaces such as Tunnels, BondEthernets and
Loopbacks are not returned """
Loopbacks are not returned"""
ret = []
if not 'interfaces' in yaml:
if not "interfaces" in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
for ifname, iface in yaml["interfaces"].items():
if is_phy(yaml, ifname):
ret.append(ifname)
return ret
def is_phy(yaml, ifname):
""" Returns True if the ifname is the name of a physical network interface. """
"""Returns True if the ifname is the name of a physical network interface."""
ifname, iface = get_by_name(yaml, ifname)
if iface == None:
@ -287,29 +292,30 @@ def is_phy(yaml, ifname):
def get_interfaces(yaml):
""" Return a list of all interface and sub-interface names """
"""Return a list of all interface and sub-interface names"""
ret = []
if not 'interfaces' in yaml:
if not "interfaces" in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
for ifname, iface in yaml["interfaces"].items():
ret.append(ifname)
if not 'sub-interfaces' in iface:
if not "sub-interfaces" in iface:
continue
for subid, sub_iface in iface['sub-interfaces'].items():
for subid, sub_iface in iface["sub-interfaces"].items():
ret.append(f"{ifname}.{int(subid)}")
return ret
def get_sub_interfaces(yaml):
""" Return all interfaces which are a subinterface. """
"""Return all interfaces which are a subinterface."""
ret = []
for ifname in get_interfaces(yaml):
if is_sub(yaml, ifname):
ret.append(ifname)
return ret
def get_qinx_interfaces(yaml):
""" Return all interfaces which are double-tagged, either QinAD or QinQ.
"""Return all interfaces which are double-tagged, either QinAD or QinQ.
These interfaces will always have a valid encapsulation with 'inner-dot1q'
set to non-zero.
@ -322,19 +328,19 @@ def get_qinx_interfaces(yaml):
encap = get_encapsulation(yaml, ifname)
if not encap:
continue
if encap['inner-dot1q'] > 0:
if encap["inner-dot1q"] > 0:
ret.append(ifname)
return ret
def is_qinx(yaml, ifname):
""" Returns True if the interface is a double-tagged (QinQ or QinAD) interface """
"""Returns True if the interface is a double-tagged (QinQ or QinAD) interface"""
return ifname in get_qinx_interfaces(yaml)
def unique_encapsulation(yaml, sub_ifname):
""" Ensures that for the sub_ifname specified, there exist no other sub-ints on the
parent with the same encapsulation. """
"""Ensures that for the sub_ifname specified, there exist no other sub-ints on the
parent with the same encapsulation."""
new_ifname, iface = get_by_name(yaml, sub_ifname)
parent_ifname, parent_iface = get_parent_by_name(yaml, new_ifname)
if not iface or not parent_iface:
@ -345,7 +351,7 @@ def unique_encapsulation(yaml, sub_ifname):
return False
ncount = 0
for subid, sibling_iface in parent_iface['sub-interfaces'].items():
for subid, sibling_iface in parent_iface["sub-interfaces"].items():
sibling_ifname = f"{parent_ifname}.{int(subid)}"
sibling_encap = get_encapsulation(yaml, sibling_ifname)
if sub_encap == sibling_encap and new_ifname != sibling_ifname:
@ -355,7 +361,7 @@ def unique_encapsulation(yaml, sub_ifname):
def is_l2(yaml, ifname):
""" Returns True if the interface is an L2XC source, L2XC target or a member of a bridgedomain """
"""Returns True if the interface is an L2XC source, L2XC target or a member of a bridgedomain"""
if bridgedomain.is_bridge_interface(yaml, ifname):
return True
if is_l2xc_interface(yaml, ifname):
@ -366,22 +372,23 @@ def is_l2(yaml, ifname):
def is_l3(yaml, ifname):
""" Returns True if the interface exists and is neither l2xc target nor bridgedomain """
"""Returns True if the interface exists and is neither l2xc target nor bridgedomain"""
return not is_l2(yaml, ifname)
def get_lcp(yaml, ifname):
""" Returns the LCP of the interface. If the interface is a sub-interface with L3
"""Returns the LCP of the interface. If the interface is a sub-interface with L3
enabled, synthesize it based on its parent, using smart QinQ syntax.
Return None if no LCP can be found. """
Return None if no LCP can be found."""
ifname, iface = get_by_name(yaml, ifname)
if iface and 'lcp' in iface:
return iface['lcp']
if iface and "lcp" in iface:
return iface["lcp"]
return None
def get_mtu(yaml, ifname):
""" Returns MTU of the interface. If it's not set, return the parent's MTU, and
"""Returns MTU of the interface. If it's not set, return the parent's MTU, and
return 1500 if no MTU was set on the sub-int or the parent."""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
@ -390,61 +397,70 @@ def get_mtu(yaml, ifname):
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
try:
return iface['mtu']
return parent_iface['mtu']
return iface["mtu"]
return parent_iface["mtu"]
except:
pass
return 1500
def get_admin_state(yaml, ifname):
""" Return True if the interface admin state should be 'up'. Return False
if it does not exist, or if it's set to 'down'. """
"""Return True if the interface admin state should be 'up'. Return False
if it does not exist, or if it's set to 'down'."""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return False
if not 'state' in iface:
if not "state" in iface:
return True
return iface['state'] == 'up'
return iface["state"] == "up"
def validate_interfaces(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'interfaces' in yaml:
if not "interfaces" in yaml:
return result, msgs
for ifname, iface in yaml['interfaces'].items():
for ifname, iface in yaml["interfaces"].items():
logger.debug(f"interface {iface}")
if ifname.startswith("BondEthernet") and (None,None) == bondethernet.get_by_name(yaml, ifname):
if ifname.startswith("BondEthernet") and (
None,
None,
) == bondethernet.get_by_name(yaml, ifname):
msgs.append(f"interface {ifname} does not exist in bondethernets")
result = False
if ifname.startswith("BondEthernet") and 'mac' in iface:
msgs.append(f"interface {ifname} is a member of bondethernet, cannot set MAC")
if ifname.startswith("BondEthernet") and "mac" in iface:
msgs.append(
f"interface {ifname} is a member of bondethernet, cannot set MAC"
)
result = False
if not 'state' in iface:
iface['state'] = 'up'
if not "state" in iface:
iface["state"] = "up"
if 'mac' in iface and mac.is_multicast(iface['mac']):
msgs.append(f"interface {ifname} MAC address {iface['mac']} cannot be multicast")
if "mac" in iface and mac.is_multicast(iface["mac"]):
msgs.append(
f"interface {ifname} MAC address {iface['mac']} cannot be multicast"
)
result = False
iface_mtu = get_mtu(yaml, ifname)
iface_lcp = get_lcp(yaml, ifname)
iface_address = has_address(yaml, ifname)
if ifname.startswith('tap'):
if ifname.startswith("tap"):
tap_ifname, tap_iface = tap.get_by_name(yaml, ifname)
if not tap_iface:
msgs.append(f"interface {ifname} is a TAP but does not exist in taps")
result = False
elif 'mtu' in tap_iface['host']:
host_mtu = tap_iface['host']['mtu']
elif "mtu" in tap_iface["host"]:
host_mtu = tap_iface["host"]["mtu"]
if host_mtu != iface_mtu:
msgs.append(f"interface {ifname} is a TAP so its MTU {int(iface_mtu)} must match host MTU {int(host_mtu)}")
msgs.append(
f"interface {ifname} is a TAP so its MTU {int(iface_mtu)} must match host MTU {int(host_mtu)}"
)
result = False
if iface_address:
msgs.append(f"interface {ifname} is a TAP so it cannot have an address")
@ -453,28 +469,38 @@ def validate_interfaces(yaml):
msgs.append(f"interface {ifname} is a TAP so it cannot have an LCP")
result = False
if has_sub(yaml, ifname):
msgs.append(f"interface {ifname} is a TAP so it cannot have sub-interfaces")
msgs.append(
f"interface {ifname} is a TAP so it cannot have sub-interfaces"
)
result = False
if is_l2(yaml, ifname) and iface_lcp:
msgs.append(f"interface {ifname} is in L2 mode but has LCP name {iface_lcp}")
msgs.append(
f"interface {ifname} is in L2 mode but has LCP name {iface_lcp}"
)
result = False
if is_l2(yaml, ifname) and iface_address:
msgs.append(f"interface {ifname} is in L2 mode but has an address")
result = False
if iface_lcp and not lcp.is_unique(yaml, iface_lcp):
msgs.append(f"interface {ifname} does not have a unique LCP name {iface_lcp}")
msgs.append(
f"interface {ifname} does not have a unique LCP name {iface_lcp}"
)
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append(f"interface {ifname} IP address {a} conflicts with another")
if "addresses" in iface:
for a in iface["addresses"]:
if not address.is_allowed(yaml, ifname, iface["addresses"], a):
msgs.append(
f"interface {ifname} IP address {a} conflicts with another"
)
result = False
if 'l2xc' in iface:
if "l2xc" in iface:
if has_sub(yaml, ifname):
msgs.append(f"interface {ifname} has l2xc so it cannot have sub-interfaces")
msgs.append(
f"interface {ifname} has l2xc so it cannot have sub-interfaces"
)
result = False
if iface_lcp:
msgs.append(f"interface {ifname} has l2xc so it cannot have an LCP")
@ -482,31 +508,45 @@ def validate_interfaces(yaml):
if iface_address:
msgs.append(f"interface {ifname} has l2xc so it cannot have an address")
result = False
if (None,None) == get_by_name(yaml, iface['l2xc']):
msgs.append(f"interface {ifname} l2xc target {iface['l2xc']} does not exist")
if (None, None) == get_by_name(yaml, iface["l2xc"]):
msgs.append(
f"interface {ifname} l2xc target {iface['l2xc']} does not exist"
)
result = False
if iface['l2xc'] == ifname:
if iface["l2xc"] == ifname:
msgs.append(f"interface {ifname} l2xc target cannot be itself")
result = False
target_mtu = get_mtu(yaml, iface['l2xc'])
target_mtu = get_mtu(yaml, iface["l2xc"])
if target_mtu != iface_mtu:
msgs.append(f"interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(iface_mtu)}")
msgs.append(
f"interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(iface_mtu)}"
)
result = False
if not is_l2xc_target_interface_unique(yaml, iface['l2xc']):
msgs.append(f"interface {ifname} l2xc target {iface['l2xc']} is not unique")
if not is_l2xc_target_interface_unique(yaml, iface["l2xc"]):
msgs.append(
f"interface {ifname} l2xc target {iface['l2xc']} is not unique"
)
result = False
if bridgedomain.is_bridge_interface(yaml, iface['l2xc']):
msgs.append(f"interface {ifname} l2xc target {iface['l2xc']} is in a bridgedomain")
if bridgedomain.is_bridge_interface(yaml, iface["l2xc"]):
msgs.append(
f"interface {ifname} l2xc target {iface['l2xc']} is in a bridgedomain"
)
result = False
if has_lcp(yaml, iface['l2xc']):
msgs.append(f"interface {ifname} l2xc target {iface['l2xc']} cannot have an LCP")
if has_lcp(yaml, iface["l2xc"]):
msgs.append(
f"interface {ifname} l2xc target {iface['l2xc']} cannot have an LCP"
)
result = False
if has_address(yaml, iface['l2xc']):
msgs.append(f"interface {ifname} l2xc target {iface['l2xc']} cannot have an address")
if has_address(yaml, iface["l2xc"]):
msgs.append(
f"interface {ifname} l2xc target {iface['l2xc']} cannot have an address"
)
result = False
if has_sub(yaml, ifname):
for sub_id, sub_iface in yaml['interfaces'][ifname]['sub-interfaces'].items():
for sub_id, sub_iface in yaml["interfaces"][ifname][
"sub-interfaces"
].items():
logger.debug(f"sub-interface {sub_iface}")
sub_ifname = f"{ifname}.{int(sub_id)}"
if not sub_iface:
@ -514,93 +554,138 @@ def validate_interfaces(yaml):
result = False
continue
if not 'state' in sub_iface:
sub_iface['state'] = 'up'
if sub_iface['state'] == 'up' and iface['state'] == 'down':
msgs.append(f"sub-interface {sub_ifname} cannot be up if parent {ifname} is down")
if not "state" in sub_iface:
sub_iface["state"] = "up"
if sub_iface["state"] == "up" and iface["state"] == "down":
msgs.append(
f"sub-interface {sub_ifname} cannot be up if parent {ifname} is down"
)
result = False
sub_mtu = get_mtu(yaml, sub_ifname)
if sub_mtu > iface_mtu:
msgs.append(f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {ifname} MTU {int(iface_mtu)}")
msgs.append(
f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {ifname} MTU {int(iface_mtu)}"
)
result = False
if is_qinx(yaml, sub_ifname):
mid_ifname, mid_iface = get_qinx_parent_by_name(yaml, sub_ifname)
mid_mtu = get_mtu(yaml, mid_ifname)
if sub_mtu > mid_mtu:
msgs.append(f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {mid_ifname} MTU {int(mid_mtu)}")
msgs.append(
f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {mid_ifname} MTU {int(mid_mtu)}"
)
result = False
sub_lcp = get_lcp(yaml, sub_ifname)
if is_l2(yaml, sub_ifname) and sub_lcp:
msgs.append(f"sub-interface {sub_ifname} is in L2 mode but has LCP name {sub_lcp}")
msgs.append(
f"sub-interface {sub_ifname} is in L2 mode but has LCP name {sub_lcp}"
)
result = False
if sub_lcp and not lcp.is_unique(yaml, sub_lcp):
msgs.append(f"sub-interface {sub_ifname} does not have a unique LCP name {sub_lcp}")
msgs.append(
f"sub-interface {sub_ifname} does not have a unique LCP name {sub_lcp}"
)
result = False
if sub_lcp and not iface_lcp:
msgs.append(f"sub-interface {sub_ifname} has LCP name {sub_lcp} but {ifname} does not have an LCP")
msgs.append(
f"sub-interface {sub_ifname} has LCP name {sub_lcp} but {ifname} does not have an LCP"
)
result = False
if sub_lcp and is_qinx(yaml, sub_ifname):
mid_ifname, mid_iface = get_qinx_parent_by_name(yaml, sub_ifname)
if not mid_iface:
msgs.append(f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} which requires a parent")
msgs.append(
f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} which requires a parent"
)
result = False
elif not get_lcp(yaml, mid_ifname):
msgs.append(f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} but {mid_ifname} does not have an LCP")
msgs.append(
f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} but {mid_ifname} does not have an LCP"
)
result = False
encap = get_encapsulation(yaml, sub_ifname)
if sub_lcp and (not encap or not encap['exact-match']):
msgs.append(f"sub-interface {sub_ifname} has LCP name {sub_lcp} but its encapsulation is not exact-match")
if sub_lcp and (not encap or not encap["exact-match"]):
msgs.append(
f"sub-interface {sub_ifname} has LCP name {sub_lcp} but its encapsulation is not exact-match"
)
result = False
if has_address(yaml, sub_ifname):
if not encap or not encap['exact-match']:
msgs.append(f"sub-interface {sub_ifname} has an address but its encapsulation is not exact-match")
if not encap or not encap["exact-match"]:
msgs.append(
f"sub-interface {sub_ifname} has an address but its encapsulation is not exact-match"
)
result = False
if is_l2(yaml, sub_ifname):
msgs.append(f"sub-interface {sub_ifname} is in L2 mode but has an address")
msgs.append(
f"sub-interface {sub_ifname} is in L2 mode but has an address"
)
result = False
for a in sub_iface['addresses']:
if not address.is_allowed(yaml, sub_ifname, sub_iface['addresses'], a):
msgs.append(f"sub-interface {sub_ifname} IP address {a} conflicts with another")
for a in sub_iface["addresses"]:
if not address.is_allowed(
yaml, sub_ifname, sub_iface["addresses"], a
):
msgs.append(
f"sub-interface {sub_ifname} IP address {a} conflicts with another"
)
result = False
if not valid_encapsulation(yaml, sub_ifname):
msgs.append(f"sub-interface {sub_ifname} has invalid encapsulation")
result = False
elif not unique_encapsulation(yaml, sub_ifname):
msgs.append(f"sub-interface {sub_ifname} does not have unique encapsulation")
msgs.append(
f"sub-interface {sub_ifname} does not have unique encapsulation"
)
result = False
if 'l2xc' in sub_iface:
if "l2xc" in sub_iface:
if has_lcp(yaml, sub_ifname):
msgs.append(f"sub-interface {sub_ifname} has l2xc so it cannot have an LCP")
msgs.append(
f"sub-interface {sub_ifname} has l2xc so it cannot have an LCP"
)
result = False
if has_address(yaml, sub_ifname):
msgs.append(f"sub-interface {sub_ifname} has l2xc so it cannot have an address")
msgs.append(
f"sub-interface {sub_ifname} has l2xc so it cannot have an address"
)
result = False
if (None, None) == get_by_name(yaml, sub_iface['l2xc']):
msgs.append(f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} does not exist")
if (None, None) == get_by_name(yaml, sub_iface["l2xc"]):
msgs.append(
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} does not exist"
)
result = False
if sub_iface['l2xc'] == sub_ifname:
msgs.append(f"sub-interface {sub_ifname} l2xc target cannot be itself")
if sub_iface["l2xc"] == sub_ifname:
msgs.append(
f"sub-interface {sub_ifname} l2xc target cannot be itself"
)
result = False
target_mtu = get_mtu(yaml, sub_iface['l2xc'])
target_mtu = get_mtu(yaml, sub_iface["l2xc"])
if target_mtu != sub_mtu:
msgs.append(f"sub-interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(sub_mtu)}")
msgs.append(
f"sub-interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(sub_mtu)}"
)
result = False
if not is_l2xc_target_interface_unique(yaml, sub_iface['l2xc']):
msgs.append(f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is not unique")
if not is_l2xc_target_interface_unique(yaml, sub_iface["l2xc"]):
msgs.append(
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is not unique"
)
result = False
if bridgedomain.is_bridge_interface(yaml, sub_iface['l2xc']):
msgs.append(f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is in a bridgedomain")
if bridgedomain.is_bridge_interface(yaml, sub_iface["l2xc"]):
msgs.append(
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is in a bridgedomain"
)
result = False
if has_lcp(yaml, sub_iface['l2xc']):
msgs.append(f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an LCP")
if has_lcp(yaml, sub_iface["l2xc"]):
msgs.append(
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an LCP"
)
result = False
if has_address(yaml, sub_iface['l2xc']):
msgs.append(f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an address")
if has_address(yaml, sub_iface["l2xc"]):
msgs.append(
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an address"
)
result = False
return result, msgs

View File

@ -13,33 +13,35 @@
#
import logging
def get_lcps(yaml, interfaces=True, loopbacks=True, bridgedomains=True):
""" Returns a list of LCPs configured in the system. Optionally (de)select the different
types of LCP. Return an empty list if there are none of the given type(s). """
"""Returns a list of LCPs configured in the system. Optionally (de)select the different
types of LCP. Return an empty list if there are none of the given type(s)."""
ret = []
if interfaces and 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if 'lcp' in sub_iface:
ret.append(sub_iface['lcp'])
if interfaces and "interfaces" in yaml:
for ifname, iface in yaml["interfaces"].items():
if "lcp" in iface:
ret.append(iface["lcp"])
if "sub-interfaces" in iface:
for subid, sub_iface in iface["sub-interfaces"].items():
if "lcp" in sub_iface:
ret.append(sub_iface["lcp"])
if loopbacks and 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
if bridgedomains and 'bridgedomains' in yaml:
for ifname, iface in yaml['bridgedomains'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
if loopbacks and "loopbacks" in yaml:
for ifname, iface in yaml["loopbacks"].items():
if "lcp" in iface:
ret.append(iface["lcp"])
if bridgedomains and "bridgedomains" in yaml:
for ifname, iface in yaml["bridgedomains"].items():
if "lcp" in iface:
ret.append(iface["lcp"])
return ret
def is_unique(yaml, lcpname):
""" Returns True if there is at most one occurence of the LCP name in the entire config."""
"""Returns True if there is at most one occurence of the LCP name in the entire config."""
lcps = get_lcps(yaml)
return lcps.count(lcpname) < 2

View File

@ -16,37 +16,38 @@ import config.lcp as lcp
import config.address as address
import config.mac as mac
def get_loopbacks(yaml):
""" Return a list of all loopbacks. """
"""Return a list of all loopbacks."""
ret = []
if 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
if "loopbacks" in yaml:
for ifname, iface in yaml["loopbacks"].items():
ret.append(ifname)
return ret
def get_by_lcp_name(yaml, lcpname):
""" Returns the loopback by a given lcp name, or None,None if it does not exist """
if not 'loopbacks' in yaml:
return None,None
for ifname, iface in yaml['loopbacks'].items():
if 'lcp' in iface and iface['lcp'] == lcpname:
"""Returns the loopback by a given lcp name, or None,None if it does not exist"""
if not "loopbacks" in yaml:
return None, None
for ifname, iface in yaml["loopbacks"].items():
if "lcp" in iface and iface["lcp"] == lcpname:
return ifname, iface
return None,None
return None, None
def get_by_name(yaml, ifname):
""" Return the loopback by name, if it exists. Return None otherwise. """
"""Return the loopback by name, if it exists. Return None otherwise."""
try:
if ifname in yaml['loopbacks']:
return ifname, yaml['loopbacks'][ifname]
if ifname in yaml["loopbacks"]:
return ifname, yaml["loopbacks"][ifname]
except:
pass
return None, None
def is_loopback(yaml, ifname):
""" Returns True if the interface name is an existing loopback. """
"""Returns True if the interface name is an existing loopback."""
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
@ -54,28 +55,36 @@ def is_loopback(yaml, ifname):
def validate_loopbacks(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'loopbacks' in yaml:
if not "loopbacks" in yaml:
return result, msgs
for ifname, iface in yaml['loopbacks'].items():
for ifname, iface in yaml["loopbacks"].items():
logger.debug(f"loopback {iface}")
instance = int(ifname[4:])
if instance > 4095:
msgs.append(f"loopback {ifname} has instance {int(instance)} which is too large")
msgs.append(
f"loopback {ifname} has instance {int(instance)} which is too large"
)
result = False
if 'lcp' in iface and not lcp.is_unique(yaml, iface['lcp']):
msgs.append(f"loopback {ifname} does not have a unique LCP name {iface['lcp']}")
if "lcp" in iface and not lcp.is_unique(yaml, iface["lcp"]):
msgs.append(
f"loopback {ifname} does not have a unique LCP name {iface['lcp']}"
)
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append(f"loopback {ifname} IP address {a} conflicts with another")
if "addresses" in iface:
for a in iface["addresses"]:
if not address.is_allowed(yaml, ifname, iface["addresses"], a):
msgs.append(
f"loopback {ifname} IP address {a} conflicts with another"
)
result = False
if 'mac' in iface and mac.is_multicast(iface['mac']):
msgs.append(f"loopback {ifname} MAC address {iface['mac']} cannot be multicast")
if "mac" in iface and mac.is_multicast(iface["mac"]):
msgs.append(
f"loopback {ifname} MAC address {iface['mac']} cannot be multicast"
)
result = False
return result, msgs

View File

@ -14,33 +14,37 @@
import logging
import netaddr
def is_valid(mac):
""" Return True if the string given in `mac` is a valid (6-byte) MAC address,
as defined by netaddr.EUI """
"""Return True if the string given in `mac` is a valid (6-byte) MAC address,
as defined by netaddr.EUI"""
try:
addr = netaddr.EUI(mac)
except:
return False
return True
def is_local(mac):
""" Return True if a MAC address is a valid locally administered one. """
"""Return True if a MAC address is a valid locally administered one."""
try:
addr = netaddr.EUI(mac)
except:
return False
return bool(addr.words[0] & 0b10)
def is_multicast(mac):
""" Return True if a MAC address is a valid multicast one. """
"""Return True if a MAC address is a valid multicast one."""
try:
addr = netaddr.EUI(mac)
except:
return False
return bool(addr.words[0] & 0b01)
def is_unicast(mac):
""" Return True if a MAC address is a valid unicast one. """
"""Return True if a MAC address is a valid unicast one."""
try:
addr = netaddr.EUI(mac)
except:

View File

@ -14,27 +14,28 @@
import logging
import config.mac as mac
def get_taps(yaml):
""" Return a list of all taps. """
"""Return a list of all taps."""
ret = []
if 'taps' in yaml:
for ifname, iface in yaml['taps'].items():
if "taps" in yaml:
for ifname, iface in yaml["taps"].items():
ret.append(ifname)
return ret
def get_by_name(yaml, ifname):
""" Return the tap by name, if it exists. Return None otherwise. """
"""Return the tap by name, if it exists. Return None otherwise."""
try:
if ifname in yaml['taps']:
return ifname, yaml['taps'][ifname]
if ifname in yaml["taps"]:
return ifname, yaml["taps"][ifname]
except:
pass
return None, None
def is_tap(yaml, ifname):
""" Returns True if the interface name is an existing tap in the config.
"""Returns True if the interface name is an existing tap in the config.
The TAP has to be explicitly named in the configuration, and notably
a TAP belonging to a Linux Control Plane (LCP) will return False.
"""
@ -43,25 +44,25 @@ def is_tap(yaml, ifname):
def is_host_name_unique(yaml, hostname):
""" Returns True if there is at most one occurence of the given ifname amonst all host-names of TAPs. """
if not 'taps' in yaml:
"""Returns True if there is at most one occurence of the given ifname amonst all host-names of TAPs."""
if not "taps" in yaml:
return True
host_names = []
for tap_ifname, tap_iface in yaml['taps'].items():
host_names.append(tap_iface['host']['name'])
for tap_ifname, tap_iface in yaml["taps"].items():
host_names.append(tap_iface["host"]["name"])
return host_names.count(hostname) < 2
def validate_taps(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'taps' in yaml:
if not "taps" in yaml:
return result, msgs
for ifname, iface in yaml['taps'].items():
for ifname, iface in yaml["taps"].items():
logger.debug(f"tap {iface}")
instance = int(ifname[3:])
@ -70,32 +71,46 @@ def validate_taps(yaml):
msgs.append(f"tap {ifname} has instance {int(instance)} which is too large")
result = False
if not is_host_name_unique(yaml, iface['host']['name']):
msgs.append(f"tap {ifname} does not have a unique host name {iface['host']['name']}")
if not is_host_name_unique(yaml, iface["host"]["name"]):
msgs.append(
f"tap {ifname} does not have a unique host name {iface['host']['name']}"
)
result = False
if 'rx-ring-size' in iface:
n = iface['rx-ring-size']
if n & (n-1) != 0:
if "rx-ring-size" in iface:
n = iface["rx-ring-size"]
if n & (n - 1) != 0:
msgs.append(f"tap {ifname} rx-ring-size must be a power of two")
result = False
if 'tx-ring-size' in iface:
n = iface['tx-ring-size']
if n & (n-1) != 0:
if "tx-ring-size" in iface:
n = iface["tx-ring-size"]
if n & (n - 1) != 0:
msgs.append(f"tap {ifname} tx-ring-size must be a power of two")
result = False
if 'namespace-create' in iface['host'] and iface['host']['namespace-create'] and not 'namespace' in iface['host']:
msgs.append(f"tap {ifname} namespace-create can only be set if namespace is set")
if (
"namespace-create" in iface["host"]
and iface["host"]["namespace-create"]
and not "namespace" in iface["host"]
):
msgs.append(
f"tap {ifname} namespace-create can only be set if namespace is set"
)
result = False
if 'bridge-create' in iface['host'] and iface['host']['bridge-create'] and not 'bridge' in iface['host']:
if (
"bridge-create" in iface["host"]
and iface["host"]["bridge-create"]
and not "bridge" in iface["host"]
):
msgs.append(f"tap {ifname} bridge-create can only be set if bridge is set")
result = False
if 'mac' in iface['host'] and mac.is_multicast(iface['host']['mac']):
msgs.append(f"tap {ifname} host MAC address {iface['host']['mac']} cannot be multicast")
if "mac" in iface["host"] and mac.is_multicast(iface["host"]["mac"]):
msgs.append(
f"tap {ifname} host MAC address {iface['host']['mac']} cannot be multicast"
)
result = False
return result, msgs

View File

@ -2,17 +2,18 @@ import unittest
import yaml
import config.bondethernet as bondethernet
class TestBondEthernetMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_bondethernet.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet0")
self.assertIsNotNone(iface)
self.assertEqual("BondEthernet0", ifname)
self.assertIn("GigabitEthernet1/0/0", iface['interfaces'])
self.assertNotIn("GigabitEthernet2/0/0", iface['interfaces'])
self.assertIn("GigabitEthernet1/0/0", iface["interfaces"])
self.assertNotIn("GigabitEthernet2/0/0", iface["interfaces"])
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet-notexist")
self.assertIsNone(iface)
@ -22,11 +23,15 @@ class TestBondEthernetMethods(unittest.TestCase):
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1"))
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100"))
self.assertFalse(
bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100")
)
def test_is_bondethernet(self):
self.assertTrue(bondethernet.is_bondethernet(self.cfg, "BondEthernet0"))
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist"))
self.assertFalse(
bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist")
)
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "GigabitEthernet1/0/0"))
def test_enumerators(self):
@ -38,8 +43,8 @@ class TestBondEthernetMethods(unittest.TestCase):
self.assertNotIn("BondEthernet-noexist", ifs)
def test_get_mode(self):
self.assertEqual('lacp', bondethernet.get_mode(self.cfg, "BondEthernet0"))
self.assertEqual('xor', bondethernet.get_mode(self.cfg, "BondEthernet1"))
self.assertEqual("lacp", bondethernet.get_mode(self.cfg, "BondEthernet0"))
self.assertEqual("xor", bondethernet.get_mode(self.cfg, "BondEthernet1"))
def test_mode_to_int(self):
self.assertEqual(1, bondethernet.mode_to_int("round-robin"))
@ -59,8 +64,8 @@ class TestBondEthernetMethods(unittest.TestCase):
self.assertEqual("", bondethernet.int_to_mode(6))
def test_get_lb(self):
self.assertEqual('l34', bondethernet.get_lb(self.cfg, "BondEthernet0"))
self.assertEqual('l2', bondethernet.get_lb(self.cfg, "BondEthernet1"))
self.assertEqual("l34", bondethernet.get_lb(self.cfg, "BondEthernet0"))
self.assertEqual("l2", bondethernet.get_lb(self.cfg, "BondEthernet1"))
self.assertIsNone(bondethernet.get_lb(self.cfg, "BondEthernet2"))
def test_lb_to_int(self):

View File

@ -2,17 +2,18 @@ import unittest
import yaml
import config.bridgedomain as bridgedomain
class TestBridgeDomainMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_bridgedomain.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd10")
self.assertIsNotNone(iface)
self.assertEqual("bd10", ifname)
self.assertEqual(iface['mtu'], 3000)
self.assertIn("BondEthernet0", iface['interfaces'])
self.assertEqual(iface["mtu"], 3000)
self.assertIn("BondEthernet0", iface["interfaces"])
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd-notexist")
self.assertIsNone(iface)
@ -25,14 +26,28 @@ class TestBridgeDomainMethods(unittest.TestCase):
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "GigabitEthernet1/0/0"))
def test_members(self):
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100"))
self.assertFalse(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertFalse(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0.100"))
self.assertTrue(
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0")
)
self.assertTrue(
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100")
)
self.assertFalse(
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0")
)
self.assertFalse(
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0.100")
)
def test_unique(self):
self.assertFalse(bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet2/0/0.100"))
self.assertFalse(
bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet1/0/0")
)
self.assertTrue(
bridgedomain.is_bridge_interface_unique(
self.cfg, "GigabitEthernet2/0/0.100"
)
)
def test_enumerators(self):
ifs = bridgedomain.get_bridge_interfaces(self.cfg)
@ -55,13 +70,13 @@ class TestBridgeDomainMethods(unittest.TestCase):
self.assertIsNone(settings)
settings = bridgedomain.get_settings(self.cfg, "bd10")
self.assertTrue(settings['learn'])
self.assertTrue(settings['unknown-unicast-flood'])
self.assertTrue(settings['unicast-flood'])
self.assertEqual(settings['mac-age-minutes'], 0)
self.assertTrue(settings["learn"])
self.assertTrue(settings["unknown-unicast-flood"])
self.assertTrue(settings["unicast-flood"])
self.assertEqual(settings["mac-age-minutes"], 0)
settings = bridgedomain.get_settings(self.cfg, "bd11")
self.assertTrue(settings['learn'])
self.assertFalse(settings['unknown-unicast-flood'])
self.assertFalse(settings['unicast-flood'])
self.assertEqual(settings['mac-age-minutes'], 10)
self.assertTrue(settings["learn"])
self.assertFalse(settings["unknown-unicast-flood"])
self.assertFalse(settings["unicast-flood"])
self.assertEqual(settings["mac-age-minutes"], 10)

View File

@ -2,10 +2,11 @@ import unittest
import yaml
import config.interface as interface
class TestInterfaceMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_interface.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_enumerators(self):
ifs = interface.get_interfaces(self.cfg)
@ -52,22 +53,42 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1.201"), 1500)
def test_encapsulation(self):
self.assertTrue(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertTrue(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"),
{ 'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 0, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.201"),
{ 'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 1234, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.202"),
{ 'dot1q': 0, 'dot1ad': 1000, 'inner-dot1q': 0, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.203"),
{ 'dot1q': 0, 'dot1ad': 1000, 'inner-dot1q': 1000, 'exact-match': True })
self.assertTrue(
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/1.200")
)
self.assertTrue(
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/1.200")
)
self.assertEqual(
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"),
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 0, "exact-match": False},
)
self.assertEqual(
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.201"),
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 1234, "exact-match": False},
)
self.assertEqual(
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.202"),
{"dot1q": 0, "dot1ad": 1000, "inner-dot1q": 0, "exact-match": False},
)
self.assertEqual(
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.203"),
{"dot1q": 0, "dot1ad": 1000, "inner-dot1q": 1000, "exact-match": True},
)
self.assertFalse(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.100"))
self.assertFalse(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.101"))
self.assertFalse(
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.100")
)
self.assertFalse(
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.101")
)
self.assertFalse(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.102"))
self.assertFalse(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.103"))
self.assertFalse(
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.102")
)
self.assertFalse(
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.103")
)
def test_has_sub(self):
self.assertTrue(interface.has_sub(self.cfg, "GigabitEthernet1/0/1"))
@ -97,8 +118,12 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1"), "e1")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.100"), "foo")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.101"), "e1.100")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.102"), "e1.100.100")
self.assertEqual(
interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.101"), "e1.100"
)
self.assertEqual(
interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.102"), "e1.100.100"
)
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.201"))
@ -119,12 +144,16 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertIn("GigabitEthernet3/0/0", l2xc_ifs)
self.assertIn("GigabitEthernet3/0/0", l2xc_target_ifs)
self.assertTrue(interface.is_l2xc_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertTrue(interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertTrue(
interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet3/0/0")
)
self.assertNotIn("GigabitEthernet2/0/0", l2xc_ifs)
self.assertNotIn("GigabitEthernet2/0/0", l2xc_target_ifs)
self.assertFalse(interface.is_l2xc_interface(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(
interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet2/0/0")
)
def test_l2(self):
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/0"))
@ -151,22 +180,29 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertEqual(ifname, "GigabitEthernet1/0/1.201")
self.assertIsNotNone(iface)
encap = interface.get_encapsulation(self.cfg, ifname)
self.assertEqual(encap, {'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 1234, 'exact-match': False})
self.assertEqual(
encap,
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 1234, "exact-match": False},
)
ifname, iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.1")
self.assertIsNone(ifname)
self.assertIsNone(iface)
def test_get_parent_by_name(self):
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
ifname, iface = interface.get_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
self.assertEqual(ifname, "GigabitEthernet1/0/1")
self.assertIsNotNone(iface)
self.assertNotIn('encapsulation', iface)
self.assertNotIn("encapsulation", iface)
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.200")
ifname, iface = interface.get_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.200"
)
self.assertEqual(ifname, "GigabitEthernet1/0/1")
self.assertIsNotNone(iface)
self.assertNotIn('encapsulation', iface)
self.assertNotIn("encapsulation", iface)
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1")
self.assertIsNone(ifname)
@ -177,22 +213,34 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertIsNone(iface)
def test_get_qinx_parent_by_name(self):
self.assertIsNotNone(interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.202"))
self.assertIsNotNone(interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.203"))
self.assertIsNotNone(
interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.202")
)
self.assertIsNotNone(
interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.203")
)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1")
ifname, iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1"
)
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.100")
ifname, iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.100"
)
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.200")
ifname, iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.200"
)
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
ifname, iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
self.assertEqual(ifname, "GigabitEthernet1/0/1.200")
def test_get_phys(self):
@ -211,4 +259,6 @@ class TestInterfaceMethods(unittest.TestCase):
self.assertFalse(interface.get_admin_state(self.cfg, "GigabitEthernet2/0/0"))
self.assertTrue(interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0.101"))
self.assertFalse(interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0.102"))
self.assertFalse(
interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0.102")
)

View File

@ -3,10 +3,11 @@ import yaml
import config.lcp as lcp
import config.interface as interface
class TestLCPMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_lcp.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_enumerators(self):
lcps = lcp.get_lcps(self.cfg)
@ -26,22 +27,36 @@ class TestLCPMethods(unittest.TestCase):
self.assertFalse(lcp.is_unique(self.cfg, "thrice"))
def test_qinx(self):
qinx_ifname, qinx_iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
parent_ifname, parent_iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
qinx_ifname, qinx_iface = interface.get_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
parent_ifname, parent_iface = interface.get_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
qinx_ifname, qinx_iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
parent_ifname, parent_iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
qinx_ifname, qinx_iface = interface.get_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
parent_ifname, parent_iface = interface.get_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.201"
)
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.100")
ifname, iface = interface.get_qinx_parent_by_name(
self.cfg, "GigabitEthernet1/0/1.100"
)
self.assertIsNone(ifname)
self.assertIsNone(iface)

View File

@ -2,10 +2,11 @@ import unittest
import yaml
import config.loopback as loopback
class TestLoopbackMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_loopback.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_by_lcp_name(self):
ifname, iface = loopback.get_by_lcp_name(self.cfg, "loop56789012345")
@ -20,7 +21,7 @@ class TestLoopbackMethods(unittest.TestCase):
ifname, iface = loopback.get_by_name(self.cfg, "loop1")
self.assertIsNotNone(iface)
self.assertEqual("loop1", ifname)
self.assertEqual(iface['mtu'], 2000)
self.assertEqual(iface["mtu"], 2000)
ifname, iface = loopback.get_by_name(self.cfg, "loop-noexist")
self.assertIsNone(ifname)

View File

@ -1,6 +1,7 @@
import unittest
import config.mac as mac
class TestMACMethods(unittest.TestCase):
def test_is_valid(self):
self.assertTrue(mac.is_valid("00:01:02:03:04:05"))

View File

@ -2,10 +2,11 @@ import unittest
import yaml
import config.tap as tap
class TestTAPMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_tap.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = tap.get_by_name(self.cfg, "tap0")

View File

@ -2,10 +2,11 @@ import unittest
import yaml
import config.vxlan_tunnel as vxlan_tunnel
class TestVXLANMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_vxlan_tunnel.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel0")

View File

@ -15,42 +15,43 @@ import logging
import config.interface as interface
import ipaddress
def get_by_name(yaml, ifname):
""" Return the VXLAN by name, if it exists. Return None otherwise. """
"""Return the VXLAN by name, if it exists. Return None otherwise."""
try:
if ifname in yaml['vxlan_tunnels']:
return ifname, yaml['vxlan_tunnels'][ifname]
if ifname in yaml["vxlan_tunnels"]:
return ifname, yaml["vxlan_tunnels"][ifname]
except:
pass
return None, None
def is_vxlan_tunnel(yaml, ifname):
""" Returns True if the interface name is an existing VXLAN Tunnel. """
"""Returns True if the interface name is an existing VXLAN Tunnel."""
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def vni_unique(yaml, vni):
""" Return True if the VNI is unique amongst all VXLANs """
if not 'vxlan_tunnels' in yaml:
"""Return True if the VNI is unique amongst all VXLANs"""
if not "vxlan_tunnels" in yaml:
return True
ncount = 0
for ifname, iface in yaml['vxlan_tunnels'].items():
if iface['vni'] == vni:
for ifname, iface in yaml["vxlan_tunnels"].items():
if iface["vni"] == vni:
ncount = ncount + 1
return ncount < 2
def get_vxlan_tunnels(yaml):
""" Returns a list of all VXLAN tunnel interface names. """
"""Returns a list of all VXLAN tunnel interface names."""
ret = []
if not 'vxlan_tunnels' in yaml:
if not "vxlan_tunnels" in yaml:
return ret
for ifname, iface in yaml['vxlan_tunnels'].items():
for ifname, iface in yaml["vxlan_tunnels"].items():
ret.append(ifname)
return ret
@ -58,27 +59,31 @@ def get_vxlan_tunnels(yaml):
def validate_vxlan_tunnels(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not 'vxlan_tunnels' in yaml:
if not "vxlan_tunnels" in yaml:
return result, msgs
for ifname, iface in yaml['vxlan_tunnels'].items():
for ifname, iface in yaml["vxlan_tunnels"].items():
logger.debug(f"vxlan_tunnel {ifname}: {iface}")
instance = int(ifname[12:])
if instance > 2147483647:
msgs.append(f"vxlan_tunnel {ifname} has instance {int(instance)} which is too large")
msgs.append(
f"vxlan_tunnel {ifname} has instance {int(instance)} which is too large"
)
result = False
vni = iface['vni']
vni = iface["vni"]
if not vni_unique(yaml, vni):
msgs.append(f"vxlan_tunnel {ifname} VNI {int(vni)} is not unique")
result = False
local = ipaddress.ip_address(iface['local'])
remote = ipaddress.ip_address(iface['remote'])
local = ipaddress.ip_address(iface["local"])
remote = ipaddress.ip_address(iface["remote"])
if local.version != remote.version:
msgs.append(f"vxlan_tunnel {ifname} local and remote are not the same address family")
msgs.append(
f"vxlan_tunnel {ifname} local and remote are not the same address family"
)
result = False
return result, msgs

View File

@ -27,15 +27,17 @@ except ImportError:
print("ERROR: install argparse manually: sudo pip install argparse")
sys.exit(-2)
def example_validator(yaml):
""" A simple example validator that takes the YAML configuration file as an input,
"""A simple example validator that takes the YAML configuration file as an input,
and returns a tuple of rv (return value, True is success), and a list of string
messages to the validation framework. """
messages to the validation framework."""
rv = True
msgs = []
return rv, msgs
class YAMLTest(unittest.TestCase):
def __init__(self, testName, yaml_filename, yaml_schema):
# calling the super class init varies for different python versions. This works for 2.7
@ -46,19 +48,19 @@ class YAMLTest(unittest.TestCase):
def test_yaml(self):
unittest = None
cfg = None
n=0
n = 0
try:
with open(self.yaml_filename, "r") as f:
for data in yaml.load_all(f, Loader=yaml.Loader):
if n==0:
if n == 0:
unittest = data
n = n + 1
elif n==1:
elif n == 1:
cfg = data
n = n + 1
except:
pass
self.assertEqual(n,2)
self.assertEqual(n, 2)
self.assertIsNotNone(unittest)
if not cfg:
return
@ -68,8 +70,12 @@ class YAMLTest(unittest.TestCase):
msgs_unexpected = 0
msgs_expected = []
if 'test' in unittest and 'errors' in unittest['test'] and 'expected' in unittest['test']['errors']:
msgs_expected = unittest['test']['errors']['expected']
if (
"test" in unittest
and "errors" in unittest["test"]
and "expected" in unittest["test"]["errors"]
):
msgs_expected = unittest["test"]["errors"]["expected"]
fail = False
for m in msgs:
@ -83,11 +89,18 @@ class YAMLTest(unittest.TestCase):
fail = True
count = 0
if 'test' in unittest and 'errors' in unittest['test'] and 'count' in unittest['test']['errors']:
count = unittest['test']['errors']['count']
if (
"test" in unittest
and "errors" in unittest["test"]
and "count" in unittest["test"]["errors"]
):
count = unittest["test"]["errors"]["count"]
if len(msgs) != count:
print(f"{self.yaml_filename}: Unexpected error count {len(msgs)} (expecting {int(count)})", file=sys.stderr)
print(
f"{self.yaml_filename}: Unexpected error count {len(msgs)} (expecting {int(count)})",
file=sys.stderr,
)
self.assertEqual(len(msgs), count)
self.assertFalse(fail)
@ -96,27 +109,63 @@ class YAMLTest(unittest.TestCase):
if __name__ == "__main__":
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-t', '--test', dest='test', type=str, nargs='+', default=['unittest/yaml/*.yaml'], help="""YAML test file(s)""")
parser.add_argument('-s', '--schema', dest='schema', type=str, default='./schema.yaml', help="""YAML schema validation file""")
parser.add_argument('-d', '--debug', dest='debug', action='store_true', help="""Enable debug, default False""")
parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', help="""Be quiet (only log warnings/errors), default False""")
parser.add_argument(
"-t",
"--test",
dest="test",
type=str,
nargs="+",
default=["unittest/yaml/*.yaml"],
help="""YAML test file(s)""",
)
parser.add_argument(
"-s",
"--schema",
dest="schema",
type=str,
default="./schema.yaml",
help="""YAML schema validation file""",
)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
help="""Enable debug, default False""",
)
parser.add_argument(
"-q",
"--quiet",
dest="quiet",
action="store_true",
help="""Be quiet (only log warnings/errors), default False""",
)
args = parser.parse_args()
if args.debug:
verbosity=2
verbosity = 2
elif args.quiet:
verbosity=0
verbosity = 0
else:
verbosity=1
verbosity = 1
yaml_suite = unittest.TestSuite()
for pattern in args.test:
for fn in glob.glob(pattern):
yaml_suite.addTest(YAMLTest('test_yaml', yaml_filename=fn, yaml_schema=args.schema))
yaml_ok = unittest.TextTestRunner(verbosity=verbosity, buffer=True).run(yaml_suite).wasSuccessful()
yaml_suite.addTest(
YAMLTest("test_yaml", yaml_filename=fn, yaml_schema=args.schema)
)
yaml_ok = (
unittest.TextTestRunner(verbosity=verbosity, buffer=True)
.run(yaml_suite)
.wasSuccessful()
)
tests = unittest.TestLoader().discover(start_dir=".", pattern='test_*.py')
unit_ok = unittest.TextTestRunner(verbosity=verbosity, buffer=True).run(tests).wasSuccessful()
tests = unittest.TestLoader().discover(start_dir=".", pattern="test_*.py")
unit_ok = (
unittest.TextTestRunner(verbosity=verbosity, buffer=True)
.run(tests)
.wasSuccessful()
)
retval = 0
if not yaml_ok:

View File

@ -1,23 +1,24 @@
'''
"""
The functions in this file interact with the VPP API to retrieve certain
interface metadata and write it to a YAML file.
'''
"""
from vpp.vppapi import VPPApi
import sys
import yaml
import config.bondethernet as bondethernet
class Dumper(VPPApi):
def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'):
def __init__(self, address="/run/vpp/api.sock", clientname="vppcfg"):
VPPApi.__init__(self, address, clientname)
def write(self, outfile):
if outfile and outfile == '-':
if outfile and outfile == "-":
fh = sys.stdout
outfile = "(stdout)"
else:
fh = open(outfile, 'w')
fh = open(outfile, "w")
config = self.cache_to_config()
@ -28,140 +29,187 @@ class Dumper(VPPApi):
self.logger.info(f"Wrote YAML config to {outfile}")
def cache_to_config(self):
config = {"loopbacks": {}, "bondethernets": {}, "interfaces": {}, "bridgedomains": {}, "vxlan_tunnels": {}, "taps": {} }
for idx, bond_iface in self.cache['bondethernets'].items():
config = {
"loopbacks": {},
"bondethernets": {},
"interfaces": {},
"bridgedomains": {},
"vxlan_tunnels": {},
"taps": {},
}
for idx, bond_iface in self.cache["bondethernets"].items():
bond = {"description": ""}
if bond_iface.sw_if_index in self.cache['bondethernet_members']:
members = [self.cache['interfaces'][x].interface_name for x in self.cache['bondethernet_members'][bond_iface.sw_if_index]]
if bond_iface.sw_if_index in self.cache["bondethernet_members"]:
members = [
self.cache["interfaces"][x].interface_name
for x in self.cache["bondethernet_members"][bond_iface.sw_if_index]
]
if len(members) > 0:
bond['interfaces'] = members
bond["interfaces"] = members
mode = bondethernet.int_to_mode(bond_iface.mode)
bond['mode'] = mode
if mode in ['xor', 'lacp']:
bond['load-balance'] = bondethernet.int_to_lb(bond_iface.lb)
iface = self.cache['interfaces'][bond_iface.sw_if_index]
bond['mac'] = str(iface.l2_address)
config['bondethernets'][iface.interface_name] = bond
bond["mode"] = mode
if mode in ["xor", "lacp"]:
bond["load-balance"] = bondethernet.int_to_lb(bond_iface.lb)
iface = self.cache["interfaces"][bond_iface.sw_if_index]
bond["mac"] = str(iface.l2_address)
config["bondethernets"][iface.interface_name] = bond
for numtags in [ 0, 1, 2 ]:
for idx, iface in self.cache['interfaces'].items():
for numtags in [0, 1, 2]:
for idx, iface in self.cache["interfaces"].items():
if iface.sub_number_of_tags != numtags:
continue
if iface.interface_dev_type=='Loopback':
if iface.interface_dev_type == "Loopback":
if iface.sub_id > 0:
self.logger.warning(f"Refusing to export sub-interfaces of loopback devices ({iface.interface_name})")
self.logger.warning(
f"Refusing to export sub-interfaces of loopback devices ({iface.interface_name})"
)
continue
loop = {"description": ""}
loop['mtu'] = iface.mtu[0]
loop['mac'] = str(iface.l2_address)
if iface.sw_if_index in self.cache['lcps']:
loop['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
loop['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
config['loopbacks'][iface.interface_name] = loop
elif iface.interface_dev_type in ['bond', 'VXLAN', 'dpdk', 'virtio']:
i = {"description": "" }
if iface.sw_if_index in self.cache['lcps']:
i['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
i['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
if iface.sw_if_index in self.cache['l2xcs']:
l2xc = self.cache['l2xcs'][iface.sw_if_index]
i['l2xc'] = self.cache['interfaces'][l2xc.tx_sw_if_index].interface_name
if not self.cache['interfaces'][idx].flags & 1: # IF_STATUS_API_FLAG_ADMIN_UP
i['state'] = 'down'
loop["mtu"] = iface.mtu[0]
loop["mac"] = str(iface.l2_address)
if iface.sw_if_index in self.cache["lcps"]:
loop["lcp"] = self.cache["lcps"][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache["interface_addresses"]:
if (
len(self.cache["interface_addresses"][iface.sw_if_index])
> 0
):
loop["addresses"] = self.cache["interface_addresses"][
iface.sw_if_index
]
config["loopbacks"][iface.interface_name] = loop
elif iface.interface_dev_type in ["bond", "VXLAN", "dpdk", "virtio"]:
i = {"description": ""}
if iface.sw_if_index in self.cache["lcps"]:
i["lcp"] = self.cache["lcps"][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache["interface_addresses"]:
if (
len(self.cache["interface_addresses"][iface.sw_if_index])
> 0
):
i["addresses"] = self.cache["interface_addresses"][
iface.sw_if_index
]
if iface.sw_if_index in self.cache["l2xcs"]:
l2xc = self.cache["l2xcs"][iface.sw_if_index]
i["l2xc"] = self.cache["interfaces"][
l2xc.tx_sw_if_index
].interface_name
if (
not self.cache["interfaces"][idx].flags & 1
): # IF_STATUS_API_FLAG_ADMIN_UP
i["state"] = "down"
if iface.interface_dev_type == 'dpdk' and iface.sub_number_of_tags == 0:
i['mac'] = str(iface.l2_address)
if (
iface.interface_dev_type == "dpdk"
and iface.sub_number_of_tags == 0
):
i["mac"] = str(iface.l2_address)
if self.tap_is_lcp(iface.interface_name):
continue
i['mtu'] = iface.mtu[0]
i["mtu"] = iface.mtu[0]
if iface.sub_number_of_tags == 0:
config['interfaces'][iface.interface_name] = i
config["interfaces"][iface.interface_name] = i
continue
encap = {}
if iface.sub_if_flags&8:
encap['dot1ad'] = iface.sub_outer_vlan_id
if iface.sub_if_flags & 8:
encap["dot1ad"] = iface.sub_outer_vlan_id
else:
encap['dot1q'] = iface.sub_outer_vlan_id
encap["dot1q"] = iface.sub_outer_vlan_id
if iface.sub_inner_vlan_id > 0:
encap['inner-dot1q'] = iface.sub_inner_vlan_id
encap['exact-match'] = bool(iface.sub_if_flags&16)
i['encapsulation'] = encap
encap["inner-dot1q"] = iface.sub_inner_vlan_id
encap["exact-match"] = bool(iface.sub_if_flags & 16)
i["encapsulation"] = encap
sup_iface = self.cache['interfaces'][iface.sup_sw_if_index]
sup_iface = self.cache["interfaces"][iface.sup_sw_if_index]
if iface.mtu[0] > 0:
i['mtu'] = iface.mtu[0]
i["mtu"] = iface.mtu[0]
else:
i['mtu'] = sup_iface.mtu[0]
if not 'sub-interfaces' in config['interfaces'][sup_iface.interface_name]:
config['interfaces'][sup_iface.interface_name]['sub-interfaces'] = {}
config['interfaces'][sup_iface.interface_name]['sub-interfaces'][iface.sub_id] = i
i["mtu"] = sup_iface.mtu[0]
if (
not "sub-interfaces"
in config["interfaces"][sup_iface.interface_name]
):
config["interfaces"][sup_iface.interface_name][
"sub-interfaces"
] = {}
config["interfaces"][sup_iface.interface_name]["sub-interfaces"][
iface.sub_id
] = i
for idx, iface in self.cache['vxlan_tunnels'].items():
vpp_iface = self.cache['interfaces'][iface.sw_if_index]
vxlan = { "description": "",
for idx, iface in self.cache["vxlan_tunnels"].items():
vpp_iface = self.cache["interfaces"][iface.sw_if_index]
vxlan = {
"description": "",
"vni": int(iface.vni),
"local": str(iface.src_address),
"remote": str(iface.dst_address) }
config['vxlan_tunnels'][vpp_iface.interface_name] = vxlan
"remote": str(iface.dst_address),
}
config["vxlan_tunnels"][vpp_iface.interface_name] = vxlan
for idx, iface in self.cache['taps'].items():
vpp_tap = self.cache['taps'][iface.sw_if_index]
vpp_iface = self.cache['interfaces'][vpp_tap.sw_if_index]
for idx, iface in self.cache["taps"].items():
vpp_tap = self.cache["taps"][iface.sw_if_index]
vpp_iface = self.cache["interfaces"][vpp_tap.sw_if_index]
if self.tap_is_lcp(vpp_iface.interface_name):
continue
tap = { "description": "",
tap = {
"description": "",
"tx-ring-size": vpp_tap.tx_ring_sz,
"rx-ring-size": vpp_tap.rx_ring_sz,
"host": {
"mac": str(vpp_tap.host_mac_addr),
"name": vpp_tap.host_if_name,
} }
},
}
if vpp_tap.host_mtu_size > 0:
tap['host']['mtu'] = vpp_tap.host_mtu_size
tap["host"]["mtu"] = vpp_tap.host_mtu_size
if vpp_tap.host_namespace:
tap['host']['namespace'] = vpp_tap.host_namespace
tap["host"]["namespace"] = vpp_tap.host_namespace
if vpp_tap.host_bridge:
tap['host']['bridge'] = vpp_tap.host_bridge
config['taps'][vpp_iface.interface_name] = tap
tap["host"]["bridge"] = vpp_tap.host_bridge
config["taps"][vpp_iface.interface_name] = tap
for idx, iface in self.cache['bridgedomains'].items():
for idx, iface in self.cache["bridgedomains"].items():
bridge_name = f"bd{int(idx)}"
mtu = 1500
bridge = {"description": ""}
settings = {}
settings['learn'] = iface.learn
settings['unicast-flood'] = iface.flood
settings['unknown-unicast-flood'] = iface.uu_flood
settings['unicast-forward'] = iface.forward
settings['arp-termination'] = iface.arp_term
settings['arp-unicast-forward'] = iface.arp_ufwd
settings['mac-age-minutes'] = int(iface.mac_age)
bridge['settings'] = settings
settings["learn"] = iface.learn
settings["unicast-flood"] = iface.flood
settings["unknown-unicast-flood"] = iface.uu_flood
settings["unicast-forward"] = iface.forward
settings["arp-termination"] = iface.arp_term
settings["arp-unicast-forward"] = iface.arp_ufwd
settings["mac-age-minutes"] = int(iface.mac_age)
bridge["settings"] = settings
bvi = None
if iface.bvi_sw_if_index != 2**32-1:
bvi = self.cache['interfaces'][iface.bvi_sw_if_index]
if iface.bvi_sw_if_index != 2**32 - 1:
bvi = self.cache["interfaces"][iface.bvi_sw_if_index]
mtu = bvi.mtu[0]
bridge['bvi'] = bvi.interface_name
bridge["bvi"] = bvi.interface_name
members = []
for member in iface.sw_if_details:
if bvi and bvi.interface_name == self.cache['interfaces'][member.sw_if_index].interface_name == bvi.interface_name:
if (
bvi
and bvi.interface_name
== self.cache["interfaces"][member.sw_if_index].interface_name
== bvi.interface_name
):
continue
members.append(self.cache['interfaces'][member.sw_if_index].interface_name)
mtu = self.cache['interfaces'][member.sw_if_index].mtu[0]
members.append(
self.cache["interfaces"][member.sw_if_index].interface_name
)
mtu = self.cache["interfaces"][member.sw_if_index].mtu[0]
if len(members) > 0:
bridge['interfaces'] = members
bridge['mtu'] = mtu
config['bridgedomains'][bridge_name] = bridge
bridge["interfaces"] = members
bridge["mtu"] = mtu
config["bridgedomains"][bridge_name] = bridge
return config

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +1,18 @@
'''
"""
The functions in this file interact with the VPP API to retrieve certain
interface metadata. Its base class will never change state. See the
derived classes VPPApiDumper() and VPPApiApplier()
'''
"""
from vpp_papi import VPPApiClient
import os
import fnmatch
import logging
class VPPApi():
def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'):
self.logger = logging.getLogger('vppcfg.vppapi')
class VPPApi:
def __init__(self, address="/run/vpp/api.sock", clientname="vppcfg"):
self.logger = logging.getLogger("vppcfg.vppapi")
self.logger.addHandler(logging.NullHandler())
self.address = address
@ -26,116 +27,135 @@ class VPPApi():
if self.connected:
return True
vpp_json_dir = '/usr/share/vpp/api/'
vpp_json_dir = "/usr/share/vpp/api/"
## vpp_json_dir = "/home/pim/src/vpp/build-root/build-vpp_debug-native/vpp/CMakeFiles/"
# construct a list of all the json api files
jsonfiles = []
for root, dirnames, filenames in os.walk(vpp_json_dir):
for filename in fnmatch.filter(filenames, '*.api.json'):
for filename in fnmatch.filter(filenames, "*.api.json"):
jsonfiles.append(os.path.join(root, filename))
if not jsonfiles:
self.logger.error('no json api files found')
self.logger.error("no json api files found")
return False
self.vpp = VPPApiClient(apifiles=jsonfiles,
server_address=self.address)
self.vpp = VPPApiClient(apifiles=jsonfiles, server_address=self.address)
try:
self.logger.debug('Connecting to VPP')
self.logger.debug("Connecting to VPP")
self.vpp.connect(self.clientname)
except:
return False
v = self.vpp.api.show_version()
self.logger.info(f'VPP version is {v.version}')
self.logger.info(f"VPP version is {v.version}")
self.connected = True
return True
def disconnect(self):
if not self.connected:
return True
self.vpp.disconnect()
self.logger.debug('Disconnected from VPP')
self.logger.debug("Disconnected from VPP")
self.connected = False
return True
def cache_clear(self):
self.cache_read = False
return {"lcps": {}, "interface_names": {}, "interfaces": {}, "interface_addresses": {},
"bondethernets": {}, "bondethernet_members": {},
"bridgedomains": {}, "vxlan_tunnels": {}, "l2xcs": {}, "taps": {}}
return {
"lcps": {},
"interface_names": {},
"interfaces": {},
"interface_addresses": {},
"bondethernets": {},
"bondethernet_members": {},
"bridgedomains": {},
"vxlan_tunnels": {},
"l2xcs": {},
"taps": {},
}
def cache_remove_lcp(self, lcpname):
""" Removes the LCP and TAP interface, identified by lcpname, from the config. """
found=False
for idx, lcp in self.cache['lcps'].items():
"""Removes the LCP and TAP interface, identified by lcpname, from the config."""
found = False
for idx, lcp in self.cache["lcps"].items():
if lcp.host_if_name == lcpname:
found = True
break
if not found:
self.logger.warning(f"Trying to remove an LCP which is not in the config: {lcpname}")
self.logger.warning(
f"Trying to remove an LCP which is not in the config: {lcpname}"
)
return False
ifname = self.cache['interfaces'][lcp.host_sw_if_index].interface_name
del self.cache['lcps'][lcp.phy_sw_if_index]
ifname = self.cache["interfaces"][lcp.host_sw_if_index].interface_name
del self.cache["lcps"][lcp.phy_sw_if_index]
# Remove the TAP interface and its dependencies
return self.cache_remove_interface(ifname)
def cache_remove_bondethernet_member(self, ifname):
""" Removes the bonderthernet member interface, identified by name, from the config. """
if not ifname in self.cache['interface_names']:
self.logger.warning(f"Trying to remove a bondethernet member interface which is not in the config: {ifname}")
"""Removes the bonderthernet member interface, identified by name, from the config."""
if not ifname in self.cache["interface_names"]:
self.logger.warning(
f"Trying to remove a bondethernet member interface which is not in the config: {ifname}"
)
return False
iface = self.cache['interface_names'][ifname]
for bond_idx, members in self.cache['bondethernet_members'].items():
iface = self.cache["interface_names"][ifname]
for bond_idx, members in self.cache["bondethernet_members"].items():
if iface.sw_if_index in members:
self.cache['bondethernet_members'][bond_idx].remove(iface.sw_if_index)
self.cache["bondethernet_members"][bond_idx].remove(iface.sw_if_index)
return True
def cache_remove_l2xc(self, ifname):
if not ifname in self.cache['interface_names']:
self.logger.warning(f"Trying to remove an L2XC which is not in the config: {ifname}")
if not ifname in self.cache["interface_names"]:
self.logger.warning(
f"Trying to remove an L2XC which is not in the config: {ifname}"
)
return False
iface = self.cache['interface_names'][ifname]
self.cache['l2xcs'].pop(iface.sw_if_index, None)
iface = self.cache["interface_names"][ifname]
self.cache["l2xcs"].pop(iface.sw_if_index, None)
return True
def cache_remove_vxlan_tunnel(self, ifname):
if not ifname in self.cache['interface_names']:
self.logger.warning(f"Trying to remove a VXLAN Tunnel which is not in the config: {ifname}")
if not ifname in self.cache["interface_names"]:
self.logger.warning(
f"Trying to remove a VXLAN Tunnel which is not in the config: {ifname}"
)
return False
iface = self.cache['interface_names'][ifname]
self.cache['vxlan_tunnels'].pop(iface.sw_if_index, None)
iface = self.cache["interface_names"][ifname]
self.cache["vxlan_tunnels"].pop(iface.sw_if_index, None)
return True
def cache_remove_interface(self, ifname):
""" Removes the interface, identified by name, from the config. """
if not ifname in self.cache['interface_names']:
self.logger.warning(f"Trying to remove an interface which is not in the config: {ifname}")
"""Removes the interface, identified by name, from the config."""
if not ifname in self.cache["interface_names"]:
self.logger.warning(
f"Trying to remove an interface which is not in the config: {ifname}"
)
return False
iface = self.cache['interface_names'][ifname]
del self.cache['interfaces'][iface.sw_if_index]
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
iface = self.cache["interface_names"][ifname]
del self.cache["interfaces"][iface.sw_if_index]
if len(self.cache["interface_addresses"][iface.sw_if_index]) > 0:
self.logger.warning(f"Not all addresses were removed on {ifname}")
del self.cache['interface_addresses'][iface.sw_if_index]
del self.cache['interface_names'][ifname]
del self.cache["interface_addresses"][iface.sw_if_index]
del self.cache["interface_names"][ifname]
## Use my_dict.pop('key', None), as it allows 'key' to be absent
if iface.sw_if_index in self.cache['bondethernet_members']:
if len(self.cache['bondethernet_members'][iface.sw_if_index]) != 0:
self.logger.warning(f"When removing BondEthernet {ifname}, its members are not empty: {self.cache['bondethernet_members'][iface.sw_if_index]}")
if iface.sw_if_index in self.cache["bondethernet_members"]:
if len(self.cache["bondethernet_members"][iface.sw_if_index]) != 0:
self.logger.warning(
f"When removing BondEthernet {ifname}, its members are not empty: {self.cache['bondethernet_members'][iface.sw_if_index]}"
)
else:
del self.cache['bondethernet_members'][iface.sw_if_index]
self.cache['bondethernets'].pop(iface.sw_if_index, None)
self.cache['taps'].pop(iface.sw_if_index, None)
del self.cache["bondethernet_members"][iface.sw_if_index]
self.cache["bondethernets"].pop(iface.sw_if_index, None)
self.cache["taps"].pop(iface.sw_if_index, None)
return True
def readconfig(self):
@ -155,116 +175,169 @@ class VPPApi():
if lcp.phy_sw_if_index > 65535 or lcp.host_sw_if_index > 65535:
## Work around endianness bug: https://gerrit.fd.io/r/c/vpp/+/35479
## TODO(pim) - remove this when 22.06 ships
lcp = lcp._replace(phy_sw_if_index=socket.ntohl(lcp.phy_sw_if_index))
lcp = lcp._replace(host_sw_if_index=socket.ntohl(lcp.host_sw_if_index))
lcp = lcp._replace(
phy_sw_if_index=socket.ntohl(lcp.phy_sw_if_index)
)
lcp = lcp._replace(
host_sw_if_index=socket.ntohl(lcp.host_sw_if_index)
)
lcp = lcp._replace(vif_index=socket.ntohl(lcp.vif_index))
self.logger.warning(f"LCP workaround for endianness issue on {lcp.host_if_name}")
self.cache['lcps'][lcp.phy_sw_if_index] = lcp
self.logger.warning(
f"LCP workaround for endianness issue on {lcp.host_if_name}"
)
self.cache["lcps"][lcp.phy_sw_if_index] = lcp
self.lcp_enabled = True
except:
self.logger.warning("linux-cp not found, will not reconcile Linux Control Plane")
self.logger.warning(
"linux-cp not found, will not reconcile Linux Control Plane"
)
self.logger.debug("Retrieving interfaces")
r = self.vpp.api.sw_interface_dump()
for iface in r:
self.cache['interfaces'][iface.sw_if_index] = iface
self.cache['interface_names'][iface.interface_name] = iface
self.cache['interface_addresses'][iface.sw_if_index] = []
self.cache["interfaces"][iface.sw_if_index] = iface
self.cache["interface_names"][iface.interface_name] = iface
self.cache["interface_addresses"][iface.sw_if_index] = []
self.logger.debug(f"Retrieving IPv4 addresses for {iface.interface_name}")
ipr = self.vpp.api.ip_address_dump(sw_if_index=iface.sw_if_index, is_ipv6=False)
ipr = self.vpp.api.ip_address_dump(
sw_if_index=iface.sw_if_index, is_ipv6=False
)
for ip in ipr:
self.cache['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.cache["interface_addresses"][iface.sw_if_index].append(
str(ip.prefix)
)
self.logger.debug(f"Retrieving IPv6 addresses for {iface.interface_name}")
ipr = self.vpp.api.ip_address_dump(sw_if_index=iface.sw_if_index, is_ipv6=True)
ipr = self.vpp.api.ip_address_dump(
sw_if_index=iface.sw_if_index, is_ipv6=True
)
for ip in ipr:
self.cache['interface_addresses'][iface.sw_if_index].append(str(ip.prefix))
self.cache["interface_addresses"][iface.sw_if_index].append(
str(ip.prefix)
)
self.logger.debug("Retrieving bondethernets")
r = self.vpp.api.sw_bond_interface_dump()
for iface in r:
self.cache['bondethernets'][iface.sw_if_index] = iface
self.cache['bondethernet_members'][iface.sw_if_index] = []
for member in self.vpp.api.sw_member_interface_dump(sw_if_index=iface.sw_if_index):
self.cache['bondethernet_members'][iface.sw_if_index].append(member.sw_if_index)
self.cache["bondethernets"][iface.sw_if_index] = iface
self.cache["bondethernet_members"][iface.sw_if_index] = []
for member in self.vpp.api.sw_member_interface_dump(
sw_if_index=iface.sw_if_index
):
self.cache["bondethernet_members"][iface.sw_if_index].append(
member.sw_if_index
)
self.logger.debug("Retrieving bridgedomains")
r = self.vpp.api.bridge_domain_dump()
for bridge in r:
self.cache['bridgedomains'][bridge.bd_id] = bridge
self.cache["bridgedomains"][bridge.bd_id] = bridge
self.logger.debug("Retrieving vxlan_tunnels")
r = self.vpp.api.vxlan_tunnel_v2_dump()
for vxlan in r:
self.cache['vxlan_tunnels'][vxlan.sw_if_index] = vxlan
self.cache["vxlan_tunnels"][vxlan.sw_if_index] = vxlan
self.logger.debug("Retrieving L2 Cross Connects")
r = self.vpp.api.l2_xconnect_dump()
for l2xc in r:
self.cache['l2xcs'][l2xc.rx_sw_if_index] = l2xc
self.cache["l2xcs"][l2xc.rx_sw_if_index] = l2xc
self.logger.debug("Retrieving TAPs")
r = self.vpp.api.sw_interface_tap_v2_dump()
for tap in r:
self.cache['taps'][tap.sw_if_index] = tap
self.cache["taps"][tap.sw_if_index] = tap
self.cache_read = True
return self.cache_read
def phys_exist(self, ifname_list):
""" Return True if all interfaces in the `ifname_list` exist as physical interface names
"""Return True if all interfaces in the `ifname_list` exist as physical interface names
in VPP. Return False otherwise."""
ret = True
for ifname in ifname_list:
if not ifname in self.cache['interface_names']:
if not ifname in self.cache["interface_names"]:
self.logger.warning(f"Interface {ifname} does not exist in VPP")
ret = False
return ret
def get_sub_interfaces(self):
subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_number_of_tags > 0]
subints = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].sub_id > 0
and self.cache["interfaces"][x].sub_number_of_tags > 0
]
return subints
def get_qinx_interfaces(self):
qinx_subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_inner_vlan_id>0]
qinx_subints = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].sub_id > 0
and self.cache["interfaces"][x].sub_inner_vlan_id > 0
]
return qinx_subints
def get_dot1x_interfaces(self):
dot1x_subints = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sub_id>0 and self.cache['interfaces'][x].sub_inner_vlan_id==0]
dot1x_subints = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].sub_id > 0
and self.cache["interfaces"][x].sub_inner_vlan_id == 0
]
return dot1x_subints
def get_loopbacks(self):
loopbacks = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].interface_dev_type=='Loopback']
loopbacks = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].interface_dev_type == "Loopback"
]
return loopbacks
def get_phys(self):
phys = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].sw_if_index == self.cache['interfaces'][x].sup_sw_if_index and self.cache['interfaces'][x].interface_dev_type not in ['virtio', 'BVI', 'Loopback', 'VXLAN', 'local', 'bond']]
phys = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].sw_if_index
== self.cache["interfaces"][x].sup_sw_if_index
and self.cache["interfaces"][x].interface_dev_type
not in ["virtio", "BVI", "Loopback", "VXLAN", "local", "bond"]
]
return phys
def get_bondethernets(self):
bonds = [self.cache['bondethernets'][x].interface_name for x in self.cache['bondethernets']]
bonds = [
self.cache["bondethernets"][x].interface_name
for x in self.cache["bondethernets"]
]
return bonds
def get_vxlan_tunnels(self):
vxlan_tunnels = [self.cache['interfaces'][x].interface_name for x in self.cache['interfaces'] if self.cache['interfaces'][x].interface_dev_type in ['VXLAN']]
vxlan_tunnels = [
self.cache["interfaces"][x].interface_name
for x in self.cache["interfaces"]
if self.cache["interfaces"][x].interface_dev_type in ["VXLAN"]
]
return vxlan_tunnels
def get_lcp_by_interface(self, sw_if_index):
for idx, lcp in self.cache['lcps'].items():
for idx, lcp in self.cache["lcps"].items():
if lcp.phy_sw_if_index == sw_if_index:
return lcp
return None
def tap_is_lcp(self, tap_ifname):
""" Returns True if the given tap_ifname is a TAP interface belonging to an LCP,
"""Returns True if the given tap_ifname is a TAP interface belonging to an LCP,
or False otherwise."""
if not tap_ifname in self.cache['interface_names']:
if not tap_ifname in self.cache["interface_names"]:
return False
vpp_iface = self.cache['interface_names'][tap_ifname]
if not vpp_iface.interface_dev_type=="virtio":
vpp_iface = self.cache["interface_names"][tap_ifname]
if not vpp_iface.interface_dev_type == "virtio":
return False
for idx, lcp in self.cache['lcps'].items():
for idx, lcp in self.cache["lcps"].items():
if vpp_iface.sw_if_index == lcp.host_sw_if_index:
return True
return False