Files
vppcfg/validator/interface.py
2022-03-15 22:32:55 +00:00

481 lines
18 KiB
Python

#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import validator.bondethernet as bondethernet
import validator.lcp as lcp
import validator.address as address
class NullHandler(logging.Handler):
def emit(self, record):
pass
def get_parent_by_name(yaml, ifname):
""" Returns the sub-interface's parent, or None if the sub-int doesn't exist. """
if not '.' in ifname:
return None
ifname, subid = ifname.split('.')
subid = int(subid)
try:
iface = yaml['interfaces'][ifname]
return iface
except:
pass
return None
def get_by_name(yaml, ifname):
""" Returns the interface or sub-interface by a given name, or None if it does not exist """
if '.' in ifname:
ifname, subid = ifname.split('.')
subid = int(subid)
try:
iface = yaml['interfaces'][ifname]['sub-interfaces'][subid]
return iface
except:
return None
try:
iface = yaml['interfaces'][ifname]
return iface
except:
pass
return None
def has_sub(yaml, ifname):
""" Returns True if this interface has sub-interfaces """
if not 'interfaces' in yaml:
return False
if ifname in yaml['interfaces']:
iface = yaml['interfaces'][ifname]
if 'sub-interfaces' in iface and len(iface['sub-interfaces']) > 0:
return True
return False
def has_address(yaml, ifname):
""" Returns True if this interface or sub-interface has one or more addresses"""
if not 'interfaces' in yaml:
return False
if '.' in ifname:
ifname, subid = ifname.split('.')
subid = int(subid)
try:
if len(yaml['interfaces'][ifname]['sub-interfaces'][subid]['addresses']) > 0:
return True
except:
pass
return False
try:
if len(yaml['interfaces'][ifname]['addresses']) > 0:
return True
except:
pass
return False
def is_bond_member(yaml, ifname):
""" Returns True if this interface is a member of a BondEthernet """
if not 'bondethernets' in yaml:
return False
for bond, iface in yaml['bondethernets'].items():
if not 'interfaces' in iface:
continue
if ifname in iface['interfaces']:
return True
return False
def get_bridge_interfaces(yaml):
""" Returns a list of all interfaces that are bridgedomain members """
ret = []
if not 'bridgedomains' in yaml:
return ret
for ifname, iface in yaml['bridgedomains'].items():
if 'interfaces' in iface:
ret.extend(iface['interfaces'])
return ret
def is_bridge_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced in bridgedomains zero or one times """
ifs = get_bridge_interfaces(yaml)
n = ifs.count(ifname)
if n == 0 or n == 1:
return True
return False
def is_bridge_interface(yaml, ifname):
""" Returns True if this interface is a member of a BridgeDomain """
if ifname in get_bridge_interfaces(yaml):
return True
return False
def get_l2xc_interfaces(yaml):
""" Returns a list of all interfaces that have an L2 CrossConnect """
ret = []
if not 'interfaces' in yaml:
return ret
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
ret.extend(ifname)
if 'sub-interfaces' in iface:
for sub_ifname, sub_iface in iface['sub-interfaces'].items():
if 'l2xc' in sub_iface:
ret.extend(sub_ifname)
return ret
def is_l2xc_interface(yaml, ifname):
""" Returns True if this interface has an L2 CrossConnect """
if ifname in get_l2xc_interfaces(yaml):
return True
return False
def get_l2xc_target_interfaces(yaml):
""" Returns a list of all interfaces that are the target of an L2 CrossConnect """
ret = []
if 'interfaces' in yaml:
for ifname, iface in yaml['interfaces'].items():
if 'l2xc' in iface:
ret.append(iface['l2xc'])
if 'sub-interfaces' in iface:
for sub_ifname, sub_iface in iface['sub-interfaces'].items():
if 'l2xc' in sub_iface:
ret.append(sub_iface['l2xc'])
return ret
def is_l2xc_target_interface(yaml, ifname):
""" Returns True if this interface is the target of an L2 CrossConnect """
if ifname in get_l2xc_target_interfaces(yaml):
return True
return False
def is_l2xc_target_interface_unique(yaml, ifname):
""" Returns True if this interface is referenced as an l2xc target zero or one times """
ifs = get_l2xc_target_interfaces(yaml)
n = ifs.count(ifname)
if n == 0 or n == 1:
return True
return False
def has_lcp(yaml, ifname):
""" Returns True if this interface or sub-interface has an LCP """
if not 'interfaces' in yaml:
return False
if '.' in ifname:
ifname, subid = ifname.split('.')
subid = int(subid)
try:
if len(yaml['interfaces'][ifname]['sub-interfaces'][subid]['lcp']) > 0:
return True
except:
pass
return False
try:
if len(yaml['interfaces'][ifname]['lcp']) > 0:
return True
except:
pass
return False
def unique_encapsulation(yaml, sub_ifname):
""" Ensures that for the sub_ifname specified, there exist no other sub-ints on the
parent with the same encapsulation. """
iface = get_by_name(yaml, sub_ifname)
parent_iface = get_parent_by_name(yaml, sub_ifname)
if not iface or not parent_iface:
return False
parent_ifname, subid = sub_ifname.split('.')
dot1q = 0
dot1ad = 0
inner_dot1q = 0
if not 'encapsulation' in iface:
dot1q = int(subid)
else:
if 'dot1q' in iface['encapsulation']:
dot1q = iface['encapsulation']['dot1q']
elif 'dot1ad' in iface['encapsulation']:
dot1ad = iface['encapsulation']['dot1ad']
if 'inner-dot1q' in iface['encapsulation']:
inner_dot1q = iface['encapsulation']['inner-dot1q']
ncount = 0
for subid, sibling_iface in parent_iface['sub-interfaces'].items():
sibling_dot1q = 0
sibling_dot1ad = 0
sibling_inner_dot1q = 0
sibling_ifname = "%s.%d" % (parent_ifname, subid)
if not 'encapsulation' in sibling_iface:
sibling_dot1q = subid
else:
if 'dot1q' in sibling_iface['encapsulation']:
sibling_dot1q = sibling_iface['encapsulation']['dot1q']
elif 'dot1ad' in sibling_iface['encapsulation']:
sibling_dot1ad = sibling_iface['encapsulation']['dot1ad']
if 'inner-dot1q' in sibling_iface['encapsulation']:
sibling_inner_dot1q = sibling_iface['encapsulation']['inner-dot1q']
if (dot1q,dot1ad,inner_dot1q) == (sibling_dot1q, sibling_dot1ad, sibling_inner_dot1q) and sub_ifname != sibling_ifname:
## print("%s overlaps with %s: [%d,%d,%d]" % (sub_ifname, sibling_ifname, dot1q, dot1ad, inner_dot1q))
ncount = ncount + 1
if (ncount == 0):
return True
return False
def valid_encapsulation(yaml, sub_ifname):
try:
ifname, subid = sub_ifname.split('.')
subid = int(subid)
sub_iface = yaml['interfaces'][ifname]['sub-interfaces'][subid]
except:
return False
if not 'encapsulation' in sub_iface:
return True
encap = sub_iface['encapsulation']
if 'dot1ad' in encap and 'dot1q' in encap:
return False
if 'inner-dot1q' in encap and not ('dot1ad' in encap or 'dot1q' in encap):
return False
if 'exact-match' in encap and encap['exact-match'] == False and is_l3(yaml, sub_ifname):
return False
return True
def is_l3(yaml, ifname):
""" Returns True if the interface exists and has either an LCP or an address """
iface = get_by_name(yaml, ifname)
if not iface:
return False
if has_lcp(yaml, ifname):
return True
if has_address(yaml, ifname):
return True
return False
def get_lcp(yaml, ifname):
""" Returns the LCP of the interface. If the interface is a sub-interface with L3
enabled, synthesize it based on its parent, using smart QinQ syntax.
Return None if no LCP can be found. """
iface = get_by_name(yaml, ifname)
parent_iface = get_parent_by_name(yaml, ifname)
if 'lcp' in iface:
return iface['lcp']
if not is_l3(yaml, ifname):
return None
if parent_iface and not 'lcp' in parent_iface:
return None
if not 'encapsulation' in iface:
if not '.' in ifname:
## Not a sub-int and no encap? Should not happen
return None
ifname, subid = ifname.split('.')
subid = int(subid)
return "%s.%d" % (parent_iface['lcp'], subid)
dot1q = 0
dot1ad = 0
inner_dot1q = 0
if 'dot1q' in iface['encapsulation']:
dot1q = iface['encapsulation']['dot1q']
elif 'dot1ad' in iface['encapsulation']:
dot1ad = iface['encapsulation']['dot1ad']
if 'inner-dot1q' in iface['encapsulation']:
inner_dot1q = iface['encapsulation']['inner-dot1q']
if inner_dot1q and dot1ad:
lcp = "%s.%d.%d" % (parent_iface['lcp'], dot1ad, inner_dot1q)
elif inner_dot1q and dot1q:
lcp = "%s.%d.%d" % (parent_iface['lcp'], dot1q, inner_dot1q)
elif dot1ad:
lcp = "%s.%d" % (parent_iface['lcp'], dot1ad)
elif dot1q:
lcp = "%s.%d" % (parent_iface['lcp'], dot1q)
else:
return None
return lcp
def get_mtu(yaml, ifname):
""" Returns MTU of the interface. If it's not set, return the parent's MTU, and
return 1500 if no MTU was set on the sub-int or the parent."""
iface = get_by_name(yaml, ifname)
parent_iface = get_parent_by_name(yaml, ifname)
try:
return iface['mtu']
return parent_iface['mtu']
except:
pass
return 1500
def validate_interfaces(yaml):
result = True
msgs = []
logger = logging.getLogger('vppcfg.validator')
logger.addHandler(NullHandler())
if not 'interfaces' in yaml:
return result, msgs
for ifname, iface in yaml['interfaces'].items():
logger.debug("interface %s" % iface)
if ifname.startswith("BondEthernet") and not bondethernet.get_by_name(yaml, ifname):
msgs.append("interface %s does not exist in bondethernets" % ifname)
result = False
iface_mtu = get_mtu(yaml, ifname)
iface_lcp = has_lcp(yaml, ifname)
iface_address = has_address(yaml, ifname)
if iface_address and not iface_lcp:
msgs.append("interface %s has an address but no LCP" % ifname)
result = False
iface_lcp = get_lcp(yaml, ifname)
if iface_lcp and not lcp.is_unique(yaml, iface_lcp):
msgs.append("interface %s does not have a unique LCP name %s" % (ifname, iface_lcp))
result = False
if 'addresses' in iface:
for a in iface['addresses']:
if not address.is_allowed(yaml, ifname, iface['addresses'], a):
msgs.append("interface %s IP address %s conflicts with another" % (ifname, a))
result = False
if 'l2xc' in iface:
if has_sub(yaml, ifname):
msgs.append("interface %s has l2xc so it cannot have sub-interfaces" % (ifname))
result = False
if iface_lcp:
msgs.append("interface %s has l2xc so it cannot be an LCP" % (ifname))
result = False
if iface_address:
msgs.append("interface %s has l2xc so it cannot have an address" % (ifname))
result = False
if not get_by_name(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s does not exist" % (ifname, iface['l2xc']))
result = False
target_mtu = get_mtu(yaml, iface['l2xc'])
if target_mtu != iface_mtu:
msgs.append("interface %s l2xc target MTU %d does not match source MTU %d" % (ifname, target_mtu, iface_mtu))
result = False
if not is_l2xc_target_interface_unique(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s is not unique" % (ifname, iface['l2xc']))
result = False
if is_bridge_interface(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s is in a bridgedomain" % (ifname, iface['l2xc']))
result = False
if has_lcp(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s cannot be an LCP" % (ifname, iface['l2xc']))
result = False
if has_address(yaml, iface['l2xc']):
msgs.append("interface %s l2xc target %s cannot have an address" % (ifname, iface['l2xc']))
result = False
if has_sub(yaml, ifname):
for sub_id, sub_iface in yaml['interfaces'][ifname]['sub-interfaces'].items():
logger.debug("sub-interface %s" % sub_iface)
sub_ifname = "%s.%d" % (ifname, sub_id)
if not sub_iface:
msgs.append("sub-interface %s has no config" % (sub_ifname))
result = False
continue
sub_lcp = get_lcp(yaml, sub_ifname)
if sub_lcp and len(sub_lcp)>15:
msgs.append("sub-interface %s has LCP with too long name %s" % (sub_ifname, sub_lcp))
result = False
if iface_lcp and not lcp.is_unique(yaml, iface_lcp):
msgs.append("sub-interface %s does not have a unique LCP name %s" % (sub_ifname, sub_lcp))
result = False
sub_mtu = get_mtu(yaml, sub_ifname)
if sub_mtu > iface_mtu:
msgs.append("sub-interface %s has MTU %d higher than parent MTU %d" % (sub_ifname, sub_iface['mtu'], iface_mtu))
result = False
if has_lcp(yaml, sub_ifname):
if not iface_lcp:
msgs.append("sub-interface %s has LCP but %s does not have LCP" % (sub_ifname, ifname))
result = False
if has_address(yaml, sub_ifname):
## The sub_iface lcp is not required: it can be derived from the iface_lcp, which has to be set
if not iface_lcp:
msgs.append("sub-interface %s has an address but %s does not have LCP" % (sub_ifname, ifname))
result = False
for a in sub_iface['addresses']:
if not address.is_allowed(yaml, sub_ifname, sub_iface['addresses'], a):
msgs.append("sub-interface %s IP address %s conflicts with another" % (sub_ifname, a))
result = False
if not valid_encapsulation(yaml, sub_ifname):
msgs.append("sub-interface %s has invalid encapsulation" % (sub_ifname))
result = False
elif not unique_encapsulation(yaml, sub_ifname):
msgs.append("sub-interface %s does not have unique encapsulation" % (sub_ifname))
result = False
if 'l2xc' in sub_iface:
if has_lcp(yaml, sub_ifname):
msgs.append("sub-interface %s has l2xc so it cannot be an LCP" % (sub_ifname))
result = False
if has_address(yaml, sub_ifname):
msgs.append("sub-interface %s has l2xc so it cannot have an address" % (sub_ifname))
result = False
if not get_by_name(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s does not exist" % (sub_ifname, sub_iface['l2xc']))
result = False
target_mtu = get_mtu(yaml, sub_iface['l2xc'])
if target_mtu != sub_mtu:
msgs.append("sub-interface %s l2xc target MTU %d does not match source MTU %d" % (ifname, target_mtu, sub_mtu))
result = False
if not is_l2xc_target_interface_unique(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s is not unique" % (sub_ifname, sub_iface['l2xc']))
result = False
if is_bridge_interface(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s is in a bridgedomain" % (sub_ifname, sub_iface['l2xc']))
result = False
if has_lcp(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s cannot be an LCP" % (sub_ifname, sub_iface['l2xc']))
result = False
if has_address(yaml, sub_iface['l2xc']):
msgs.append("sub-interface %s l2xc target %s cannot have an address" % (sub_ifname, sub_iface['l2xc']))
result = False
return result, msgs