build: reworked with setuptools
Signed-off-by: Ray Kinsella <mdr@ashroe.eu>
This commit is contained in:
170
vppcfg/config/__init__.py
Normal file
170
vppcfg/config/__init__.py
Normal file
@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" A vppcfg configuration module that exposes its semantic/syntax validators """
|
||||
from __future__ import (
|
||||
absolute_import,
|
||||
division,
|
||||
print_function,
|
||||
)
|
||||
|
||||
import logging
|
||||
import ipaddress
|
||||
import os.path
|
||||
import sys
|
||||
|
||||
try:
|
||||
import yamale
|
||||
except ImportError:
|
||||
print("ERROR: install yamale manually: sudo pip install yamale")
|
||||
sys.exit(-2)
|
||||
from yamale.validators import DefaultValidators, Validator
|
||||
|
||||
from config.loopback import validate_loopbacks
|
||||
from config.bondethernet import validate_bondethernets
|
||||
from config.interface import validate_interfaces
|
||||
from config.bridgedomain import validate_bridgedomains
|
||||
from config.vxlan_tunnel import validate_vxlan_tunnels
|
||||
from config.tap import validate_taps
|
||||
|
||||
|
||||
class IPInterfaceWithPrefixLength(Validator):
|
||||
"""Custom IPAddress config - takes IP/prefixlen as input:
|
||||
192.0.2.1/29 or 2001:db8::1/64 are correct. The PrefixLength
|
||||
is required, and must be a number (0-32 for IPv4 and 0-128 for
|
||||
IPv6).
|
||||
"""
|
||||
|
||||
tag = "ip_interface"
|
||||
|
||||
def _is_valid(self, value):
|
||||
try:
|
||||
_network = ipaddress.ip_interface(value)
|
||||
except:
|
||||
return False
|
||||
if not isinstance(value, str):
|
||||
return False
|
||||
if not "/" in value:
|
||||
return False
|
||||
elems = value.split("/")
|
||||
if not len(elems) == 2:
|
||||
return False
|
||||
if not elems[1].isnumeric():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class Validator:
|
||||
"""The Validator class takes a schema filename (which may be None, in which
|
||||
case a built-in default is used), and a given YAML file represented as a string,
|
||||
and holds it against syntax and semantic validators, returning a tuple of (bool,list)
|
||||
where the boolean signals success/failure, and the list of strings are messages
|
||||
that were added when validating the YAML config.
|
||||
|
||||
The purpose is to ensure that the YAML file is both syntactically correct,
|
||||
which is ensured by Yamale, and semantically correct, which is ensured by a set
|
||||
of built-in validators, and user-added validators (see the add_validator() method)."""
|
||||
|
||||
def __init__(self, schema):
|
||||
self.logger = logging.getLogger("vppcfg.config")
|
||||
self.logger.addHandler(logging.NullHandler())
|
||||
|
||||
self.schema = schema
|
||||
self.validators = [
|
||||
validate_bondethernets,
|
||||
validate_interfaces,
|
||||
validate_loopbacks,
|
||||
validate_bridgedomains,
|
||||
validate_vxlan_tunnels,
|
||||
validate_taps,
|
||||
]
|
||||
|
||||
def validate(self, yaml):
|
||||
"""Validate the semantics of all YAML maps, by calling self.validators in turn,
|
||||
and then optionally calling validators that were added with add_validator()"""
|
||||
ret_retval = True
|
||||
ret_msgs = []
|
||||
if not yaml:
|
||||
return ret_retval, ret_msgs
|
||||
|
||||
validators = DefaultValidators.copy()
|
||||
validators[IPInterfaceWithPrefixLength.tag] = IPInterfaceWithPrefixLength
|
||||
if self.schema:
|
||||
fname = self.schema
|
||||
self.logger.debug(f"Validating against --schema {fname}")
|
||||
elif hasattr(sys, "_MEIPASS"):
|
||||
## See vppcfg.spec data_files that includes schema.yaml into the bundle
|
||||
self.logger.debug("Validating against built-in schema")
|
||||
fname = os.path.join(sys._MEIPASS, "schema.yaml")
|
||||
else:
|
||||
fname = "./schema.yaml"
|
||||
self.logger.debug(f"Validating against fallthrough default schema {fname}")
|
||||
|
||||
if not os.path.isfile(fname):
|
||||
self.logger.error(f"Cannot file schema file: {fname}")
|
||||
return False, ret_msgs
|
||||
|
||||
try:
|
||||
schema = yamale.make_schema(fname, validators=validators)
|
||||
data = yamale.make_data(content=str(yaml))
|
||||
yamale.validate(schema, data)
|
||||
self.logger.debug("Schema correctly validated by yamale")
|
||||
except ValueError as e:
|
||||
ret_retval = False
|
||||
for result in e.results:
|
||||
for error in result.errors:
|
||||
ret_msgs.extend([f"yamale: {error}"])
|
||||
return ret_retval, ret_msgs
|
||||
|
||||
self.logger.debug("Validating Semantics...")
|
||||
|
||||
for validator in self.validators:
|
||||
retval, msgs = validator(yaml)
|
||||
if msgs:
|
||||
ret_msgs.extend(msgs)
|
||||
if not retval:
|
||||
ret_retval = False
|
||||
|
||||
if ret_retval:
|
||||
self.logger.debug("Semantics correctly validated")
|
||||
return ret_retval, ret_msgs
|
||||
|
||||
def valid_config(self, yaml):
|
||||
"""Validate the given YAML configuration in 'yaml' against syntax
|
||||
validation given in the yamale 'schema', and all semantic configs.
|
||||
|
||||
Returns True if the configuration is valid, False otherwise.
|
||||
"""
|
||||
|
||||
retval, msgs = self.validate(yaml)
|
||||
if not retval:
|
||||
for msg in msgs:
|
||||
self.logger.error(msg)
|
||||
return False
|
||||
|
||||
self.logger.info("Configuration validated successfully")
|
||||
return True
|
||||
|
||||
def add_validator(self, func):
|
||||
"""Add a validator function, which strictly takes the prototype
|
||||
rv, msgs = func(yaml)
|
||||
returning a Boolean success value in rv and a List of strings
|
||||
in msgs. The function will be passed the configuration YAML and
|
||||
gets to opine if it's valid or not.
|
||||
|
||||
Note: will only be called iff Yamale syntax-check succeeded,
|
||||
and it will be called after all built-in validators.
|
||||
"""
|
||||
self.validators.append(func)
|
115
vppcfg/config/address.py
Normal file
115
vppcfg/config/address.py
Normal file
@ -0,0 +1,115 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that handles addresses """
|
||||
import ipaddress
|
||||
|
||||
|
||||
def get_all_addresses_except_ifname(yaml, except_ifname):
|
||||
"""Return a list of all ipaddress.ip_interface() instances in the entire config,
|
||||
except for those that belong to 'ifname'.
|
||||
"""
|
||||
ret = []
|
||||
if "interfaces" in yaml:
|
||||
for ifname, iface in yaml["interfaces"].items():
|
||||
if ifname == except_ifname:
|
||||
continue
|
||||
|
||||
if "addresses" in iface:
|
||||
for addr in iface["addresses"]:
|
||||
ret.append(ipaddress.ip_interface(addr))
|
||||
if "sub-interfaces" in iface:
|
||||
for subid, sub_iface in iface["sub-interfaces"].items():
|
||||
sub_ifname = f"{ifname}.{int(subid)}"
|
||||
if sub_ifname == except_ifname:
|
||||
continue
|
||||
|
||||
if "addresses" in sub_iface:
|
||||
for addr in sub_iface["addresses"]:
|
||||
ret.append(ipaddress.ip_interface(addr))
|
||||
if "loopbacks" in yaml:
|
||||
for ifname, iface in yaml["loopbacks"].items():
|
||||
if ifname == except_ifname:
|
||||
continue
|
||||
|
||||
if "addresses" in iface:
|
||||
for addr in iface["addresses"]:
|
||||
ret.append(ipaddress.ip_interface(addr))
|
||||
if "bridgedomains" in yaml:
|
||||
for ifname, iface in yaml["bridgedomains"].items():
|
||||
if ifname == except_ifname:
|
||||
continue
|
||||
|
||||
if "addresses" in iface:
|
||||
for addr in iface["addresses"]:
|
||||
ret.append(ipaddress.ip_interface(addr))
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def is_allowed(yaml, ifname, iface_addresses, ip_interface):
|
||||
"""Returns True if there is at most one occurence of the ip_interface (an IPv4/IPv6 prefix+len)
|
||||
in the entire config. That said, we need the 'iface_addresses' because VPP is a bit fickle in
|
||||
this regard.
|
||||
|
||||
IP addresses from the same prefix/len can be added to a given interface (ie 192.0.2.1/24 and
|
||||
192.0.2.2/24), but other than that, any prefix can not occur as a more-specific or less-specific
|
||||
of any other interface.
|
||||
|
||||
So, we will allow:
|
||||
- any ip_interface that is of equal network/len of existing one(s) _on the same interface_
|
||||
|
||||
And, we will reject
|
||||
- any ip_interface that is a more specific of any existing one
|
||||
- any ip_interface that is a less specific of any existing one
|
||||
|
||||
Examples:
|
||||
vpp# set interface ip address loop0 192.0.2.1/24
|
||||
vpp# set interface ip address loop0 192.0.2.2/24
|
||||
vpp# set interface ip address loop0 192.0.2.1/29
|
||||
set interface ip address: failed to add 192.0.2.1/29 on loop0 which conflicts with 192.0.2.1/24 for interface loop0
|
||||
vpp# set interface ip address loop0 192.0.2.3/23
|
||||
set interface ip address: failed to add 192.0.2.3/23 on loop0 which conflicts with 192.0.2.1/24 for interface loop0
|
||||
"""
|
||||
all_other_addresses = get_all_addresses_except_ifname(yaml, ifname)
|
||||
|
||||
my_ip_network = ipaddress.ip_network(ip_interface, strict=False)
|
||||
|
||||
for ipi in all_other_addresses:
|
||||
if ipi.version != my_ip_network.version:
|
||||
continue
|
||||
|
||||
if ipaddress.ip_network(ipi, strict=False) == my_ip_network:
|
||||
return False
|
||||
|
||||
if ipaddress.ip_network(ipi, strict=False).subnet_of(my_ip_network):
|
||||
return False
|
||||
|
||||
if my_ip_network.subnet_of(ipaddress.ip_network(ipi, strict=False)):
|
||||
return False
|
||||
|
||||
for addr in iface_addresses:
|
||||
ipi = ipaddress.ip_interface(addr)
|
||||
if ipi.version != my_ip_network.version:
|
||||
continue
|
||||
|
||||
if ipaddress.ip_network(ipi, strict=False) == my_ip_network:
|
||||
return True
|
||||
|
||||
if ipaddress.ip_network(ipi, strict=False).subnet_of(my_ip_network):
|
||||
return False
|
||||
|
||||
if my_ip_network.subnet_of(ipaddress.ip_network(ipi, strict=False)):
|
||||
return False
|
||||
|
||||
return True
|
230
vppcfg/config/bondethernet.py
Normal file
230
vppcfg/config/bondethernet.py
Normal file
@ -0,0 +1,230 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that handles bondethernets """
|
||||
import logging
|
||||
from config import interface
|
||||
from config import mac
|
||||
|
||||
|
||||
def get_bondethernets(yaml):
|
||||
"""Return a list of all bondethernets."""
|
||||
ret = []
|
||||
if "bondethernets" in yaml:
|
||||
for ifname, _iface in yaml["bondethernets"].items():
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Return the BondEthernet by name, if it exists. Return None,None otherwise."""
|
||||
try:
|
||||
if ifname in yaml["bondethernets"]:
|
||||
return ifname, yaml["bondethernets"][ifname]
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_bondethernet(yaml, ifname):
|
||||
"""Returns True if the interface name is an existing BondEthernet."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
return iface is not None
|
||||
|
||||
|
||||
def is_bond_member(yaml, ifname):
|
||||
"""Returns True if this interface is a member of a BondEthernet."""
|
||||
if not "bondethernets" in yaml:
|
||||
return False
|
||||
|
||||
for _bond, iface in yaml["bondethernets"].items():
|
||||
if not "interfaces" in iface:
|
||||
continue
|
||||
if ifname in iface["interfaces"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_mode(yaml, ifname):
|
||||
"""Return the mode of the BondEthernet as a string, defaulting to 'lacp'
|
||||
if no mode is given. Return None if the bond interface doesn't exist.
|
||||
|
||||
Return values: 'round-robin','active-backup','broadcast','lacp','xor'
|
||||
"""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return None
|
||||
|
||||
if not "mode" in iface:
|
||||
return "lacp"
|
||||
return iface["mode"]
|
||||
|
||||
|
||||
def mode_to_int(mode):
|
||||
"""Returns the integer representation in VPP of a given bondethernet mode,
|
||||
or -1 if 'mode' is not a valid string.
|
||||
|
||||
See src/vnet/bonding/bond.api and schema.yaml for valid pairs."""
|
||||
|
||||
ret = {"round-robin": 1, "active-backup": 2, "xor": 3, "broadcast": 4, "lacp": 5}
|
||||
try:
|
||||
return ret[mode]
|
||||
except KeyError:
|
||||
pass
|
||||
return -1
|
||||
|
||||
|
||||
def int_to_mode(mode):
|
||||
"""Returns the string representation in VPP of a given bondethernet mode,
|
||||
or "" if 'mode' is not a valid id.
|
||||
|
||||
See src/vnet/bonding/bond.api and schema.yaml for valid pairs."""
|
||||
|
||||
ret = {1: "round-robin", 2: "active-backup", 3: "xor", 4: "broadcast", 5: "lacp"}
|
||||
try:
|
||||
return ret[mode]
|
||||
except KeyError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def get_lb(yaml, ifname):
|
||||
"""Return the loadbalance strategy of the BondEthernet as a string. Only
|
||||
'xor' and 'lacp' modes have loadbalance strategies, so return None if
|
||||
those modes are not used.
|
||||
|
||||
Return values: 'l2', 'l23', 'l34', with 'l34' being the default if
|
||||
the bond is in xor/lacp mode without a load-balance strategy set
|
||||
explicitly."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return None
|
||||
mode = get_mode(yaml, ifname)
|
||||
if mode not in ["xor", "lacp"]:
|
||||
return None
|
||||
|
||||
if not "load-balance" in iface:
|
||||
return "l34"
|
||||
return iface["load-balance"]
|
||||
|
||||
|
||||
def lb_to_int(loadbalance):
|
||||
"""Returns the integer representation in VPP of a given load-balance strategy,
|
||||
or -1 if 'lb' is not a valid string.
|
||||
|
||||
See src/vnet/bonding/bond.api and schema.yaml for valid pairs, although
|
||||
bond.api defined more than we use in vppcfg."""
|
||||
|
||||
ret = {
|
||||
"l2": 0,
|
||||
"l34": 1,
|
||||
"l23": 2,
|
||||
"round-robin": 3,
|
||||
"broadcast": 4,
|
||||
"active-backup": 5,
|
||||
}
|
||||
try:
|
||||
return ret[loadbalance]
|
||||
except KeyError:
|
||||
pass
|
||||
return -1
|
||||
|
||||
|
||||
def int_to_lb(loadbalance):
|
||||
"""Returns the string representation in VPP of a given load-balance strategy,
|
||||
or "" if 'lb' is not a valid int.
|
||||
|
||||
See src/vnet/bonding/bond.api and schema.yaml for valid pairs, although
|
||||
bond.api defined more than we use in vppcfg."""
|
||||
|
||||
ret = {
|
||||
0: "l2",
|
||||
1: "l34",
|
||||
2: "l23",
|
||||
3: "round-robin",
|
||||
4: "broadcast",
|
||||
5: "active-backup",
|
||||
}
|
||||
try:
|
||||
return ret[loadbalance]
|
||||
except KeyError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def validate_bondethernets(yaml):
|
||||
"""Validate the semantics of all YAML 'bondethernets' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "bondethernets" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["bondethernets"].items():
|
||||
logger.debug(f"bondethernet {ifname}: {iface}")
|
||||
bond_ifname, bond_iface = interface.get_by_name(yaml, ifname)
|
||||
bond_mtu = 1500
|
||||
if not bond_iface:
|
||||
msgs.append(f"bondethernet {ifname} does not exist in interfaces")
|
||||
result = False
|
||||
else:
|
||||
bond_mtu = interface.get_mtu(yaml, bond_ifname)
|
||||
instance = int(ifname[12:])
|
||||
if instance > 4294967294:
|
||||
msgs.append(
|
||||
f"bondethernet {ifname} has instance {int(instance)} which is too large"
|
||||
)
|
||||
result = False
|
||||
if (
|
||||
get_mode(yaml, bond_ifname) not in ["xor", "lacp"]
|
||||
and "load-balance" in iface
|
||||
):
|
||||
msgs.append(
|
||||
f"bondethernet {ifname} can only have load-balance if in mode XOR or LACP"
|
||||
)
|
||||
result = False
|
||||
if "mac" in iface and mac.is_multicast(iface["mac"]):
|
||||
msgs.append(
|
||||
f"bondethernet {ifname} MAC address {iface['mac']} cannot be multicast"
|
||||
)
|
||||
result = False
|
||||
|
||||
if not "interfaces" in iface:
|
||||
continue
|
||||
|
||||
for member in iface["interfaces"]:
|
||||
if (None, None) == interface.get_by_name(yaml, member):
|
||||
msgs.append(f"bondethernet {ifname} member {member} does not exist")
|
||||
result = False
|
||||
continue
|
||||
|
||||
if interface.has_sub(yaml, member):
|
||||
msgs.append(
|
||||
f"bondethernet {ifname} member {member} has sub-interface(s)"
|
||||
)
|
||||
result = False
|
||||
if interface.has_lcp(yaml, member):
|
||||
msgs.append(f"bondethernet {ifname} member {member} has an LCP")
|
||||
result = False
|
||||
if interface.has_address(yaml, member):
|
||||
msgs.append(f"bondethernet {ifname} member {member} has an address")
|
||||
result = False
|
||||
member_mtu = interface.get_mtu(yaml, member)
|
||||
if member_mtu != bond_mtu:
|
||||
msgs.append(
|
||||
f"bondethernet {ifname} member {member} MTU {int(member_mtu)} does not match BondEthernet MTU {int(bond_mtu)}"
|
||||
)
|
||||
result = False
|
||||
return result, msgs
|
187
vppcfg/config/bridgedomain.py
Normal file
187
vppcfg/config/bridgedomain.py
Normal file
@ -0,0 +1,187 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that handles bridgedomains """
|
||||
import logging
|
||||
from config import interface
|
||||
from config import loopback
|
||||
|
||||
|
||||
def get_bridgedomains(yaml):
|
||||
"""Return a list of all bridgedomains."""
|
||||
ret = []
|
||||
if not "bridgedomains" in yaml:
|
||||
return ret
|
||||
for ifname, _iface in yaml["bridgedomains"].items():
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Return the BridgeDomain by name (bd*), if it exists. Return None,None otherwise."""
|
||||
try:
|
||||
if ifname in yaml["bridgedomains"]:
|
||||
return ifname, yaml["bridgedomains"][ifname]
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_bridgedomain(yaml, ifname):
|
||||
"""Returns True if the name (bd*) is an existing bridgedomain."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
return iface is not None
|
||||
|
||||
|
||||
def get_bridge_interfaces(yaml):
|
||||
"""Returns a list of all interfaces that are bridgedomain members"""
|
||||
|
||||
ret = []
|
||||
if not "bridgedomains" in yaml:
|
||||
return ret
|
||||
|
||||
for _ifname, iface in yaml["bridgedomains"].items():
|
||||
if "interfaces" in iface:
|
||||
ret.extend(iface["interfaces"])
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def is_bridge_interface_unique(yaml, ifname):
|
||||
"""Returns True if this interface is referenced in bridgedomains zero or one times"""
|
||||
|
||||
ifs = get_bridge_interfaces(yaml)
|
||||
return ifs.count(ifname) < 2
|
||||
|
||||
|
||||
def is_bridge_interface(yaml, ifname):
|
||||
"""Returns True if this interface is a member of a BridgeDomain"""
|
||||
|
||||
return ifname in get_bridge_interfaces(yaml)
|
||||
|
||||
|
||||
def bvi_unique(yaml, bviname):
|
||||
"""Returns True if the BVI identified by bviname is unique among all BridgeDomains."""
|
||||
if not "bridgedomains" in yaml:
|
||||
return True
|
||||
ncount = 0
|
||||
for _ifname, iface in yaml["bridgedomains"].items():
|
||||
if "bvi" in iface and iface["bvi"] == bviname:
|
||||
ncount += 1
|
||||
return ncount < 2
|
||||
|
||||
|
||||
def get_settings(yaml, ifname):
|
||||
"""Return a dictionary of 'settings' including their VPP defaults, for the
|
||||
bridgedomain identified by 'ifname' (bd10)"""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return None
|
||||
|
||||
settings = {
|
||||
"learn": True,
|
||||
"unicast-flood": True,
|
||||
"unknown-unicast-flood": True,
|
||||
"unicast-forward": True,
|
||||
"arp-termination": False,
|
||||
"arp-unicast-forward": False,
|
||||
"mac-age-minutes": 0, ## 0 means disabled
|
||||
}
|
||||
if "settings" in iface:
|
||||
if "learn" in iface["settings"]:
|
||||
settings["learn"] = iface["settings"]["learn"]
|
||||
if "unicast-flood" in iface["settings"]:
|
||||
settings["unicast-flood"] = iface["settings"]["unicast-flood"]
|
||||
if "unknown-unicast-flood" in iface["settings"]:
|
||||
settings["unknown-unicast-flood"] = iface["settings"][
|
||||
"unknown-unicast-flood"
|
||||
]
|
||||
if "unicast-forward" in iface["settings"]:
|
||||
settings["unicast-forward"] = iface["settings"]["unicast-forward"]
|
||||
if "arp-termination" in iface["settings"]:
|
||||
settings["arp-termination"] = iface["settings"]["arp-termination"]
|
||||
if "arp-unicast-forward" in iface["settings"]:
|
||||
settings["arp-unicast-forward"] = iface["settings"]["arp-unicast-forward"]
|
||||
if "mac-age-minutes" in iface["settings"]:
|
||||
settings["mac-age-minutes"] = int(iface["settings"]["mac-age-minutes"])
|
||||
return settings
|
||||
|
||||
|
||||
def validate_bridgedomains(yaml):
|
||||
"""Validate the semantics of all YAML 'bridgedomains' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "bridgedomains" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["bridgedomains"].items():
|
||||
logger.debug(f"bridgedomain {iface}")
|
||||
bd_mtu = 1500
|
||||
if "mtu" in iface:
|
||||
bd_mtu = iface["mtu"]
|
||||
instance = int(ifname[2:])
|
||||
if instance == 0:
|
||||
msgs.append(f"bridgedomain {ifname} is reserved")
|
||||
result = False
|
||||
elif instance > 16777215:
|
||||
msgs.append(
|
||||
f"bridgedomain {ifname} has instance {int(instance)} which is too large"
|
||||
)
|
||||
result = False
|
||||
|
||||
if "bvi" in iface:
|
||||
bvi_ifname, bvi_iface = loopback.get_by_name(yaml, iface["bvi"])
|
||||
if not bvi_unique(yaml, bvi_ifname):
|
||||
msgs.append(f"bridgedomain {ifname} BVI {bvi_ifname} is not unique")
|
||||
result = False
|
||||
if not bvi_iface:
|
||||
msgs.append(f"bridgedomain {ifname} BVI {bvi_ifname} does not exist")
|
||||
result = False
|
||||
continue
|
||||
|
||||
bvi_mtu = 1500
|
||||
if "mtu" in bvi_iface:
|
||||
bvi_mtu = bvi_iface["mtu"]
|
||||
if bvi_mtu != bd_mtu:
|
||||
msgs.append(
|
||||
f"bridgedomain {ifname} BVI {bvi_ifname} has MTU {int(bvi_mtu)}, while bridge has {int(bd_mtu)}"
|
||||
)
|
||||
result = False
|
||||
|
||||
if "interfaces" in iface:
|
||||
for member in iface["interfaces"]:
|
||||
if (None, None) == interface.get_by_name(yaml, member):
|
||||
msgs.append(f"bridgedomain {ifname} member {member} does not exist")
|
||||
result = False
|
||||
continue
|
||||
|
||||
if not is_bridge_interface_unique(yaml, member):
|
||||
msgs.append(f"bridgedomain {ifname} member {member} is not unique")
|
||||
result = False
|
||||
if interface.has_lcp(yaml, member):
|
||||
msgs.append(f"bridgedomain {ifname} member {member} has an LCP")
|
||||
result = False
|
||||
if interface.has_address(yaml, member):
|
||||
msgs.append(f"bridgedomain {ifname} member {member} has an address")
|
||||
result = False
|
||||
member_mtu = interface.get_mtu(yaml, member)
|
||||
if member_mtu != bd_mtu:
|
||||
msgs.append(
|
||||
f"bridgedomain {ifname} member {member} has MTU {int(member_mtu)}, while bridge has {int(bd_mtu)}"
|
||||
)
|
||||
result = False
|
||||
|
||||
return result, msgs
|
692
vppcfg/config/interface.py
Normal file
692
vppcfg/config/interface.py
Normal file
@ -0,0 +1,692 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates interfaces """
|
||||
import logging
|
||||
from config import bondethernet
|
||||
from config import bridgedomain
|
||||
from config import loopback
|
||||
from config import vxlan_tunnel
|
||||
from config import lcp
|
||||
from config import address
|
||||
from config import mac
|
||||
from config import tap
|
||||
|
||||
|
||||
def get_qinx_parent_by_name(yaml, ifname):
|
||||
"""Returns the sub-interface which matches a QinAD or QinQ outer tag, or None,None
|
||||
if that sub-interface doesn't exist."""
|
||||
|
||||
if not is_qinx(yaml, ifname):
|
||||
return None, None
|
||||
_qinx_ifname, qinx_iface = get_by_name(yaml, ifname)
|
||||
if not qinx_iface:
|
||||
return None, None
|
||||
|
||||
qinx_encap = get_encapsulation(yaml, ifname)
|
||||
if not qinx_encap:
|
||||
return None, None
|
||||
|
||||
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
|
||||
if not parent_iface:
|
||||
return None, None
|
||||
|
||||
for subid, sub_iface in parent_iface["sub-interfaces"].items():
|
||||
sub_ifname = f"{parent_ifname}.{int(subid)}"
|
||||
sub_encap = get_encapsulation(yaml, sub_ifname)
|
||||
if not sub_encap:
|
||||
continue
|
||||
if qinx_encap["dot1q"] > 0 and sub_encap["dot1q"] == qinx_encap["dot1q"]:
|
||||
return sub_ifname, sub_iface
|
||||
if qinx_encap["dot1ad"] > 0 and sub_encap["dot1ad"] == qinx_encap["dot1ad"]:
|
||||
return sub_ifname, sub_iface
|
||||
return None, None
|
||||
|
||||
|
||||
def get_parent_by_name(yaml, ifname):
|
||||
"""Returns the sub-interface's parent, or None,None if the sub-int doesn't exist."""
|
||||
if not ifname:
|
||||
return None, None
|
||||
|
||||
try:
|
||||
parent_ifname, subid = ifname.split(".")
|
||||
subid = int(subid)
|
||||
iface = yaml["interfaces"][parent_ifname]
|
||||
return parent_ifname, iface
|
||||
except KeyError:
|
||||
pass
|
||||
except ValueError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def get_by_lcp_name(yaml, lcpname):
|
||||
"""Returns the interface or sub-interface by a given lcp name, or None,None if it does not exist"""
|
||||
if not "interfaces" in yaml:
|
||||
return None, None
|
||||
for ifname, iface in yaml["interfaces"].items():
|
||||
if "lcp" in iface and iface["lcp"] == lcpname:
|
||||
return ifname, iface
|
||||
if not "sub-interfaces" in iface:
|
||||
continue
|
||||
for subid, sub_iface in yaml["interfaces"][ifname]["sub-interfaces"].items():
|
||||
sub_ifname = f"{ifname}.{int(subid)}"
|
||||
if "lcp" in sub_iface and sub_iface["lcp"] == lcpname:
|
||||
return sub_ifname, sub_iface
|
||||
return None, None
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Returns the interface or sub-interface by a given name, or None,None if it does not exist"""
|
||||
if "." in ifname:
|
||||
try:
|
||||
phy_ifname, subid = ifname.split(".")
|
||||
subid = int(subid)
|
||||
iface = yaml["interfaces"][phy_ifname]["sub-interfaces"][subid]
|
||||
return ifname, iface
|
||||
except ValueError:
|
||||
return None, None
|
||||
except KeyError:
|
||||
return None, None
|
||||
|
||||
try:
|
||||
iface = yaml["interfaces"][ifname]
|
||||
return ifname, iface
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_sub(yaml, ifname):
|
||||
"""Returns True if this interface is a sub-interface"""
|
||||
_parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
|
||||
return isinstance(parent_iface, dict)
|
||||
|
||||
|
||||
def has_sub(yaml, ifname):
|
||||
"""Returns True if this interface has sub-interfaces"""
|
||||
if not "interfaces" in yaml:
|
||||
return False
|
||||
|
||||
if ifname in yaml["interfaces"]:
|
||||
iface = yaml["interfaces"][ifname]
|
||||
if "sub-interfaces" in iface and len(iface["sub-interfaces"]) > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def has_address(yaml, ifname):
|
||||
"""Returns True if this interface or sub-interface has one or more addresses"""
|
||||
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return False
|
||||
return "addresses" in iface
|
||||
|
||||
|
||||
def get_l2xc_interfaces(yaml):
|
||||
"""Returns a list of all interfaces that have an L2 CrossConnect"""
|
||||
ret = []
|
||||
if not "interfaces" in yaml:
|
||||
return ret
|
||||
for ifname, iface in yaml["interfaces"].items():
|
||||
if "l2xc" in iface:
|
||||
ret.append(ifname)
|
||||
if "sub-interfaces" in iface:
|
||||
for subid, sub_iface in iface["sub-interfaces"].items():
|
||||
sub_ifname = f"{ifname}.{int(subid)}"
|
||||
if "l2xc" in sub_iface:
|
||||
ret.append(sub_ifname)
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def is_l2xc_interface(yaml, ifname):
|
||||
"""Returns True if this interface has an L2 CrossConnect"""
|
||||
|
||||
return ifname in get_l2xc_interfaces(yaml)
|
||||
|
||||
|
||||
def get_l2xc_target_interfaces(yaml):
|
||||
"""Returns a list of all interfaces that are the target of an L2 CrossConnect"""
|
||||
ret = []
|
||||
if "interfaces" in yaml:
|
||||
for _ifname, iface in yaml["interfaces"].items():
|
||||
if "l2xc" in iface:
|
||||
ret.append(iface["l2xc"])
|
||||
if "sub-interfaces" in iface:
|
||||
for _subid, sub_iface in iface["sub-interfaces"].items():
|
||||
if "l2xc" in sub_iface:
|
||||
ret.append(sub_iface["l2xc"])
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def is_l2xc_target_interface(yaml, ifname):
|
||||
"""Returns True if this interface is the target of an L2 CrossConnect"""
|
||||
|
||||
return ifname in get_l2xc_target_interfaces(yaml)
|
||||
|
||||
|
||||
def is_l2xc_target_interface_unique(yaml, ifname):
|
||||
"""Returns True if this interface is referenced as an l2xc target zero or one times"""
|
||||
|
||||
ifs = get_l2xc_target_interfaces(yaml)
|
||||
return ifs.count(ifname) < 2
|
||||
|
||||
|
||||
def has_lcp(yaml, ifname):
|
||||
"""Returns True if this interface or sub-interface has an LCP"""
|
||||
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return False
|
||||
return "lcp" in iface
|
||||
|
||||
|
||||
def valid_encapsulation(yaml, ifname):
|
||||
"""Returns True if the sub interface has a valid encapsulation, or
|
||||
none at all"""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return True
|
||||
if not "encapsulation" in iface:
|
||||
return True
|
||||
|
||||
encap = iface["encapsulation"]
|
||||
if "dot1ad" in encap and "dot1q" in encap:
|
||||
return False
|
||||
if "inner-dot1q" in encap and not ("dot1ad" in encap or "dot1q" in encap):
|
||||
return False
|
||||
if "exact-match" in encap and not encap["exact-match"] and has_lcp(yaml, ifname):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def get_encapsulation(yaml, ifname):
|
||||
"""Returns the encapsulation of an interface name as a fully formed dictionary:
|
||||
|
||||
dot1q: int (default 0)
|
||||
dot1ad: int (default 0)
|
||||
inner-dot1q: int (default 0)
|
||||
exact-match: bool (default False)
|
||||
|
||||
If the interface is not a sub-int with valid encapsulation, None is returned.
|
||||
"""
|
||||
if not valid_encapsulation(yaml, ifname):
|
||||
return None
|
||||
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return None
|
||||
|
||||
_parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
|
||||
if not iface or not parent_iface:
|
||||
return None
|
||||
_parent_ifname, subid = ifname.split(".")
|
||||
|
||||
dot1q = 0
|
||||
dot1ad = 0
|
||||
inner_dot1q = 0
|
||||
exact_match = False
|
||||
if not "encapsulation" in iface:
|
||||
dot1q = int(subid)
|
||||
exact_match = True
|
||||
else:
|
||||
if "dot1q" in iface["encapsulation"]:
|
||||
dot1q = iface["encapsulation"]["dot1q"]
|
||||
elif "dot1ad" in iface["encapsulation"]:
|
||||
dot1ad = iface["encapsulation"]["dot1ad"]
|
||||
if "inner-dot1q" in iface["encapsulation"]:
|
||||
inner_dot1q = iface["encapsulation"]["inner-dot1q"]
|
||||
if "exact-match" in iface["encapsulation"]:
|
||||
exact_match = iface["encapsulation"]["exact-match"]
|
||||
|
||||
return {
|
||||
"dot1q": int(dot1q),
|
||||
"dot1ad": int(dot1ad),
|
||||
"inner-dot1q": int(inner_dot1q),
|
||||
"exact-match": bool(exact_match),
|
||||
}
|
||||
|
||||
|
||||
def get_phys(yaml):
|
||||
"""Return a list of all toplevel (ie. non-sub) interfaces which are
|
||||
assumed to be physical network cards, eg TenGigabitEthernet1/0/0. Note
|
||||
that derived/created interfaces such as Tunnels, BondEthernets and
|
||||
Loopbacks are not returned"""
|
||||
ret = []
|
||||
if not "interfaces" in yaml:
|
||||
return ret
|
||||
for ifname, _iface in yaml["interfaces"].items():
|
||||
if is_phy(yaml, ifname):
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def is_phy(yaml, ifname):
|
||||
"""Returns True if the ifname is the name of a physical network interface."""
|
||||
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if iface is None:
|
||||
return False
|
||||
if is_sub(yaml, ifname):
|
||||
return False
|
||||
|
||||
if bondethernet.is_bondethernet(yaml, ifname):
|
||||
return False
|
||||
if loopback.is_loopback(yaml, ifname):
|
||||
return False
|
||||
if vxlan_tunnel.is_vxlan_tunnel(yaml, ifname):
|
||||
return False
|
||||
if tap.is_tap(yaml, ifname):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_interfaces(yaml):
|
||||
"""Return a list of all interface and sub-interface names"""
|
||||
ret = []
|
||||
if not "interfaces" in yaml:
|
||||
return ret
|
||||
for ifname, iface in yaml["interfaces"].items():
|
||||
ret.append(ifname)
|
||||
if not "sub-interfaces" in iface:
|
||||
continue
|
||||
for subid, _sub_iface in iface["sub-interfaces"].items():
|
||||
ret.append(f"{ifname}.{int(subid)}")
|
||||
return ret
|
||||
|
||||
|
||||
def get_sub_interfaces(yaml):
|
||||
"""Return all interfaces which are a subinterface."""
|
||||
ret = []
|
||||
for ifname in get_interfaces(yaml):
|
||||
if is_sub(yaml, ifname):
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def get_qinx_interfaces(yaml):
|
||||
"""Return all interfaces which are double-tagged, either QinAD or QinQ.
|
||||
These interfaces will always have a valid encapsulation with 'inner-dot1q'
|
||||
set to non-zero.
|
||||
|
||||
Note: this is always a strict subset of get_sub_interfaces()
|
||||
"""
|
||||
ret = []
|
||||
for ifname in get_interfaces(yaml):
|
||||
if not is_sub(yaml, ifname):
|
||||
continue
|
||||
encap = get_encapsulation(yaml, ifname)
|
||||
if not encap:
|
||||
continue
|
||||
if encap["inner-dot1q"] > 0:
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def is_qinx(yaml, ifname):
|
||||
"""Returns True if the interface is a double-tagged (QinQ or QinAD) interface"""
|
||||
return ifname in get_qinx_interfaces(yaml)
|
||||
|
||||
|
||||
def unique_encapsulation(yaml, sub_ifname):
|
||||
"""Ensures that for the sub_ifname specified, there exist no other sub-ints on the
|
||||
parent with the same encapsulation."""
|
||||
new_ifname, iface = get_by_name(yaml, sub_ifname)
|
||||
parent_ifname, parent_iface = get_parent_by_name(yaml, new_ifname)
|
||||
if not iface or not parent_iface:
|
||||
return False
|
||||
|
||||
sub_encap = get_encapsulation(yaml, new_ifname)
|
||||
if not sub_encap:
|
||||
return False
|
||||
|
||||
ncount = 0
|
||||
for subid, _sibling_iface in parent_iface["sub-interfaces"].items():
|
||||
sibling_ifname = f"{parent_ifname}.{int(subid)}"
|
||||
sibling_encap = get_encapsulation(yaml, sibling_ifname)
|
||||
if sub_encap == sibling_encap and new_ifname != sibling_ifname:
|
||||
ncount = ncount + 1
|
||||
|
||||
return ncount == 0
|
||||
|
||||
|
||||
def is_l2(yaml, ifname):
|
||||
"""Returns True if the interface is an L2XC source, L2XC target or a member of a bridgedomain"""
|
||||
if bridgedomain.is_bridge_interface(yaml, ifname):
|
||||
return True
|
||||
if is_l2xc_interface(yaml, ifname):
|
||||
return True
|
||||
if is_l2xc_target_interface(yaml, ifname):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_l3(yaml, ifname):
|
||||
"""Returns True if the interface exists and is neither l2xc target nor bridgedomain"""
|
||||
return not is_l2(yaml, ifname)
|
||||
|
||||
|
||||
def get_lcp(yaml, ifname):
|
||||
"""Returns the LCP of the interface. If the interface is a sub-interface with L3
|
||||
enabled, synthesize it based on its parent, using smart QinQ syntax.
|
||||
Return None if no LCP can be found."""
|
||||
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if iface and "lcp" in iface:
|
||||
return iface["lcp"]
|
||||
return None
|
||||
|
||||
|
||||
def get_mtu(yaml, ifname):
|
||||
"""Returns MTU of the interface. If it's not set, return the parent's MTU, and
|
||||
return 1500 if no MTU was set on the sub-int or the parent."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if iface and "mtu" in iface:
|
||||
return iface["mtu"]
|
||||
|
||||
_parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
|
||||
if parent_iface and "mtu" in parent_iface:
|
||||
return parent_iface["mtu"]
|
||||
return 1500
|
||||
|
||||
|
||||
def get_admin_state(yaml, ifname):
|
||||
"""Return True if the interface admin state should be 'up'. Return False
|
||||
if it does not exist, or if it's set to 'down'."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
if not iface:
|
||||
return False
|
||||
if not "state" in iface:
|
||||
return True
|
||||
return iface["state"] == "up"
|
||||
|
||||
|
||||
def validate_interfaces(yaml):
|
||||
"""Validate the semantics of all YAML 'interfaces' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "interfaces" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["interfaces"].items():
|
||||
logger.debug(f"interface {iface}")
|
||||
if ifname.startswith("BondEthernet") and (
|
||||
None,
|
||||
None,
|
||||
) == bondethernet.get_by_name(yaml, ifname):
|
||||
msgs.append(f"interface {ifname} does not exist in bondethernets")
|
||||
result = False
|
||||
if ifname.startswith("BondEthernet") and "mac" in iface:
|
||||
msgs.append(
|
||||
f"interface {ifname} is a member of bondethernet, cannot set MAC"
|
||||
)
|
||||
result = False
|
||||
if not "state" in iface:
|
||||
iface["state"] = "up"
|
||||
|
||||
if "mac" in iface and mac.is_multicast(iface["mac"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} MAC address {iface['mac']} cannot be multicast"
|
||||
)
|
||||
result = False
|
||||
|
||||
iface_mtu = get_mtu(yaml, ifname)
|
||||
iface_lcp = get_lcp(yaml, ifname)
|
||||
iface_address = has_address(yaml, ifname)
|
||||
|
||||
if ifname.startswith("tap"):
|
||||
_tap_ifname, tap_iface = tap.get_by_name(yaml, ifname)
|
||||
if not tap_iface:
|
||||
msgs.append(f"interface {ifname} is a TAP but does not exist in taps")
|
||||
result = False
|
||||
elif "mtu" in tap_iface["host"]:
|
||||
host_mtu = tap_iface["host"]["mtu"]
|
||||
if host_mtu != iface_mtu:
|
||||
msgs.append(
|
||||
f"interface {ifname} is a TAP so its MTU {int(iface_mtu)} must match host MTU {int(host_mtu)}"
|
||||
)
|
||||
result = False
|
||||
if iface_address:
|
||||
msgs.append(f"interface {ifname} is a TAP so it cannot have an address")
|
||||
result = False
|
||||
if iface_lcp:
|
||||
msgs.append(f"interface {ifname} is a TAP so it cannot have an LCP")
|
||||
result = False
|
||||
if has_sub(yaml, ifname):
|
||||
msgs.append(
|
||||
f"interface {ifname} is a TAP so it cannot have sub-interfaces"
|
||||
)
|
||||
result = False
|
||||
|
||||
if is_l2(yaml, ifname) and iface_lcp:
|
||||
msgs.append(
|
||||
f"interface {ifname} is in L2 mode but has LCP name {iface_lcp}"
|
||||
)
|
||||
result = False
|
||||
if is_l2(yaml, ifname) and iface_address:
|
||||
msgs.append(f"interface {ifname} is in L2 mode but has an address")
|
||||
result = False
|
||||
if iface_lcp and not lcp.is_unique(yaml, iface_lcp):
|
||||
msgs.append(
|
||||
f"interface {ifname} does not have a unique LCP name {iface_lcp}"
|
||||
)
|
||||
result = False
|
||||
|
||||
if "addresses" in iface:
|
||||
for addr in iface["addresses"]:
|
||||
if not address.is_allowed(yaml, ifname, iface["addresses"], addr):
|
||||
msgs.append(
|
||||
f"interface {ifname} IP address {addr} conflicts with another"
|
||||
)
|
||||
result = False
|
||||
|
||||
if "l2xc" in iface:
|
||||
if has_sub(yaml, ifname):
|
||||
msgs.append(
|
||||
f"interface {ifname} has l2xc so it cannot have sub-interfaces"
|
||||
)
|
||||
result = False
|
||||
if iface_lcp:
|
||||
msgs.append(f"interface {ifname} has l2xc so it cannot have an LCP")
|
||||
result = False
|
||||
if iface_address:
|
||||
msgs.append(f"interface {ifname} has l2xc so it cannot have an address")
|
||||
result = False
|
||||
if (None, None) == get_by_name(yaml, iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target {iface['l2xc']} does not exist"
|
||||
)
|
||||
result = False
|
||||
if iface["l2xc"] == ifname:
|
||||
msgs.append(f"interface {ifname} l2xc target cannot be itself")
|
||||
result = False
|
||||
target_mtu = get_mtu(yaml, iface["l2xc"])
|
||||
if target_mtu != iface_mtu:
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(iface_mtu)}"
|
||||
)
|
||||
result = False
|
||||
if not is_l2xc_target_interface_unique(yaml, iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target {iface['l2xc']} is not unique"
|
||||
)
|
||||
result = False
|
||||
if bridgedomain.is_bridge_interface(yaml, iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target {iface['l2xc']} is in a bridgedomain"
|
||||
)
|
||||
result = False
|
||||
if has_lcp(yaml, iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target {iface['l2xc']} cannot have an LCP"
|
||||
)
|
||||
result = False
|
||||
if has_address(yaml, iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"interface {ifname} l2xc target {iface['l2xc']} cannot have an address"
|
||||
)
|
||||
result = False
|
||||
|
||||
if has_sub(yaml, ifname):
|
||||
for sub_id, sub_iface in yaml["interfaces"][ifname][
|
||||
"sub-interfaces"
|
||||
].items():
|
||||
logger.debug(f"sub-interface {sub_iface}")
|
||||
sub_ifname = f"{ifname}.{int(sub_id)}"
|
||||
if not sub_iface:
|
||||
msgs.append(f"sub-interface {sub_ifname} has no config")
|
||||
result = False
|
||||
continue
|
||||
|
||||
if not "state" in sub_iface:
|
||||
sub_iface["state"] = "up"
|
||||
if sub_iface["state"] == "up" and iface["state"] == "down":
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} cannot be up if parent {ifname} is down"
|
||||
)
|
||||
result = False
|
||||
|
||||
sub_mtu = get_mtu(yaml, sub_ifname)
|
||||
if sub_mtu > iface_mtu:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {ifname} MTU {int(iface_mtu)}"
|
||||
)
|
||||
result = False
|
||||
if is_qinx(yaml, sub_ifname):
|
||||
mid_ifname, mid_iface = get_qinx_parent_by_name(yaml, sub_ifname)
|
||||
mid_mtu = get_mtu(yaml, mid_ifname)
|
||||
if sub_mtu > mid_mtu:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has MTU {int(sub_iface['mtu'])} higher than parent {mid_ifname} MTU {int(mid_mtu)}"
|
||||
)
|
||||
result = False
|
||||
|
||||
sub_lcp = get_lcp(yaml, sub_ifname)
|
||||
if is_l2(yaml, sub_ifname) and sub_lcp:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} is in L2 mode but has LCP name {sub_lcp}"
|
||||
)
|
||||
result = False
|
||||
if sub_lcp and not lcp.is_unique(yaml, sub_lcp):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} does not have a unique LCP name {sub_lcp}"
|
||||
)
|
||||
result = False
|
||||
if sub_lcp and not iface_lcp:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has LCP name {sub_lcp} but {ifname} does not have an LCP"
|
||||
)
|
||||
result = False
|
||||
if sub_lcp and is_qinx(yaml, sub_ifname):
|
||||
mid_ifname, mid_iface = get_qinx_parent_by_name(yaml, sub_ifname)
|
||||
if not mid_iface:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} which requires a parent"
|
||||
)
|
||||
result = False
|
||||
elif not get_lcp(yaml, mid_ifname):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} is QinX and has LCP name {sub_lcp} but {mid_ifname} does not have an LCP"
|
||||
)
|
||||
result = False
|
||||
|
||||
encap = get_encapsulation(yaml, sub_ifname)
|
||||
if sub_lcp and (not encap or not encap["exact-match"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has LCP name {sub_lcp} but its encapsulation is not exact-match"
|
||||
)
|
||||
result = False
|
||||
|
||||
if has_address(yaml, sub_ifname):
|
||||
if not encap or not encap["exact-match"]:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has an address but its encapsulation is not exact-match"
|
||||
)
|
||||
result = False
|
||||
if is_l2(yaml, sub_ifname):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} is in L2 mode but has an address"
|
||||
)
|
||||
result = False
|
||||
for addr in sub_iface["addresses"]:
|
||||
if not address.is_allowed(
|
||||
yaml, sub_ifname, sub_iface["addresses"], addr
|
||||
):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} IP address {addr} conflicts with another"
|
||||
)
|
||||
result = False
|
||||
if not valid_encapsulation(yaml, sub_ifname):
|
||||
msgs.append(f"sub-interface {sub_ifname} has invalid encapsulation")
|
||||
result = False
|
||||
elif not unique_encapsulation(yaml, sub_ifname):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} does not have unique encapsulation"
|
||||
)
|
||||
result = False
|
||||
if "l2xc" in sub_iface:
|
||||
if has_lcp(yaml, sub_ifname):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has l2xc so it cannot have an LCP"
|
||||
)
|
||||
result = False
|
||||
if has_address(yaml, sub_ifname):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} has l2xc so it cannot have an address"
|
||||
)
|
||||
result = False
|
||||
if (None, None) == get_by_name(yaml, sub_iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} does not exist"
|
||||
)
|
||||
result = False
|
||||
if sub_iface["l2xc"] == sub_ifname:
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target cannot be itself"
|
||||
)
|
||||
result = False
|
||||
target_mtu = get_mtu(yaml, sub_iface["l2xc"])
|
||||
if target_mtu != sub_mtu:
|
||||
msgs.append(
|
||||
f"sub-interface {ifname} l2xc target MTU {int(target_mtu)} does not match source MTU {int(sub_mtu)}"
|
||||
)
|
||||
result = False
|
||||
if not is_l2xc_target_interface_unique(yaml, sub_iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is not unique"
|
||||
)
|
||||
result = False
|
||||
if bridgedomain.is_bridge_interface(yaml, sub_iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} is in a bridgedomain"
|
||||
)
|
||||
result = False
|
||||
if has_lcp(yaml, sub_iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an LCP"
|
||||
)
|
||||
result = False
|
||||
if has_address(yaml, sub_iface["l2xc"]):
|
||||
msgs.append(
|
||||
f"sub-interface {sub_ifname} l2xc target {sub_iface['l2xc']} cannot have an address"
|
||||
)
|
||||
result = False
|
||||
|
||||
return result, msgs
|
47
vppcfg/config/lcp.py
Normal file
47
vppcfg/config/lcp.py
Normal file
@ -0,0 +1,47 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates Linux Control Plane (lcp) elements """
|
||||
|
||||
|
||||
def get_lcps(yaml, interfaces=True, loopbacks=True, bridgedomains=True):
|
||||
"""Returns a list of LCPs configured in the system. Optionally (de)select the different
|
||||
types of LCP. Return an empty list if there are none of the given type(s)."""
|
||||
|
||||
ret = []
|
||||
if interfaces and "interfaces" in yaml:
|
||||
for _ifname, iface in yaml["interfaces"].items():
|
||||
if "lcp" in iface:
|
||||
ret.append(iface["lcp"])
|
||||
if "sub-interfaces" in iface:
|
||||
for _subid, sub_iface in iface["sub-interfaces"].items():
|
||||
if "lcp" in sub_iface:
|
||||
ret.append(sub_iface["lcp"])
|
||||
|
||||
if loopbacks and "loopbacks" in yaml:
|
||||
for _ifname, iface in yaml["loopbacks"].items():
|
||||
if "lcp" in iface:
|
||||
ret.append(iface["lcp"])
|
||||
if bridgedomains and "bridgedomains" in yaml:
|
||||
for _ifname, iface in yaml["bridgedomains"].items():
|
||||
if "lcp" in iface:
|
||||
ret.append(iface["lcp"])
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
def is_unique(yaml, lcpname):
|
||||
"""Returns True if there is at most one occurence of the LCP name in the entire config."""
|
||||
|
||||
lcps = get_lcps(yaml)
|
||||
return lcps.count(lcpname) < 2
|
92
vppcfg/config/loopback.py
Normal file
92
vppcfg/config/loopback.py
Normal file
@ -0,0 +1,92 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates loopbacks """
|
||||
import logging
|
||||
from config import lcp
|
||||
from config import address
|
||||
from config import mac
|
||||
|
||||
|
||||
def get_loopbacks(yaml):
|
||||
"""Return a list of all loopbacks."""
|
||||
ret = []
|
||||
if "loopbacks" in yaml:
|
||||
for ifname, _iface in yaml["loopbacks"].items():
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def get_by_lcp_name(yaml, lcpname):
|
||||
"""Returns the loopback by a given lcp name, or None,None if it does not exist"""
|
||||
if not "loopbacks" in yaml:
|
||||
return None, None
|
||||
for ifname, iface in yaml["loopbacks"].items():
|
||||
if "lcp" in iface and iface["lcp"] == lcpname:
|
||||
return ifname, iface
|
||||
return None, None
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Return the loopback by name, if it exists. Return None otherwise."""
|
||||
try:
|
||||
if ifname in yaml["loopbacks"]:
|
||||
return ifname, yaml["loopbacks"][ifname]
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_loopback(yaml, ifname):
|
||||
"""Returns True if the interface name is an existing loopback."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
return iface is not None
|
||||
|
||||
|
||||
def validate_loopbacks(yaml):
|
||||
"""Validate the semantics of all YAML 'loopbacks' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "loopbacks" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["loopbacks"].items():
|
||||
logger.debug(f"loopback {iface}")
|
||||
instance = int(ifname[4:])
|
||||
if instance > 4095:
|
||||
msgs.append(
|
||||
f"loopback {ifname} has instance {int(instance)} which is too large"
|
||||
)
|
||||
result = False
|
||||
if "lcp" in iface and not lcp.is_unique(yaml, iface["lcp"]):
|
||||
msgs.append(
|
||||
f"loopback {ifname} does not have a unique LCP name {iface['lcp']}"
|
||||
)
|
||||
result = False
|
||||
if "addresses" in iface:
|
||||
for addr in iface["addresses"]:
|
||||
if not address.is_allowed(yaml, ifname, iface["addresses"], addr):
|
||||
msgs.append(
|
||||
f"loopback {ifname} IP address {addr} conflicts with another"
|
||||
)
|
||||
result = False
|
||||
if "mac" in iface and mac.is_multicast(iface["mac"]):
|
||||
msgs.append(
|
||||
f"loopback {ifname} MAC address {iface['mac']} cannot be multicast"
|
||||
)
|
||||
result = False
|
||||
|
||||
return result, msgs
|
52
vppcfg/config/mac.py
Normal file
52
vppcfg/config/mac.py
Normal file
@ -0,0 +1,52 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates MAC addresses """
|
||||
import netaddr
|
||||
|
||||
|
||||
def is_valid(mac):
|
||||
"""Return True if the string given in `mac` is a valid (6-byte) MAC address,
|
||||
as defined by netaddr.EUI"""
|
||||
try:
|
||||
_addr = netaddr.EUI(mac)
|
||||
except netaddr.core.AddrFormatError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_local(mac):
|
||||
"""Return True if a MAC address is a valid locally administered one."""
|
||||
try:
|
||||
addr = netaddr.EUI(mac)
|
||||
except netaddr.core.AddrFormatError:
|
||||
return False
|
||||
return bool(addr.words[0] & 0b10)
|
||||
|
||||
|
||||
def is_multicast(mac):
|
||||
"""Return True if a MAC address is a valid multicast one."""
|
||||
try:
|
||||
addr = netaddr.EUI(mac)
|
||||
except netaddr.core.AddrFormatError:
|
||||
return False
|
||||
return bool(addr.words[0] & 0b01)
|
||||
|
||||
|
||||
def is_unicast(mac):
|
||||
"""Return True if a MAC address is a valid unicast one."""
|
||||
try:
|
||||
addr = netaddr.EUI(mac)
|
||||
except netaddr.core.AddrFormatError:
|
||||
return False
|
||||
return not bool(addr.words[0] & 0b01)
|
118
vppcfg/config/tap.py
Normal file
118
vppcfg/config/tap.py
Normal file
@ -0,0 +1,118 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates taps """
|
||||
import logging
|
||||
from config import mac
|
||||
|
||||
|
||||
def get_taps(yaml):
|
||||
"""Return a list of all taps."""
|
||||
ret = []
|
||||
if "taps" in yaml:
|
||||
for ifname, _iface in yaml["taps"].items():
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Return the tap by name, if it exists. Return None otherwise."""
|
||||
try:
|
||||
if ifname in yaml["taps"]:
|
||||
return ifname, yaml["taps"][ifname]
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_tap(yaml, ifname):
|
||||
"""Returns True if the interface name is an existing tap in the config.
|
||||
The TAP has to be explicitly named in the configuration, and notably
|
||||
a TAP belonging to a Linux Control Plane (LCP) will return False.
|
||||
"""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
return iface is not None
|
||||
|
||||
|
||||
def is_host_name_unique(yaml, hostname):
|
||||
"""Returns True if there is at most one occurence of the given ifname amonst all host-names of TAPs."""
|
||||
if not "taps" in yaml:
|
||||
return True
|
||||
host_names = []
|
||||
for _tap_ifname, tap_iface in yaml["taps"].items():
|
||||
host_names.append(tap_iface["host"]["name"])
|
||||
return host_names.count(hostname) < 2
|
||||
|
||||
|
||||
def validate_taps(yaml):
|
||||
"""Validate the semantics of all YAML 'taps' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "taps" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["taps"].items():
|
||||
logger.debug(f"tap {iface}")
|
||||
instance = int(ifname[3:])
|
||||
|
||||
## NOTE(pim): 1024 is not off-by-one, tap1024 is precisely the highest permissible id
|
||||
if instance > 1024:
|
||||
msgs.append(f"tap {ifname} has instance {int(instance)} which is too large")
|
||||
result = False
|
||||
|
||||
if not is_host_name_unique(yaml, iface["host"]["name"]):
|
||||
msgs.append(
|
||||
f"tap {ifname} does not have a unique host name {iface['host']['name']}"
|
||||
)
|
||||
result = False
|
||||
|
||||
if "rx-ring-size" in iface:
|
||||
ncount = iface["rx-ring-size"]
|
||||
if ncount & (ncount - 1) != 0:
|
||||
msgs.append(f"tap {ifname} rx-ring-size must be a power of two")
|
||||
result = False
|
||||
|
||||
if "tx-ring-size" in iface:
|
||||
ncount = iface["tx-ring-size"]
|
||||
if ncount & (ncount - 1) != 0:
|
||||
msgs.append(f"tap {ifname} tx-ring-size must be a power of two")
|
||||
result = False
|
||||
|
||||
if (
|
||||
"namespace-create" in iface["host"]
|
||||
and iface["host"]["namespace-create"]
|
||||
and not "namespace" in iface["host"]
|
||||
):
|
||||
msgs.append(
|
||||
f"tap {ifname} namespace-create can only be set if namespace is set"
|
||||
)
|
||||
result = False
|
||||
|
||||
if (
|
||||
"bridge-create" in iface["host"]
|
||||
and iface["host"]["bridge-create"]
|
||||
and not "bridge" in iface["host"]
|
||||
):
|
||||
msgs.append(f"tap {ifname} bridge-create can only be set if bridge is set")
|
||||
result = False
|
||||
|
||||
if "mac" in iface["host"] and mac.is_multicast(iface["host"]["mac"]):
|
||||
msgs.append(
|
||||
f"tap {ifname} host MAC address {iface['host']['mac']} cannot be multicast"
|
||||
)
|
||||
result = False
|
||||
|
||||
return result, msgs
|
102
vppcfg/config/test_bondethernet.py
Normal file
102
vppcfg/config/test_bondethernet.py
Normal file
@ -0,0 +1,102 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for bondethernet """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.bondethernet as bondethernet
|
||||
|
||||
|
||||
class TestBondEthernetMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_bondethernet.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet0")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("BondEthernet0", ifname)
|
||||
self.assertIn("GigabitEthernet1/0/0", iface["interfaces"])
|
||||
self.assertNotIn("GigabitEthernet2/0/0", iface["interfaces"])
|
||||
|
||||
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet-notexist")
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
def test_members(self):
|
||||
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0"))
|
||||
self.assertFalse(
|
||||
bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100")
|
||||
)
|
||||
|
||||
def test_is_bondethernet(self):
|
||||
self.assertTrue(bondethernet.is_bondethernet(self.cfg, "BondEthernet0"))
|
||||
self.assertFalse(
|
||||
bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist")
|
||||
)
|
||||
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "GigabitEthernet1/0/0"))
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = bondethernet.get_bondethernets(self.cfg)
|
||||
self.assertEqual(len(ifs), 3)
|
||||
self.assertIn("BondEthernet0", ifs)
|
||||
self.assertIn("BondEthernet1", ifs)
|
||||
self.assertIn("BondEthernet2", ifs)
|
||||
self.assertNotIn("BondEthernet-noexist", ifs)
|
||||
|
||||
def test_get_mode(self):
|
||||
self.assertEqual("lacp", bondethernet.get_mode(self.cfg, "BondEthernet0"))
|
||||
self.assertEqual("xor", bondethernet.get_mode(self.cfg, "BondEthernet1"))
|
||||
|
||||
def test_mode_to_int(self):
|
||||
self.assertEqual(1, bondethernet.mode_to_int("round-robin"))
|
||||
self.assertEqual(2, bondethernet.mode_to_int("active-backup"))
|
||||
self.assertEqual(3, bondethernet.mode_to_int("xor"))
|
||||
self.assertEqual(4, bondethernet.mode_to_int("broadcast"))
|
||||
self.assertEqual(5, bondethernet.mode_to_int("lacp"))
|
||||
self.assertEqual(-1, bondethernet.mode_to_int("not-exist"))
|
||||
|
||||
def test_int_to_mode(self):
|
||||
self.assertEqual("round-robin", bondethernet.int_to_mode(1))
|
||||
self.assertEqual("active-backup", bondethernet.int_to_mode(2))
|
||||
self.assertEqual("xor", bondethernet.int_to_mode(3))
|
||||
self.assertEqual("broadcast", bondethernet.int_to_mode(4))
|
||||
self.assertEqual("lacp", bondethernet.int_to_mode(5))
|
||||
self.assertEqual("", bondethernet.int_to_mode(0))
|
||||
self.assertEqual("", bondethernet.int_to_mode(6))
|
||||
|
||||
def test_get_lb(self):
|
||||
self.assertEqual("l34", bondethernet.get_lb(self.cfg, "BondEthernet0"))
|
||||
self.assertEqual("l2", bondethernet.get_lb(self.cfg, "BondEthernet1"))
|
||||
self.assertIsNone(bondethernet.get_lb(self.cfg, "BondEthernet2"))
|
||||
|
||||
def test_lb_to_int(self):
|
||||
self.assertEqual(0, bondethernet.lb_to_int("l2"))
|
||||
self.assertEqual(1, bondethernet.lb_to_int("l34"))
|
||||
self.assertEqual(2, bondethernet.lb_to_int("l23"))
|
||||
self.assertEqual(3, bondethernet.lb_to_int("round-robin"))
|
||||
self.assertEqual(4, bondethernet.lb_to_int("broadcast"))
|
||||
self.assertEqual(5, bondethernet.lb_to_int("active-backup"))
|
||||
self.assertEqual(-1, bondethernet.lb_to_int("not-exist"))
|
||||
|
||||
def test_int_to_lb(self):
|
||||
self.assertEqual("l2", bondethernet.int_to_lb(0))
|
||||
self.assertEqual("l34", bondethernet.int_to_lb(1))
|
||||
self.assertEqual("l23", bondethernet.int_to_lb(2))
|
||||
self.assertEqual("round-robin", bondethernet.int_to_lb(3))
|
||||
self.assertEqual("broadcast", bondethernet.int_to_lb(4))
|
||||
self.assertEqual("active-backup", bondethernet.int_to_lb(5))
|
||||
self.assertEqual("", bondethernet.int_to_lb(-1))
|
97
vppcfg/config/test_bridgedomain.py
Normal file
97
vppcfg/config/test_bridgedomain.py
Normal file
@ -0,0 +1,97 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for bridgedomains """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.bridgedomain as bridgedomain
|
||||
|
||||
|
||||
class TestBridgeDomainMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_bridgedomain.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd10")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("bd10", ifname)
|
||||
self.assertEqual(iface["mtu"], 3000)
|
||||
self.assertIn("BondEthernet0", iface["interfaces"])
|
||||
|
||||
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd-notexist")
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
def test_is_bridgedomain(self):
|
||||
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd10"))
|
||||
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd11"))
|
||||
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "bd-notexist"))
|
||||
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "GigabitEthernet1/0/0"))
|
||||
|
||||
def test_members(self):
|
||||
self.assertTrue(
|
||||
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0")
|
||||
)
|
||||
self.assertTrue(
|
||||
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100")
|
||||
)
|
||||
self.assertFalse(
|
||||
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0")
|
||||
)
|
||||
self.assertFalse(
|
||||
bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0.100")
|
||||
)
|
||||
|
||||
def test_unique(self):
|
||||
self.assertFalse(
|
||||
bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet1/0/0")
|
||||
)
|
||||
self.assertTrue(
|
||||
bridgedomain.is_bridge_interface_unique(
|
||||
self.cfg, "GigabitEthernet2/0/0.100"
|
||||
)
|
||||
)
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = bridgedomain.get_bridge_interfaces(self.cfg)
|
||||
self.assertEqual(len(ifs), 8)
|
||||
self.assertIn("BondEthernet0", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/0", ifs)
|
||||
self.assertIn("GigabitEthernet2/0/0.100", ifs)
|
||||
|
||||
def test_bvi_unique(self):
|
||||
self.assertTrue(bridgedomain.bvi_unique(self.cfg, "loop0"))
|
||||
self.assertFalse(bridgedomain.bvi_unique(self.cfg, "loop1"))
|
||||
self.assertTrue(bridgedomain.bvi_unique(self.cfg, "loop2"))
|
||||
|
||||
def test_get_bridgedomains(self):
|
||||
ifs = bridgedomain.get_bridgedomains(self.cfg)
|
||||
self.assertEqual(len(ifs), 6)
|
||||
|
||||
def test_get_settings(self):
|
||||
settings = bridgedomain.get_settings(self.cfg, "bd1")
|
||||
self.assertIsNone(settings)
|
||||
|
||||
settings = bridgedomain.get_settings(self.cfg, "bd10")
|
||||
self.assertTrue(settings["learn"])
|
||||
self.assertTrue(settings["unknown-unicast-flood"])
|
||||
self.assertTrue(settings["unicast-flood"])
|
||||
self.assertEqual(settings["mac-age-minutes"], 0)
|
||||
|
||||
settings = bridgedomain.get_settings(self.cfg, "bd11")
|
||||
self.assertTrue(settings["learn"])
|
||||
self.assertFalse(settings["unknown-unicast-flood"])
|
||||
self.assertFalse(settings["unicast-flood"])
|
||||
self.assertEqual(settings["mac-age-minutes"], 10)
|
278
vppcfg/config/test_interface.py
Normal file
278
vppcfg/config/test_interface.py
Normal file
@ -0,0 +1,278 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for interfaces """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.interface as interface
|
||||
|
||||
|
||||
class TestInterfaceMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_interface.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = interface.get_interfaces(self.cfg)
|
||||
self.assertEqual(len(ifs), 19)
|
||||
self.assertIn("GigabitEthernet1/0/1", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.200", ifs)
|
||||
|
||||
ifs = interface.get_sub_interfaces(self.cfg)
|
||||
self.assertEqual(len(ifs), 13)
|
||||
self.assertNotIn("GigabitEthernet1/0/1", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.200", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.201", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.202", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.203", ifs)
|
||||
|
||||
ifs = interface.get_qinx_interfaces(self.cfg)
|
||||
self.assertEqual(len(ifs), 3)
|
||||
self.assertNotIn("GigabitEthernet1/0/1.200", ifs)
|
||||
self.assertNotIn("GigabitEthernet1/0/1.202", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.201", ifs)
|
||||
self.assertIn("GigabitEthernet1/0/1.203", ifs)
|
||||
|
||||
ifs = interface.get_l2xc_interfaces(self.cfg)
|
||||
self.assertEqual(len(ifs), 3)
|
||||
self.assertIn("GigabitEthernet3/0/0", ifs)
|
||||
self.assertIn("GigabitEthernet3/0/1", ifs)
|
||||
self.assertIn("GigabitEthernet3/0/2.100", ifs)
|
||||
self.assertNotIn("GigabitEthernet3/0/2.200", ifs)
|
||||
|
||||
target_ifs = interface.get_l2xc_target_interfaces(self.cfg)
|
||||
self.assertEqual(len(target_ifs), 3)
|
||||
self.assertIn("GigabitEthernet3/0/0", target_ifs)
|
||||
self.assertIn("GigabitEthernet3/0/1", target_ifs)
|
||||
self.assertNotIn("GigabitEthernet3/0/2.100", target_ifs)
|
||||
self.assertIn("GigabitEthernet3/0/2.200", target_ifs)
|
||||
|
||||
## Since l2xc cannot connect to itself, and the target must exist,
|
||||
## it follows that the same number of l2xc target interfaces must exist.
|
||||
self.assertEqual(len(target_ifs), len(ifs))
|
||||
|
||||
def test_mtu(self):
|
||||
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1"), 9216)
|
||||
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1.200"), 9000)
|
||||
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1.201"), 9216)
|
||||
|
||||
def test_encapsulation(self):
|
||||
self.assertTrue(
|
||||
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/1.200")
|
||||
)
|
||||
self.assertTrue(
|
||||
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/1.200")
|
||||
)
|
||||
self.assertEqual(
|
||||
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"),
|
||||
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 0, "exact-match": False},
|
||||
)
|
||||
self.assertEqual(
|
||||
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.201"),
|
||||
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 1234, "exact-match": False},
|
||||
)
|
||||
self.assertEqual(
|
||||
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.202"),
|
||||
{"dot1q": 0, "dot1ad": 1000, "inner-dot1q": 0, "exact-match": False},
|
||||
)
|
||||
self.assertEqual(
|
||||
interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.203"),
|
||||
{"dot1q": 0, "dot1ad": 1000, "inner-dot1q": 1000, "exact-match": True},
|
||||
)
|
||||
|
||||
self.assertFalse(
|
||||
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.100")
|
||||
)
|
||||
self.assertFalse(
|
||||
interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.101")
|
||||
)
|
||||
|
||||
self.assertFalse(
|
||||
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.102")
|
||||
)
|
||||
self.assertFalse(
|
||||
interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.103")
|
||||
)
|
||||
|
||||
def test_has_sub(self):
|
||||
self.assertTrue(interface.has_sub(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet1/0/1.200"))
|
||||
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet2/0/0"))
|
||||
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet3/0/0"))
|
||||
|
||||
def test_is_sub(self):
|
||||
self.assertFalse(interface.is_sub(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertTrue(interface.is_sub(self.cfg, "GigabitEthernet1/0/1.200"))
|
||||
|
||||
def test_is_qinx(self):
|
||||
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.200"))
|
||||
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.202"))
|
||||
|
||||
self.assertTrue(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.201"))
|
||||
self.assertTrue(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.203"))
|
||||
|
||||
def test_has_lcp(self):
|
||||
self.assertTrue(interface.has_lcp(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertFalse(interface.has_lcp(self.cfg, "GigabitEthernet1/0/0"))
|
||||
|
||||
def test_get_lcp(self):
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/0.100"))
|
||||
|
||||
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1"), "e1")
|
||||
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.100"), "foo")
|
||||
self.assertEqual(
|
||||
interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.101"), "e1.100"
|
||||
)
|
||||
self.assertEqual(
|
||||
interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.102"), "e1.100.100"
|
||||
)
|
||||
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.200"))
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.201"))
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.202"))
|
||||
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.203"))
|
||||
|
||||
def test_address(self):
|
||||
self.assertFalse(interface.has_address(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertFalse(interface.has_address(self.cfg, "GigabitEthernet1/0/0.100"))
|
||||
|
||||
self.assertTrue(interface.has_address(self.cfg, "GigabitEthernet1/0/1"))
|
||||
self.assertTrue(interface.has_address(self.cfg, "GigabitEthernet1/0/1.100"))
|
||||
|
||||
def test_lx2c(self):
|
||||
l2xc_ifs = interface.get_l2xc_interfaces(self.cfg)
|
||||
l2xc_target_ifs = interface.get_l2xc_target_interfaces(self.cfg)
|
||||
|
||||
self.assertIn("GigabitEthernet3/0/0", l2xc_ifs)
|
||||
self.assertIn("GigabitEthernet3/0/0", l2xc_target_ifs)
|
||||
self.assertTrue(interface.is_l2xc_interface(self.cfg, "GigabitEthernet3/0/0"))
|
||||
self.assertTrue(
|
||||
interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet3/0/0")
|
||||
)
|
||||
|
||||
self.assertNotIn("GigabitEthernet2/0/0", l2xc_ifs)
|
||||
self.assertNotIn("GigabitEthernet2/0/0", l2xc_target_ifs)
|
||||
self.assertFalse(interface.is_l2xc_interface(self.cfg, "GigabitEthernet2/0/0"))
|
||||
self.assertFalse(
|
||||
interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet2/0/0")
|
||||
)
|
||||
|
||||
def test_l2(self):
|
||||
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/0"))
|
||||
self.assertFalse(interface.is_l2(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/2.100"))
|
||||
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/2.200"))
|
||||
|
||||
def test_l3(self):
|
||||
self.assertTrue(interface.is_l3(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertFalse(interface.is_l3(self.cfg, "GigabitEthernet3/0/0"))
|
||||
|
||||
def test_get_by_lcp_name(self):
|
||||
ifname, iface = interface.get_by_lcp_name(self.cfg, "notexist")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
ifname, iface = interface.get_by_lcp_name(self.cfg, "e1.100.100")
|
||||
self.assertEqual(ifname, "GigabitEthernet1/0/1.102")
|
||||
ifname, iface = interface.get_by_lcp_name(self.cfg, "e2")
|
||||
self.assertEqual(ifname, "GigabitEthernet2/0/0")
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
|
||||
self.assertEqual(ifname, "GigabitEthernet1/0/1.201")
|
||||
self.assertIsNotNone(iface)
|
||||
encap = interface.get_encapsulation(self.cfg, ifname)
|
||||
self.assertEqual(
|
||||
encap,
|
||||
{"dot1q": 1000, "dot1ad": 0, "inner-dot1q": 1234, "exact-match": False},
|
||||
)
|
||||
|
||||
ifname, iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.1")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
def test_get_parent_by_name(self):
|
||||
ifname, iface = interface.get_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
self.assertEqual(ifname, "GigabitEthernet1/0/1")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertNotIn("encapsulation", iface)
|
||||
|
||||
ifname, iface = interface.get_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.200"
|
||||
)
|
||||
self.assertEqual(ifname, "GigabitEthernet1/0/1")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertNotIn("encapsulation", iface)
|
||||
|
||||
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
ifname, iface = interface.get_parent_by_name(self.cfg, None)
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
def test_get_qinx_parent_by_name(self):
|
||||
self.assertIsNotNone(
|
||||
interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.202")
|
||||
)
|
||||
self.assertIsNotNone(
|
||||
interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.203")
|
||||
)
|
||||
|
||||
ifname, iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1"
|
||||
)
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
ifname, iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.100"
|
||||
)
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
ifname, iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.200"
|
||||
)
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
ifname, iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
self.assertEqual(ifname, "GigabitEthernet1/0/1.200")
|
||||
|
||||
def test_get_phys(self):
|
||||
phys = interface.get_phys(self.cfg)
|
||||
self.assertEqual(len(phys), 6)
|
||||
self.assertIn("GigabitEthernet1/0/0", phys)
|
||||
self.assertNotIn("GigabitEthernet1/0/0.100", phys)
|
||||
|
||||
def test_is_phy(self):
|
||||
self.assertTrue(interface.is_phy(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertFalse(interface.is_phy(self.cfg, "GigabitEthernet1/0/0.100"))
|
||||
|
||||
def test_get_admin_state(self):
|
||||
self.assertFalse(interface.get_admin_state(self.cfg, "notexist"))
|
||||
self.assertFalse(interface.get_admin_state(self.cfg, "GigabitEthernet2/0/0"))
|
||||
self.assertTrue(interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0"))
|
||||
self.assertTrue(interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0.101"))
|
||||
self.assertFalse(
|
||||
interface.get_admin_state(self.cfg, "GigabitEthernet1/0/0.102")
|
||||
)
|
77
vppcfg/config/test_lcp.py
Normal file
77
vppcfg/config/test_lcp.py
Normal file
@ -0,0 +1,77 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for LCPs """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.lcp as lcp
|
||||
import config.interface as interface
|
||||
|
||||
|
||||
class TestLCPMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_lcp.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_enumerators(self):
|
||||
lcps = lcp.get_lcps(self.cfg)
|
||||
self.assertIn("e1", lcps)
|
||||
self.assertIn("foo", lcps)
|
||||
self.assertIn("e2", lcps)
|
||||
loopback_lcps = lcp.get_lcps(self.cfg, interfaces=False, bridgedomains=False)
|
||||
self.assertIn("thrice", loopback_lcps)
|
||||
self.assertNotIn("e1", loopback_lcps)
|
||||
|
||||
def test_lcp(self):
|
||||
self.assertTrue(lcp.is_unique(self.cfg, "e1"))
|
||||
self.assertTrue(lcp.is_unique(self.cfg, "foo"))
|
||||
self.assertTrue(lcp.is_unique(self.cfg, "notexist"))
|
||||
|
||||
self.assertFalse(lcp.is_unique(self.cfg, "twice"))
|
||||
self.assertFalse(lcp.is_unique(self.cfg, "thrice"))
|
||||
|
||||
def test_qinx(self):
|
||||
qinx_ifname, qinx_iface = interface.get_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
parent_ifname, parent_iface = interface.get_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
|
||||
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
|
||||
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
|
||||
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
|
||||
|
||||
qinx_ifname, qinx_iface = interface.get_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
parent_ifname, parent_iface = interface.get_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.201"
|
||||
)
|
||||
|
||||
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
|
||||
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
|
||||
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
|
||||
|
||||
ifname, iface = interface.get_qinx_parent_by_name(
|
||||
self.cfg, "GigabitEthernet1/0/1.100"
|
||||
)
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
51
vppcfg/config/test_loopback.py
Normal file
51
vppcfg/config/test_loopback.py
Normal file
@ -0,0 +1,51 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for loopbacks """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.loopback as loopback
|
||||
|
||||
|
||||
class TestLoopbackMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_loopback.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_get_by_lcp_name(self):
|
||||
ifname, iface = loopback.get_by_lcp_name(self.cfg, "loop56789012345")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("loop1", ifname)
|
||||
|
||||
ifname, iface = loopback.get_by_lcp_name(self.cfg, "lcp-noexist")
|
||||
self.assertIsNone(iface)
|
||||
self.assertIsNone(ifname)
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = loopback.get_by_name(self.cfg, "loop1")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("loop1", ifname)
|
||||
self.assertEqual(iface["mtu"], 2000)
|
||||
|
||||
ifname, iface = loopback.get_by_name(self.cfg, "loop-noexist")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = loopback.get_loopbacks(self.cfg)
|
||||
self.assertEqual(len(ifs), 3)
|
||||
self.assertIn("loop0", ifs)
|
||||
self.assertIn("loop1", ifs)
|
||||
self.assertIn("loop2", ifs)
|
||||
self.assertNotIn("loop-noexist", ifs)
|
37
vppcfg/config/test_mac.py
Normal file
37
vppcfg/config/test_mac.py
Normal file
@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for MAC addresses """
|
||||
import unittest
|
||||
import config.mac as mac
|
||||
|
||||
|
||||
class TestMACMethods(unittest.TestCase):
|
||||
def test_is_valid(self):
|
||||
self.assertTrue(mac.is_valid("00:01:02:03:04:05"))
|
||||
self.assertTrue(mac.is_valid("00-01-02-03-04-05"))
|
||||
self.assertTrue(mac.is_valid("0001.0203.0405"))
|
||||
self.assertFalse(mac.is_valid("hoi"))
|
||||
|
||||
def test_is_local(self):
|
||||
self.assertTrue(mac.is_local("02:00:00:00:00:00"))
|
||||
self.assertFalse(mac.is_local("00:00:00:00:00:00"))
|
||||
|
||||
def test_is_multicast(self):
|
||||
self.assertTrue(mac.is_multicast("01:00:00:00:00:00"))
|
||||
self.assertFalse(mac.is_multicast("00:00:00:00:00:00"))
|
||||
|
||||
def test_is_unicast(self):
|
||||
self.assertFalse(mac.is_unicast("01:00:00:00:00:00"))
|
||||
self.assertTrue(mac.is_unicast("00:00:00:00:00:00"))
|
51
vppcfg/config/test_tap.py
Normal file
51
vppcfg/config/test_tap.py
Normal file
@ -0,0 +1,51 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for taps """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.tap as tap
|
||||
|
||||
|
||||
class TestTAPMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_tap.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = tap.get_by_name(self.cfg, "tap0")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("tap0", ifname)
|
||||
|
||||
ifname, iface = tap.get_by_name(self.cfg, "tap-noexist")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
def test_is_tap(self):
|
||||
self.assertTrue(tap.is_tap(self.cfg, "tap0"))
|
||||
self.assertTrue(tap.is_tap(self.cfg, "tap1"))
|
||||
self.assertFalse(tap.is_tap(self.cfg, "tap-noexist"))
|
||||
|
||||
def test_is_host_name_unique(self):
|
||||
self.assertTrue(tap.is_host_name_unique(self.cfg, "tap0"))
|
||||
self.assertTrue(tap.is_host_name_unique(self.cfg, "tap1"))
|
||||
self.assertTrue(tap.is_host_name_unique(self.cfg, "tap-noexist"))
|
||||
self.assertFalse(tap.is_host_name_unique(self.cfg, "vpp-tap"))
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = tap.get_taps(self.cfg)
|
||||
self.assertEqual(len(ifs), 4)
|
||||
self.assertIn("tap0", ifs)
|
||||
self.assertIn("tap1", ifs)
|
||||
self.assertNotIn("tap-noexist", ifs)
|
52
vppcfg/config/test_vxlan_tunnel.py
Normal file
52
vppcfg/config/test_vxlan_tunnel.py
Normal file
@ -0,0 +1,52 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# -*- coding: utf-8 -*-
|
||||
""" Unit tests for vxlan_tunnels """
|
||||
import unittest
|
||||
import yaml
|
||||
import config.vxlan_tunnel as vxlan_tunnel
|
||||
|
||||
|
||||
class TestVXLANMethods(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with open("unittest/test_vxlan_tunnel.yaml", "r") as f:
|
||||
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
|
||||
|
||||
def test_get_by_name(self):
|
||||
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel0")
|
||||
self.assertIsNotNone(iface)
|
||||
self.assertEqual("vxlan_tunnel0", ifname)
|
||||
|
||||
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel-noexist")
|
||||
self.assertIsNone(ifname)
|
||||
self.assertIsNone(iface)
|
||||
|
||||
def test_is_vxlan_tunnel(self):
|
||||
self.assertTrue(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel0"))
|
||||
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel-noexist"))
|
||||
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "GigabitEthernet1/0/0"))
|
||||
|
||||
def test_enumerators(self):
|
||||
ifs = vxlan_tunnel.get_vxlan_tunnels(self.cfg)
|
||||
self.assertEqual(len(ifs), 4)
|
||||
self.assertIn("vxlan_tunnel0", ifs)
|
||||
self.assertIn("vxlan_tunnel1", ifs)
|
||||
self.assertIn("vxlan_tunnel2", ifs)
|
||||
self.assertIn("vxlan_tunnel3", ifs)
|
||||
self.assertNotIn("vxlan_tunnel-noexist", ifs)
|
||||
|
||||
def test_vni_unique(self):
|
||||
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 100))
|
||||
self.assertFalse(vxlan_tunnel.vni_unique(self.cfg, 101))
|
||||
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 102))
|
90
vppcfg/config/vxlan_tunnel.py
Normal file
90
vppcfg/config/vxlan_tunnel.py
Normal file
@ -0,0 +1,90 @@
|
||||
#
|
||||
# Copyright (c) 2022 Pim van Pelt
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at:
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
""" A vppcfg configuration module that validates vxlan_tunnels """
|
||||
import logging
|
||||
import ipaddress
|
||||
|
||||
|
||||
def get_by_name(yaml, ifname):
|
||||
"""Return the VXLAN by name, if it exists. Return None otherwise."""
|
||||
try:
|
||||
if ifname in yaml["vxlan_tunnels"]:
|
||||
return ifname, yaml["vxlan_tunnels"][ifname]
|
||||
except KeyError:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def is_vxlan_tunnel(yaml, ifname):
|
||||
"""Returns True if the interface name is an existing VXLAN Tunnel."""
|
||||
ifname, iface = get_by_name(yaml, ifname)
|
||||
return iface is not None
|
||||
|
||||
|
||||
def vni_unique(yaml, vni):
|
||||
"""Return True if the VNI is unique amongst all VXLANs"""
|
||||
if not "vxlan_tunnels" in yaml:
|
||||
return True
|
||||
|
||||
ncount = 0
|
||||
for _ifname, iface in yaml["vxlan_tunnels"].items():
|
||||
if iface["vni"] == vni:
|
||||
ncount = ncount + 1
|
||||
|
||||
return ncount < 2
|
||||
|
||||
|
||||
def get_vxlan_tunnels(yaml):
|
||||
"""Returns a list of all VXLAN tunnel interface names."""
|
||||
ret = []
|
||||
if not "vxlan_tunnels" in yaml:
|
||||
return ret
|
||||
|
||||
for ifname, _iface in yaml["vxlan_tunnels"].items():
|
||||
ret.append(ifname)
|
||||
return ret
|
||||
|
||||
|
||||
def validate_vxlan_tunnels(yaml):
|
||||
"""Validate the semantics of all YAML 'vxlan_tunnels' entries"""
|
||||
result = True
|
||||
msgs = []
|
||||
logger = logging.getLogger("vppcfg.config")
|
||||
logger.addHandler(logging.NullHandler())
|
||||
|
||||
if not "vxlan_tunnels" in yaml:
|
||||
return result, msgs
|
||||
|
||||
for ifname, iface in yaml["vxlan_tunnels"].items():
|
||||
logger.debug(f"vxlan_tunnel {ifname}: {iface}")
|
||||
instance = int(ifname[12:])
|
||||
if instance > 2147483647:
|
||||
msgs.append(
|
||||
f"vxlan_tunnel {ifname} has instance {int(instance)} which is too large"
|
||||
)
|
||||
result = False
|
||||
|
||||
vni = iface["vni"]
|
||||
if not vni_unique(yaml, vni):
|
||||
msgs.append(f"vxlan_tunnel {ifname} VNI {int(vni)} is not unique")
|
||||
result = False
|
||||
local = ipaddress.ip_address(iface["local"])
|
||||
remote = ipaddress.ip_address(iface["remote"])
|
||||
if local.version != remote.version:
|
||||
msgs.append(
|
||||
f"vxlan_tunnel {ifname} local and remote are not the same address family"
|
||||
)
|
||||
result = False
|
||||
|
||||
return result, msgs
|
Reference in New Issue
Block a user