Rename validator/ to config/

This commit is contained in:
Pim van Pelt
2022-03-24 13:56:04 +00:00
parent e8e41098be
commit 672dd65f11
16 changed files with 39 additions and 39 deletions

148
config/__init__.py Normal file
View File

@ -0,0 +1,148 @@
#!/usr/bin/env python
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
import logging
try:
import yamale
except ImportError:
print("ERROR: install yamale manually: sudo pip install yamale")
sys.exit(-2)
from config.loopback import validate_loopbacks
from config.bondethernet import validate_bondethernets
from config.interface import validate_interfaces
from config.bridgedomain import validate_bridgedomains
from config.vxlan_tunnel import validate_vxlan_tunnels
from yamale.validators import DefaultValidators, Validator
import ipaddress
class IPInterfaceWithPrefixLength(Validator):
""" Custom IPAddress config - takes IP/prefixlen as input:
192.0.2.1/29 or 2001:db8::1/64 are correct. The PrefixLength
is required, and must be a number (0-32 for IPv4 and 0-128 for
IPv6).
"""
tag = 'ip_interface'
def _is_valid(self, value):
try:
network = ipaddress.ip_interface(value)
except:
return False
if not isinstance(value, str):
return False
if not '/' in value:
return False
e = value.split('/')
if not len(e) == 2:
return False
if not e[1].isnumeric():
return False
return True
class Validator(object):
def __init__(self, schema):
self.logger = logging.getLogger('vppcfg.config')
self.logger.addHandler(logging.NullHandler())
self.schema = schema
def validate(self, yaml):
ret_rv = True
ret_msgs = []
if not yaml:
return ret_rv, ret_msgs
if self.schema:
try:
self.logger.debug("Validating against schema %s" % self.schema)
validators = DefaultValidators.copy()
validators[IPInterfaceWithPrefixLength.tag] = IPInterfaceWithPrefixLength
schema = yamale.make_schema(self.schema, validators=validators)
data = yamale.make_data(content=str(yaml))
yamale.validate(schema, data)
self.logger.debug("Schema correctly validated by yamale")
except ValueError as e:
ret_rv = False
for result in e.results:
for error in result.errors:
ret_msgs.extend(['yamale: %s' % error])
return ret_rv, ret_msgs
else:
self.logger.warning("Schema validation disabled")
self.logger.debug("Validating Semantics...")
rv, msgs = validate_bondethernets(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_interfaces(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_loopbacks(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_bridgedomains(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_vxlan_tunnels(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
if ret_rv:
self.logger.debug("Semantics correctly validated")
return ret_rv, ret_msgs
def valid_config(self, yaml):
""" Validate the given YAML configuration in 'yaml' against syntax
validation given in the yamale 'schema', and all semantic configs.
Returns True if the configuration is valid, False otherwise.
"""
rv, msgs = self.validate(yaml)
if not rv:
for m in msgs:
self.logger.error(m)
return False
self.logger.info("Configuration validated successfully")
return True
def get_phys(self, yaml):
""" Return all PHYs in the config """
return interface.get_phys(yaml)

122
config/address.py Normal file
View File

@ -0,0 +1,122 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.interface as interface
import ipaddress
def get_all_addresses_except_ifname(yaml, except_ifname):
""" Return a list of all ipaddress.ip_interface() instances in the entire config,
except for those that belong to 'ifname'.
"""
ret = []
if 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
ret.append(ipaddress.ip_interface(a))
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
sub_ifname = "%s.%d" % (ifname, subid)
if sub_ifname == except_ifname:
continue
if 'addresses' in sub_iface:
for a in sub_iface['addresses']:
ret.append(ipaddress.ip_interface(a))
if 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
ret.append(ipaddress.ip_interface(a))
if 'bridgedomains' in yaml:
for ifname, iface in yaml['bridgedomains'].items():
if ifname == except_ifname:
continue
if 'addresses' in iface:
for a in iface['addresses']:
ret.append(ipaddress.ip_interface(a))
return ret
def is_allowed(yaml, ifname, iface_addresses, ip_interface):
""" Returns True if there is at most one occurence of the ip_interface (an IPv4/IPv6 prefix+len)
in the entire config. That said, we need the 'iface_addresses' because VPP is a bit fickle in
this regard.
IP addresses from the same prefix/len can be added to a given interface (ie 192.0.2.1/24 and
192.0.2.2/24), but other than that, any prefix can not occur as a more-specific or less-specific
of any other interface.
So, we will allow:
- any ip_interface that is of equal network/len of existing one(s) _on the same interface_
And, we will reject
- any ip_interface that is a more specific of any existing one
- any ip_interface that is a less specific of any existing one
Examples:
vpp# set interface ip address loop0 192.0.2.1/24
vpp# set interface ip address loop0 192.0.2.2/24
vpp# set interface ip address loop0 192.0.2.1/29
set interface ip address: failed to add 192.0.2.1/29 on loop0 which conflicts with 192.0.2.1/24 for interface loop0
vpp# set interface ip address loop0 192.0.2.3/23
set interface ip address: failed to add 192.0.2.3/23 on loop0 which conflicts with 192.0.2.1/24 for interface loop0
"""
## print("input: (%s,%s)" % (iface_addresses, ip_interface))
all_other_addresses = get_all_addresses_except_ifname(yaml, ifname)
## print("All IPs: %s" % all_other_addresses)
my_ip_network = ipaddress.ip_network(ip_interface, strict=False)
for ipi in all_other_addresses:
if ipi.version != my_ip_network.version:
continue
if ipaddress.ip_network(ipi, strict=False) == my_ip_network:
## print("Same: %s == %s" % (ip_interface, ipi))
return False
if ipaddress.ip_network(ipi, strict=False).subnet_of(my_ip_network):
## print("More: %s == %s" % (ip_interface, ipi))
return False
if my_ip_network.subnet_of(ipaddress.ip_network(ipi, strict=False)):
## print("Less: %s == %s" % (ip_interface, ipi))
return False
for ip in iface_addresses:
ipi = ipaddress.ip_interface(ip)
if ipi.version != my_ip_network.version:
continue
if ipaddress.ip_network(ipi, strict=False) == my_ip_network:
## print("iface same: %s == %s" % (ip_interface, ipi))
return True
if ipaddress.ip_network(ipi, strict=False).subnet_of(my_ip_network):
## print("iface more: %s == %s" % (ip_interface, ipi))
return False
if my_ip_network.subnet_of(ipaddress.ip_network(ipi, strict=False)):
## print("iface less: %s == %s" % (ip_interface, ipi))
return False
return True

71
config/bondethernet.py Normal file
View File

@ -0,0 +1,71 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.interface as interface
def get_by_name(yaml, ifname):
""" Return the BondEthernet by name, if it exists. Return None,None otherwise. """
try:
if ifname in yaml['bondethernets']:
return ifname, yaml['bondethernets'][ifname]
except:
pass
return None, None
def is_bondethernet(yaml, ifname):
""" Returns True if the interface name is an existing BondEthernet. """
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def is_bond_member(yaml, ifname):
""" Returns True if this interface is a member of a BondEthernet. """
if not 'bondethernets' in yaml:
return False
for bond, iface in yaml['bondethernets'].items():
if not 'interfaces' in iface:
continue
if ifname in iface['interfaces']:
return True
return False
def validate_bondethernets(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger.addHandler(logging.NullHandler())
if not 'bondethernets' in yaml:
return result, msgs
for ifname, iface in yaml['bondethernets'].items():
logger.debug("bondethernet %s: %s" % (ifname, iface))
for member in iface['interfaces']:
if (None, None) == interface.get_by_name(yaml, member):
msgs.append("bondethernet %s member %s does not exist" % (ifname, member))
result = False
if interface.has_sub(yaml, member):
msgs.append("bondethernet %s member %s has sub-interface(s)" % (ifname, member))
result = False
if interface.has_lcp(yaml, member):
msgs.append("bondethernet %s member %s has an LCP" % (ifname, member))
result = False
if interface.has_address(yaml, member):
msgs.append("bondethernet %s member %s has an address" % (ifname, member))
result = False
return result, msgs

138
config/bridgedomain.py Normal file
View File

@ -0,0 +1,138 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.interface as interface
import config.lcp as lcp
import config.address as address
def get_bridgedomains(yaml):
""" Return a list of all bridgedomains. """
ret = []
if not 'bridgedomains' in yaml:
return ret
for ifname, iface in yaml['bridgedomains'].items():
ret.append(ifname)
return ret
def get_by_name(yaml, ifname):
""" Return the BridgeDomain by name, if it exists. Return None,None otherwise. """
try:
if ifname in yaml['bridgedomains']:
return ifname, yaml['bridgedomains'][ifname]
except:
pass
return None, None
def is_bridgedomain(yaml, ifname):
""" Returns True if the name is an existing bridgedomain (ie bd[0-9]+). """
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def is_bvi(yaml, ifname):
""" Returns True if the interface name is an existing BVI. """
if not ifname.startswith("bvi"):
return False
idx = ifname[3:]
if not idx.isnumeric():
return False
bridgename, bridge = get_by_name(yaml, "bd%d" % (int(idx)))
if not bridge:
return False
## If the BridgeDomain has an 'lcp', then it has a Bridge Virtual Interface
if 'lcp' in bridge:
return True
return False
def get_bridge_interfaces(yaml):
""" Returns a list of all interfaces that are bridgedomain members """
ret = []
if not 'bridgedomains' in yaml:
return ret
for ifname, iface in yaml['bridgedomains'].items():
if 'interfaces' in iface:
ret.extend(iface['interfaces'])
return ret
def is_bridge_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced in bridgedomains zero or one times """
ifs = get_bridge_interfaces(yaml)
return ifs.count(ifname) < 2
def is_bridge_interface(yaml, ifname):
""" Returns True if this interface is a member of a BridgeDomain """
return ifname in get_bridge_interfaces(yaml)
def validate_bridgedomains(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger.addHandler(logging.NullHandler())
if not 'bridgedomains' in yaml:
return result, msgs
for ifname, iface in yaml['bridgedomains'].items():
logger.debug("bridgedomain %s" % iface)
bd_mtu = 1500
if 'mtu' in iface:
bd_mtu = iface['mtu']
if 'addresses' in iface and not 'lcp' in iface:
msgs.append("bridgedomain %s has an address but no LCP" % ifname)
result = False
if 'lcp' in iface and not lcp.is_unique(yaml, iface['lcp']):
msgs.append("bridgedomain %s does not have a unique LCP name %s" % (ifname, iface['lcp']))
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append("bridgedomain %s IP address %s conflicts with another" % (ifname, a))
result = False
if 'interfaces' in iface:
for member in iface['interfaces']:
if (None, None) == interface.get_by_name(yaml, member):
msgs.append("bridgedomain %s member %s does not exist" % (ifname, member))
result = False
continue
if not is_bridge_interface_unique(yaml, member):
msgs.append("bridgedomain %s member %s is not unique" % (ifname, member))
result = False
if interface.has_lcp(yaml, member):
msgs.append("bridgedomain %s member %s has an LCP" % (ifname, member))
result = False
if interface.has_address(yaml, member):
msgs.append("bridgedomain %s member %s has an address" % (ifname, member))
result = False
member_mtu = interface.get_mtu(yaml, member)
if member_mtu != bd_mtu:
msgs.append("bridgedomain %s member %s has MTU %d, while bridge has %d" % (ifname, member, member_mtu, bd_mtu))
result = False
return result, msgs

541
config/interface.py Normal file
View File

@ -0,0 +1,541 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.bondethernet as bondethernet
import config.bridgedomain as bridgedomain
import config.loopback as loopback
import config.vxlan_tunnel as vxlan_tunnel
import config.lcp as lcp
import config.address as address
def get_qinx_parent_by_name(yaml, ifname):
""" Returns the sub-interface which matches a QinAD or QinQ outer tag, or None,None
if that sub-interface doesn't exist. """
if not is_qinx(yaml, ifname):
return None, None
qinx_ifname, qinx_iface = get_by_name(yaml, ifname)
if not qinx_iface:
return None,None
qinx_encap = get_encapsulation(yaml, ifname)
if not qinx_encap:
return None,None
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
if not parent_iface:
return None,None
for subid, sub_iface in parent_iface['sub-interfaces'].items():
sub_ifname = "%s.%d" % (parent_ifname, subid)
sub_encap = get_encapsulation(yaml, sub_ifname)
if not sub_encap:
continue
if qinx_encap['dot1q'] > 0 and sub_encap['dot1q'] == qinx_encap['dot1q']:
return sub_ifname, sub_iface
if qinx_encap['dot1ad'] > 0 and sub_encap['dot1ad'] == qinx_encap['dot1ad']:
return sub_ifname, sub_iface
return None,None
def get_parent_by_name(yaml, ifname):
""" Returns the sub-interface's parent, or None,None if the sub-int doesn't exist. """
if not '.' in ifname:
return None, None
try:
parent_ifname, subid = ifname.split('.')
subid = int(subid)
iface = yaml['interfaces'][parent_ifname]
return parent_ifname, iface
except:
pass
return None,None
def get_by_name(yaml, ifname):
""" Returns the interface or sub-interface by a given name, or None,None if it does not exist """
if '.' in ifname:
try:
phy_ifname, subid = ifname.split('.')
subid = int(subid)
iface = yaml['interfaces'][phy_ifname]['sub-interfaces'][subid]
return ifname, iface
except:
return None, None
try:
iface = yaml['interfaces'][ifname]
return ifname, iface
except:
pass
return None, None
def is_sub(yaml, ifname):
""" Returns True if this interface is a sub-interface """
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
return isinstance(parent_iface, dict)
def has_sub(yaml, ifname):
""" Returns True if this interface has sub-interfaces """
if not 'interfaces' in yaml:
return False
if ifname in yaml['interfaces']:
iface = yaml['interfaces'][ifname]
if 'sub-interfaces' in iface and len(iface['sub-interfaces']) > 0:
return True
return False
def has_address(yaml, ifname):
""" Returns True if this interface or sub-interface has one or more addresses"""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return False
return 'addresses' in iface
def get_l2xc_interfaces(yaml):
""" Returns a list of all interfaces that have an L2 CrossConnect """
ret = []
if not 'interfaces' in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
ret.append(ifname)
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
sub_ifname = "%s.%d" % (ifname, subid)
if 'l2xc' in sub_iface:
ret.append(sub_ifname)
return ret
def is_l2xc_interface(yaml, ifname):
""" Returns True if this interface has an L2 CrossConnect """
return ifname in get_l2xc_interfaces(yaml)
def get_l2xc_target_interfaces(yaml):
""" Returns a list of all interfaces that are the target of an L2 CrossConnect """
ret = []
if 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
ret.append(iface['l2xc'])
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if 'l2xc' in sub_iface:
ret.append(sub_iface['l2xc'])
return ret
def is_l2xc_target_interface(yaml, ifname):
""" Returns True if this interface is the target of an L2 CrossConnect """
return ifname in get_l2xc_target_interfaces(yaml)
def is_l2xc_target_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced as an l2xc target zero or one times """
ifs = get_l2xc_target_interfaces(yaml)
return ifs.count(ifname) < 2
def has_lcp(yaml, ifname):
""" Returns True if this interface or sub-interface has an LCP """
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return False
return 'lcp' in iface
def valid_encapsulation(yaml, ifname):
""" Returns True if the sub interface has a valid encapsulation, or
none at all """
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return True
if not 'encapsulation' in iface:
return True
encap = iface['encapsulation']
if 'dot1ad' in encap and 'dot1q' in encap:
return False
if 'inner-dot1q' in encap and not ('dot1ad' in encap or 'dot1q' in encap):
return False
if 'exact-match' in encap and encap['exact-match'] == False and has_lcp(yaml, ifname):
return False
return True
def get_encapsulation(yaml, ifname):
""" Returns the encapsulation of an interface name as a fully formed dictionary:
dot1q: int (default 0)
dot1ad: int (default 0)
inner-dot1q: int (default 0)
exact-match: bool (default False)
If the interface is not a sub-int with valid encapsulation, None is returned.
"""
if not valid_encapsulation(yaml, ifname):
return None
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return None
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
if not iface or not parent_iface:
return None
parent_ifname, subid = ifname.split('.')
dot1q = 0
dot1ad = 0
inner_dot1q = 0
exact_match = False
if not 'encapsulation' in iface:
dot1q = int(subid)
else:
if 'dot1q' in iface['encapsulation']:
dot1q = iface['encapsulation']['dot1q']
elif 'dot1ad' in iface['encapsulation']:
dot1ad = iface['encapsulation']['dot1ad']
if 'inner-dot1q' in iface['encapsulation']:
inner_dot1q = iface['encapsulation']['inner-dot1q']
if 'exact-match' in iface['encapsulation']:
exact_match = iface['encapsulation']['exact-match']
return {
"dot1q": int(dot1q),
"dot1ad": int(dot1ad),
"inner-dot1q": int(inner_dot1q),
"exact-match": bool(exact_match)
}
def get_phys(yaml):
""" Return a list of all toplevel (ie. non-sub) interfaces which are
assumed to be physical network cards, eg TenGigabitEthernet1/0/0. Note
that derived/created interfaces such as Tunnels, BondEthernets and
Loopbacks/BVIs are not returned """
ret = []
if not 'interfaces' in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
if is_phy(yaml, ifname):
ret.append(ifname)
return ret
def is_phy(yaml, ifname):
""" Returns True if the ifname is the name of a physical network interface. """
ifname, iface = get_by_name(yaml, ifname)
if iface == None:
return False
if is_sub(yaml, ifname):
return False
if bondethernet.is_bondethernet(yaml, ifname):
return False
if loopback.is_loopback(yaml, ifname):
return False
if bridgedomain.is_bvi(yaml, ifname):
return False
if vxlan_tunnel.is_vxlan_tunnel(yaml, ifname):
return False
return True
def get_interfaces(yaml):
""" Return a list of all interface and sub-interface names """
ret = []
if not 'interfaces' in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
ret.append(ifname)
if not 'sub-interfaces' in iface:
continue
for subid, sub_iface in iface['sub-interfaces'].items():
ret.append("%s.%d" % (ifname, subid))
return ret
def get_sub_interfaces(yaml):
""" Return all interfaces which are a subinterface. """
ret = []
for ifname in get_interfaces(yaml):
if is_sub(yaml, ifname):
ret.append(ifname)
return ret
def get_qinx_interfaces(yaml):
""" Return all interfaces which are double-tagged, either QinAD or QinQ.
These interfaces will always have a valid encapsulation with 'inner-dot1q'
set to non-zero.
Note: this is always a strict subset of get_sub_interfaces()
"""
ret = []
for ifname in get_interfaces(yaml):
if not is_sub(yaml, ifname):
continue
encap = get_encapsulation(yaml, ifname)
if not encap:
continue
if encap['inner-dot1q'] > 0:
ret.append(ifname)
return ret
def is_qinx(yaml, ifname):
""" Returns True if the interface is a double-tagged (QinQ or QinAD) interface """
return ifname in get_qinx_interfaces(yaml)
def unique_encapsulation(yaml, sub_ifname):
""" Ensures that for the sub_ifname specified, there exist no other sub-ints on the
parent with the same encapsulation. """
new_ifname, iface = get_by_name(yaml, sub_ifname)
parent_ifname, parent_iface = get_parent_by_name(yaml, new_ifname)
if not iface or not parent_iface:
return False
sub_encap = get_encapsulation(yaml, new_ifname)
if not sub_encap:
return False
ncount = 0
for subid, sibling_iface in parent_iface['sub-interfaces'].items():
sibling_ifname = "%s.%d" % (parent_ifname, subid)
sibling_encap = get_encapsulation(yaml, sibling_ifname)
if sub_encap == sibling_encap and new_ifname != sibling_ifname:
## print("%s overlaps with %s" % (sub_encap, sibling_encap))
ncount = ncount + 1
return ncount == 0
def is_l2(yaml, ifname):
""" Returns True if the interface is an L2XC source, L2XC target or a member of a bridgedomain """
if bridgedomain.is_bridge_interface(yaml, ifname):
return True
if is_l2xc_interface(yaml, ifname):
return True
if is_l2xc_target_interface(yaml, ifname):
return True
return False
def is_l3(yaml, ifname):
""" Returns True if the interface exists and is neither l2xc target nor bridgedomain """
return not is_l2(yaml, ifname)
def get_lcp(yaml, ifname):
""" Returns the LCP of the interface. If the interface is a sub-interface with L3
enabled, synthesize it based on its parent, using smart QinQ syntax.
Return None if no LCP can be found. """
ifname, iface = get_by_name(yaml, ifname)
if iface and 'lcp' in iface:
return iface['lcp']
return None
def get_mtu(yaml, ifname):
""" Returns MTU of the interface. If it's not set, return the parent's MTU, and
return 1500 if no MTU was set on the sub-int or the parent."""
ifname, iface = get_by_name(yaml, ifname)
if not iface:
return 1500
parent_ifname, parent_iface = get_parent_by_name(yaml, ifname)
try:
return iface['mtu']
return parent_iface['mtu']
except:
pass
return 1500
def validate_interfaces(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger.addHandler(logging.NullHandler())
if not 'interfaces' in yaml:
return result, msgs
for ifname, iface in yaml['interfaces'].items():
logger.debug("interface %s" % iface)
if ifname.startswith("BondEthernet") and (None,None) == bondethernet.get_by_name(yaml, ifname):
msgs.append("interface %s does not exist in bondethernets" % ifname)
result = False
iface_mtu = get_mtu(yaml, ifname)
iface_lcp = get_lcp(yaml, ifname)
iface_address = has_address(yaml, ifname)
if iface_address and not iface_lcp:
msgs.append("interface %s has an address but no LCP" % ifname)
result = False
if is_l2(yaml, ifname) and iface_lcp:
msgs.append("interface %s is in L2 mode but has LCP name %s" % (ifname, iface_lcp))
result = False
if is_l2(yaml, ifname) and iface_address:
msgs.append("interface %s is in L2 mode but has an address" % ifname)
result = False
if iface_lcp and not lcp.is_unique(yaml, iface_lcp):
msgs.append("interface %s does not have a unique LCP name %s" % (ifname, iface_lcp))
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append("interface %s IP address %s conflicts with another" % (ifname, a))
result = False
if 'l2xc' in iface:
if has_sub(yaml, ifname):
msgs.append("interface %s has l2xc so it cannot have sub-interfaces" % (ifname))
result = False
if iface_lcp:
msgs.append("interface %s has l2xc so it cannot have an LCP" % (ifname))
result = False
if iface_address:
msgs.append("interface %s has l2xc so it cannot have an address" % (ifname))
result = False
if (None,None) == get_by_name(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s does not exist" % (ifname, iface['l2xc']))
result = False
if iface['l2xc'] == ifname:
msgs.append("interface %s l2xc target cannot be itself" % (ifname))
result = False
target_mtu = get_mtu(yaml, iface['l2xc'])
if target_mtu != iface_mtu:
msgs.append("interface %s l2xc target MTU %d does not match source MTU %d" % (ifname, target_mtu, iface_mtu))
result = False
if not is_l2xc_target_interface_unique(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s is not unique" % (ifname, iface['l2xc']))
result = False
if bridgedomain.is_bridge_interface(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s is in a bridgedomain" % (ifname, iface['l2xc']))
result = False
if has_lcp(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s cannot have an LCP" % (ifname, iface['l2xc']))
result = False
if has_address(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s cannot have an address" % (ifname, iface['l2xc']))
result = False
if has_sub(yaml, ifname):
for sub_id, sub_iface in yaml['interfaces'][ifname]['sub-interfaces'].items():
logger.debug("sub-interface %s" % sub_iface)
sub_ifname = "%s.%d" % (ifname, sub_id)
if not sub_iface:
msgs.append("sub-interface %s has no config" % (sub_ifname))
result = False
continue
sub_mtu = get_mtu(yaml, sub_ifname)
if sub_mtu > iface_mtu:
msgs.append("sub-interface %s has MTU %d higher than parent MTU %d" % (sub_ifname, sub_iface['mtu'], iface_mtu))
result = False
sub_lcp = get_lcp(yaml, sub_ifname)
if is_l2(yaml, sub_ifname) and sub_lcp:
msgs.append("sub-interface %s is in L2 mode but has LCP name %s" % (sub_ifname, sub_lcp))
result = False
if sub_lcp and not lcp.is_unique(yaml, sub_lcp):
msgs.append("sub-interface %s does not have a unique LCP name %s" % (sub_ifname, sub_lcp))
result = False
if sub_lcp and not iface_lcp:
msgs.append("sub-interface %s has LCP name %s but %s does not have an LCP" % (sub_ifname, sub_lcp, ifname))
result = False
if sub_lcp and is_qinx(yaml, sub_ifname):
mid_ifname, mid_iface = get_qinx_parent_by_name(yaml, sub_ifname)
if not mid_iface:
msgs.append("sub-interface %s is QinX and has LCP name %s which requires a parent" % (sub_ifname, sub_lcp))
result = False
elif not get_lcp(yaml, mid_ifname):
msgs.append("sub-interface %s is QinX and has LCP name %s but %s does not have an LCP" % (sub_ifname, sub_lcp, mid_ifname))
result = False
if sub_lcp and 'encapsulation' in sub_iface and 'exact-match' in sub_iface['encapsulation'] and not sub_iface['encapsulation']['exact-match']:
msgs.append("sub-interface %s has LCP name %s but its encapsulation is not exact-match" % (sub_ifname, sub_lcp))
result = False
if has_address(yaml, sub_ifname):
## The sub_iface lcp is not required: it can be derived from the iface_lcp, which has to be set
if not iface_lcp:
msgs.append("sub-interface %s has an address but %s does not have an LCP" % (sub_ifname, ifname))
result = False
if is_l2(yaml, sub_ifname):
msgs.append("sub-interface %s is in L2 mode but has an address" % sub_ifname)
result = False
for a in sub_iface['addresses']:
if not address.is_allowed(yaml, sub_ifname, sub_iface['addresses'], a):
msgs.append("sub-interface %s IP address %s conflicts with another" % (sub_ifname, a))
result = False
if not valid_encapsulation(yaml, sub_ifname):
msgs.append("sub-interface %s has invalid encapsulation" % (sub_ifname))
result = False
elif not unique_encapsulation(yaml, sub_ifname):
msgs.append("sub-interface %s does not have unique encapsulation" % (sub_ifname))
result = False
if 'l2xc' in sub_iface:
if has_lcp(yaml, sub_ifname):
msgs.append("sub-interface %s has l2xc so it cannot have an LCP" % (sub_ifname))
result = False
if has_address(yaml, sub_ifname):
msgs.append("sub-interface %s has l2xc so it cannot have an address" % (sub_ifname))
result = False
if (None, None) == get_by_name(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s does not exist" % (sub_ifname, sub_iface['l2xc']))
result = False
if sub_iface['l2xc'] == sub_ifname:
msgs.append("sub-interface %s l2xc target cannot be itself" % (sub_ifname))
result = False
target_mtu = get_mtu(yaml, sub_iface['l2xc'])
if target_mtu != sub_mtu:
msgs.append("sub-interface %s l2xc target MTU %d does not match source MTU %d" % (ifname, target_mtu, sub_mtu))
result = False
if not is_l2xc_target_interface_unique(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s is not unique" % (sub_ifname, sub_iface['l2xc']))
result = False
if bridgedomain.is_bridge_interface(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s is in a bridgedomain" % (sub_ifname, sub_iface['l2xc']))
result = False
if has_lcp(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s cannot have an LCP" % (sub_ifname, sub_iface['l2xc']))
result = False
if has_address(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s cannot have an address" % (sub_ifname, sub_iface['l2xc']))
result = False
return result, msgs

45
config/lcp.py Normal file
View File

@ -0,0 +1,45 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
def get_lcps(yaml, interfaces=True, loopbacks=True, bridgedomains=True):
""" Returns a list of LCPs configured in the system. Optionally (de)select the different
types of LCP. Return an empty list if there are none of the given type(s). """
ret = []
if interfaces and 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
if 'sub-interfaces' in iface:
for subid, sub_iface in iface['sub-interfaces'].items():
if 'lcp' in sub_iface:
ret.append(sub_iface['lcp'])
if loopbacks and 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
if bridgedomains and 'bridgedomains' in yaml:
for ifname, iface in yaml['bridgedomains'].items():
if 'lcp' in iface:
ret.append(iface['lcp'])
return ret
def is_unique(yaml, lcpname):
""" Returns True if there is at most one occurence of the LCP name in the entire config."""
lcps = get_lcps(yaml)
return lcps.count(lcpname) < 2

66
config/loopback.py Normal file
View File

@ -0,0 +1,66 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.lcp as lcp
import config.address as address
def get_loopbacks(yaml):
""" Return a list of all loopbacks. """
ret = []
if 'loopbacks' in yaml:
for ifname, iface in yaml['loopbacks'].items():
ret.append(ifname)
return ret
def get_by_name(yaml, ifname):
""" Return the loopback by name, if it exists. Return None otherwise. """
try:
if ifname in yaml['loopbacks']:
return ifname, yaml['loopbacks'][ifname]
except:
pass
return None, None
def is_loopback(yaml, ifname):
""" Returns True if the interface name is an existing loopback. """
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def validate_loopbacks(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger.addHandler(logging.NullHandler())
if not 'loopbacks' in yaml:
return result, msgs
for ifname, iface in yaml['loopbacks'].items():
logger.debug("loopback %s" % iface)
if 'addresses' in iface and not 'lcp' in iface:
msgs.append("loopback %s has an address but no LCP" % ifname)
result = False
if 'lcp' in iface and not lcp.is_unique(yaml, iface['lcp']):
msgs.append("loopback %s does not have a unique LCP name %s" % (ifname, iface['lcp']))
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append("loopback %s IP address %s conflicts with another" % (ifname, a))
result = False
return result, msgs

View File

@ -0,0 +1,30 @@
import unittest
import yaml
import config.bondethernet as bondethernet
class TestBondEthernetMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_bondethernet.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet0")
self.assertIsNotNone(iface)
self.assertEqual("BondEthernet0", ifname)
self.assertIn("GigabitEthernet1/0/0", iface['interfaces'])
self.assertNotIn("GigabitEthernet2/0/0", iface['interfaces'])
ifname, iface = bondethernet.get_by_name(self.cfg, "BondEthernet-notexist")
self.assertIsNone(iface)
self.assertIsNone(ifname)
def test_members(self):
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1"))
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100"))
def test_is_bondethernet(self):
self.assertTrue(bondethernet.is_bondethernet(self.cfg, "BondEthernet0"))
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist"))
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "GigabitEthernet1/0/0"))

View File

@ -0,0 +1,54 @@
import unittest
import yaml
import config.bridgedomain as bridgedomain
class TestBridgeDomainMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_bridgedomain.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd10")
self.assertIsNotNone(iface)
self.assertEqual("bd10", ifname)
self.assertEqual(iface['mtu'], 3000)
self.assertIn("BondEthernet0", iface['interfaces'])
ifname, iface = bridgedomain.get_by_name(self.cfg, "bd-notexist")
self.assertIsNone(iface)
self.assertIsNone(ifname)
def test_is_bridgedomain(self):
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd10"))
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd11"))
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd12"))
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "bd-notexist"))
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "GigabitEthernet1/0/0"))
def test_is_bvi(self):
self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi10"))
self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi11"))
self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi12"))
self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi-notexist"))
self.assertFalse(bridgedomain.is_bvi(self.cfg, "GigabitEthernet1/0/0"))
def test_members(self):
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100"))
self.assertFalse(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertFalse(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet3/0/0.100"))
def test_unique(self):
self.assertFalse(bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(bridgedomain.is_bridge_interface_unique(self.cfg, "GigabitEthernet2/0/0.100"))
def test_enumerators(self):
ifs = bridgedomain.get_bridge_interfaces(self.cfg)
self.assertEqual(len(ifs), 8)
self.assertIn("BondEthernet0", ifs)
self.assertIn("GigabitEthernet1/0/0", ifs)
self.assertIn("GigabitEthernet2/0/0.100", ifs)
def test_get_bridgedomains(self):
ifs = bridgedomain.get_bridgedomains(self.cfg)
self.assertEqual(len(ifs), 3)

193
config/test_interface.py Normal file
View File

@ -0,0 +1,193 @@
import unittest
import yaml
import config.interface as interface
class TestInterfaceMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_interface.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_enumerators(self):
ifs = interface.get_interfaces(self.cfg)
self.assertEqual(len(ifs), 19)
self.assertIn("GigabitEthernet1/0/1", ifs)
self.assertIn("GigabitEthernet1/0/1.200", ifs)
ifs = interface.get_sub_interfaces(self.cfg)
self.assertEqual(len(ifs), 13)
self.assertNotIn("GigabitEthernet1/0/1", ifs)
self.assertIn("GigabitEthernet1/0/1.200", ifs)
self.assertIn("GigabitEthernet1/0/1.201", ifs)
self.assertIn("GigabitEthernet1/0/1.202", ifs)
self.assertIn("GigabitEthernet1/0/1.203", ifs)
ifs = interface.get_qinx_interfaces(self.cfg)
self.assertEqual(len(ifs), 3)
self.assertNotIn("GigabitEthernet1/0/1.200", ifs)
self.assertNotIn("GigabitEthernet1/0/1.202", ifs)
self.assertIn("GigabitEthernet1/0/1.201", ifs)
self.assertIn("GigabitEthernet1/0/1.203", ifs)
ifs = interface.get_l2xc_interfaces(self.cfg)
self.assertEqual(len(ifs), 3)
self.assertIn("GigabitEthernet3/0/0", ifs)
self.assertIn("GigabitEthernet3/0/1", ifs)
self.assertIn("GigabitEthernet3/0/2.100", ifs)
self.assertNotIn("GigabitEthernet3/0/2.200", ifs)
target_ifs = interface.get_l2xc_target_interfaces(self.cfg)
self.assertEqual(len(target_ifs), 3)
self.assertIn("GigabitEthernet3/0/0", target_ifs)
self.assertIn("GigabitEthernet3/0/1", target_ifs)
self.assertNotIn("GigabitEthernet3/0/2.100", target_ifs)
self.assertIn("GigabitEthernet3/0/2.200", target_ifs)
## Since l2xc cannot connect to itself, and the target must exist,
## it follows that the same number of l2xc target interfaces must exist.
self.assertEqual(len(target_ifs), len(ifs))
def test_mtu(self):
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1"), 9216)
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1.200"), 9000)
self.assertEqual(interface.get_mtu(self.cfg, "GigabitEthernet1/0/1.201"), 1500)
def test_encapsulation(self):
self.assertTrue(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertTrue(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.200"),
{ 'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 0, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.201"),
{ 'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 1234, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.202"),
{ 'dot1q': 0, 'dot1ad': 1000, 'inner-dot1q': 0, 'exact-match': False })
self.assertEqual(interface.get_encapsulation(self.cfg, "GigabitEthernet1/0/1.203"),
{ 'dot1q': 0, 'dot1ad': 1000, 'inner-dot1q': 1000, 'exact-match': True })
self.assertFalse(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.100"))
self.assertFalse(interface.valid_encapsulation(self.cfg, "GigabitEthernet1/0/0.101"))
self.assertFalse(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.102"))
self.assertFalse(interface.unique_encapsulation(self.cfg, "GigabitEthernet1/0/0.103"))
def test_has_sub(self):
self.assertTrue(interface.has_sub(self.cfg, "GigabitEthernet1/0/1"))
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(interface.has_sub(self.cfg, "GigabitEthernet3/0/0"))
def test_is_sub(self):
self.assertFalse(interface.is_sub(self.cfg, "GigabitEthernet1/0/1"))
self.assertTrue(interface.is_sub(self.cfg, "GigabitEthernet1/0/1.200"))
def test_is_qinx(self):
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1"))
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertFalse(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.202"))
self.assertTrue(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.201"))
self.assertTrue(interface.is_qinx(self.cfg, "GigabitEthernet1/0/1.203"))
def test_has_lcp(self):
self.assertTrue(interface.has_lcp(self.cfg, "GigabitEthernet1/0/1"))
self.assertFalse(interface.has_lcp(self.cfg, "GigabitEthernet1/0/0"))
def test_get_lcp(self):
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/0"))
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/0.100"))
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1"), "e1")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.100"), "foo")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.101"), "e1.100")
self.assertEqual(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.102"), "e1.100.100")
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.200"))
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.201"))
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.202"))
self.assertIsNone(interface.get_lcp(self.cfg, "GigabitEthernet1/0/1.203"))
def test_address(self):
self.assertFalse(interface.has_address(self.cfg, "GigabitEthernet1/0/0"))
self.assertFalse(interface.has_address(self.cfg, "GigabitEthernet1/0/0.100"))
self.assertTrue(interface.has_address(self.cfg, "GigabitEthernet1/0/1"))
self.assertTrue(interface.has_address(self.cfg, "GigabitEthernet1/0/1.100"))
def test_lx2c(self):
l2xc_ifs = interface.get_l2xc_interfaces(self.cfg)
l2xc_target_ifs = interface.get_l2xc_target_interfaces(self.cfg)
self.assertIn("GigabitEthernet3/0/0", l2xc_ifs)
self.assertIn("GigabitEthernet3/0/0", l2xc_target_ifs)
self.assertTrue(interface.is_l2xc_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertTrue(interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet3/0/0"))
self.assertNotIn("GigabitEthernet2/0/0", l2xc_ifs)
self.assertNotIn("GigabitEthernet2/0/0", l2xc_target_ifs)
self.assertFalse(interface.is_l2xc_interface(self.cfg, "GigabitEthernet2/0/0"))
self.assertFalse(interface.is_l2xc_target_interface(self.cfg, "GigabitEthernet2/0/0"))
def test_l2(self):
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/0"))
self.assertFalse(interface.is_l2(self.cfg, "GigabitEthernet1/0/0"))
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/2.100"))
self.assertTrue(interface.is_l2(self.cfg, "GigabitEthernet3/0/2.200"))
def test_l3(self):
self.assertTrue(interface.is_l3(self.cfg, "GigabitEthernet1/0/0"))
self.assertFalse(interface.is_l3(self.cfg, "GigabitEthernet3/0/0"))
def test_get_by_name(self):
ifname, iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
self.assertEqual(ifname, "GigabitEthernet1/0/1.201")
self.assertIsNotNone(iface)
encap = interface.get_encapsulation(self.cfg, ifname)
self.assertEqual(encap, {'dot1q': 1000, 'dot1ad': 0, 'inner-dot1q': 1234, 'exact-match': False})
ifname, iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.1")
self.assertIsNone(ifname)
self.assertIsNone(iface)
def test_get_parent_by_name(self):
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
self.assertEqual(ifname, "GigabitEthernet1/0/1")
self.assertIsNotNone(iface)
self.assertNotIn('encapsulation', iface)
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.200")
self.assertEqual(ifname, "GigabitEthernet1/0/1")
self.assertIsNotNone(iface)
self.assertNotIn('encapsulation', iface)
ifname, iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1")
self.assertIsNone(ifname)
self.assertIsNone(iface)
def test_get_qinx_parent_by_name(self):
self.assertIsNotNone(interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.202"))
self.assertIsNotNone(interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.203"))
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1")
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.100")
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.200")
self.assertIsNone(iface)
self.assertIsNone(ifname)
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
self.assertEqual(ifname, "GigabitEthernet1/0/1.200")
def test_get_phys(self):
phys = interface.get_phys(self.cfg)
print(phys)
self.assertEqual(len(phys), 6)
self.assertIn("GigabitEthernet1/0/0", phys)
self.assertNotIn("GigabitEthernet1/0/0.100", phys)
def test_is_phy(self):
self.assertTrue(interface.is_phy(self.cfg, "GigabitEthernet1/0/0"))
self.assertFalse(interface.is_phy(self.cfg, "GigabitEthernet1/0/0.100"))

47
config/test_lcp.py Normal file
View File

@ -0,0 +1,47 @@
import unittest
import yaml
import config.lcp as lcp
import config.interface as interface
class TestLCPMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_lcp.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_enumerators(self):
lcps = lcp.get_lcps(self.cfg)
self.assertIn("e1", lcps)
self.assertIn("foo", lcps)
self.assertIn("e2", lcps)
loopback_lcps = lcp.get_lcps(self.cfg, interfaces=False, bridgedomains=False)
self.assertIn("thrice", loopback_lcps)
self.assertNotIn("e1", loopback_lcps)
def test_lcp(self):
self.assertTrue(lcp.is_unique(self.cfg, "e1"))
self.assertTrue(lcp.is_unique(self.cfg, "foo"))
self.assertTrue(lcp.is_unique(self.cfg, "notexist"))
self.assertFalse(lcp.is_unique(self.cfg, "twice"))
self.assertFalse(lcp.is_unique(self.cfg, "thrice"))
def test_qinx(self):
qinx_ifname, qinx_iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
parent_ifname, parent_iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
qinx_ifname, qinx_iface = interface.get_by_name(self.cfg, "GigabitEthernet1/0/1.201")
mid_ifname, mid_iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
parent_ifname, parent_iface = interface.get_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
self.assertEqual(qinx_ifname, "GigabitEthernet1/0/1.201")
self.assertEqual(mid_ifname, "GigabitEthernet1/0/1.200")
self.assertEqual(parent_ifname, "GigabitEthernet1/0/1")
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.100")
self.assertIsNone(ifname)
self.assertIsNone(iface)

26
config/test_loopback.py Normal file
View File

@ -0,0 +1,26 @@
import unittest
import yaml
import config.loopback as loopback
class TestLoopbackMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_loopback.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = loopback.get_by_name(self.cfg, "loop1")
self.assertIsNotNone(iface)
self.assertEqual("loop1", ifname)
self.assertEqual(iface['mtu'], 2000)
ifname, iface = loopback.get_by_name(self.cfg, "loop-noexist")
self.assertIsNone(ifname)
self.assertIsNone(iface)
def test_enumerators(self):
ifs = loopback.get_loopbacks(self.cfg)
self.assertEqual(len(ifs), 3)
self.assertIn("loop0", ifs)
self.assertIn("loop1", ifs)
self.assertIn("loop2", ifs)
self.assertNotIn("loop-noexist", ifs)

View File

@ -0,0 +1,36 @@
import unittest
import yaml
import config.vxlan_tunnel as vxlan_tunnel
class TestVXLANMethods(unittest.TestCase):
def setUp(self):
with open("unittest/test_vxlan_tunnel.yaml", "r") as f:
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
def test_get_by_name(self):
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel0")
self.assertIsNotNone(iface)
self.assertEqual("vxlan_tunnel0", ifname)
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel-noexist")
self.assertIsNone(ifname)
self.assertIsNone(iface)
def test_is_vxlan_tunnel(self):
self.assertTrue(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel0"))
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel-noexist"))
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "GigabitEthernet1/0/0"))
def test_enumerators(self):
ifs = vxlan_tunnel.get_vxlan_tunnels(self.cfg)
self.assertEqual(len(ifs), 4)
self.assertIn("vxlan_tunnel0", ifs)
self.assertIn("vxlan_tunnel1", ifs)
self.assertIn("vxlan_tunnel2", ifs)
self.assertIn("vxlan_tunnel3", ifs)
self.assertNotIn("vxlan_tunnel-noexist", ifs)
def test_vni_unique(self):
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 100))
self.assertFalse(vxlan_tunnel.vni_unique(self.cfg, 101))
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 102))

79
config/vxlan_tunnel.py Normal file
View File

@ -0,0 +1,79 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import config.interface as interface
import ipaddress
def get_by_name(yaml, ifname):
""" Return the VXLAN by name, if it exists. Return None otherwise. """
try:
if ifname in yaml['vxlan_tunnels']:
return ifname, yaml['vxlan_tunnels'][ifname]
except:
pass
return None, None
def is_vxlan_tunnel(yaml, ifname):
""" Returns True if the interface name is an existing VXLAN Tunnel. """
ifname, iface = get_by_name(yaml, ifname)
return not iface == None
def vni_unique(yaml, vni):
""" Return True if the VNI is unique amongst all VXLANs """
if not 'vxlan_tunnels' in yaml:
return True
ncount = 0
for ifname, iface in yaml['vxlan_tunnels'].items():
if iface['vni'] == vni:
ncount = ncount + 1
return ncount < 2
def get_vxlan_tunnels(yaml):
""" Returns a list of all VXLAN tunnel interface names. """
ret = []
if not 'vxlan_tunnels' in yaml:
return ret
for ifname, iface in yaml['vxlan_tunnels'].items():
ret.append(ifname)
return ret
def validate_vxlan_tunnels(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.config')
logger.addHandler(logging.NullHandler())
if not 'vxlan_tunnels' in yaml:
return result, msgs
for ifname, iface in yaml['vxlan_tunnels'].items():
logger.debug("vxlan_tunnel %s: %s" % (ifname, iface))
vni = iface['vni']
if not vni_unique(yaml, vni):
msgs.append("vxlan_tunnel %s VNI %d is not unique" % (ifname, vni))
result = False
local = ipaddress.ip_address(iface['local'])
remote = ipaddress.ip_address(iface['remote'])
if local.version != remote.version:
msgs.append("vxlan_tunnel %s local and remote are not the same address family" % (ifname))
result = False
return result, msgs