Add a few additional useful functions
- is_*() returns True if the interface name is of a certain type is_bondethernet() is_loopback() is_bvi() is_bridgedomain() is_vxlan_tunnel() is_phy() - get_phys() by process of elimination, returns all interface names that are supposed to be physical network interfaces. Add unit tests for validator/vxlan_tunnel.py => Notable: while here, fix a bug in get_by_name() Add unit tests for all the is_*() and get_phys() functions.
This commit is contained in:
24
unittest/test_vxlan_tunnel.yaml
Normal file
24
unittest/test_vxlan_tunnel.yaml
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
vxlan_tunnels:
|
||||||
|
vxlan_tunnel0:
|
||||||
|
description: "Correctly configured VXLAN"
|
||||||
|
local: 192.0.2.1
|
||||||
|
remote: 192.0.2.2
|
||||||
|
vni: 100
|
||||||
|
|
||||||
|
vxlan_tunnel1:
|
||||||
|
description: "VXLAN VNI overlaps with vxlan_tunnel2"
|
||||||
|
local: 2001:db8::1
|
||||||
|
remote: 2001:db8::2
|
||||||
|
vni: 101
|
||||||
|
|
||||||
|
vxlan_tunnel2:
|
||||||
|
description: "VXLAN VNI overlaps with vxlan_tunnel1"
|
||||||
|
local: 192.0.2.9
|
||||||
|
remote: 192.0.2.10
|
||||||
|
vni: 101
|
||||||
|
|
||||||
|
vxlan_tunnel3:
|
||||||
|
description: "VXLAN src/dst address family mismatch"
|
||||||
|
local: 192.0.2.17
|
||||||
|
remote: 2001:db8:1::2
|
||||||
|
vni: 102
|
@ -29,8 +29,14 @@ def get_by_name(yaml, ifname):
|
|||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def is_bondethernet(yaml, ifname):
|
||||||
|
""" Returns True if the interface name is an existing BondEthernet. """
|
||||||
|
ifname, iface = get_by_name(yaml, ifname)
|
||||||
|
return not iface == None
|
||||||
|
|
||||||
|
|
||||||
def is_bond_member(yaml, ifname):
|
def is_bond_member(yaml, ifname):
|
||||||
""" Returns True if this interface is a member of a BondEthernet """
|
""" Returns True if this interface is a member of a BondEthernet. """
|
||||||
if not 'bondethernets' in yaml:
|
if not 'bondethernets' in yaml:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -41,6 +41,29 @@ def get_by_name(yaml, ifname):
|
|||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def is_bridgedomain(yaml, ifname):
|
||||||
|
""" Returns True if the name is an existing bridgedomain (ie bd[0-9]+). """
|
||||||
|
ifname, iface = get_by_name(yaml, ifname)
|
||||||
|
return not iface == None
|
||||||
|
|
||||||
|
|
||||||
|
def is_bvi(yaml, ifname):
|
||||||
|
""" Returns True if the interface name is an existing BVI. """
|
||||||
|
if not ifname.startswith("bvi"):
|
||||||
|
return False
|
||||||
|
idx = ifname[3:]
|
||||||
|
if not idx.isnumeric():
|
||||||
|
return False
|
||||||
|
bridgename, bridge = get_by_name(yaml, "bd%d" % (int(idx)))
|
||||||
|
if not bridge:
|
||||||
|
return False
|
||||||
|
|
||||||
|
## If the BridgeDomain has an 'lcp', then it has a Bridge Virtual Interface
|
||||||
|
if 'lcp' in bridge:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_bridge_interfaces(yaml):
|
def get_bridge_interfaces(yaml):
|
||||||
""" Returns a list of all interfaces that are bridgedomain members """
|
""" Returns a list of all interfaces that are bridgedomain members """
|
||||||
|
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
import logging
|
import logging
|
||||||
import validator.bondethernet as bondethernet
|
import validator.bondethernet as bondethernet
|
||||||
import validator.bridgedomain as bridgedomain
|
import validator.bridgedomain as bridgedomain
|
||||||
|
import validator.loopback as loopback
|
||||||
|
import validator.vxlan_tunnel as vxlan_tunnel
|
||||||
import validator.lcp as lcp
|
import validator.lcp as lcp
|
||||||
import validator.address as address
|
import validator.address as address
|
||||||
|
|
||||||
@ -237,6 +239,40 @@ def get_encapsulation(yaml, ifname):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_phys(yaml):
|
||||||
|
""" Return a list of all toplevel (ie. non-sub) interfaces which are
|
||||||
|
assumed to be physical network cards, eg TenGigabitEthernet1/0/0. Note
|
||||||
|
that derived/created interfaces such as Tunnels, BondEthernets and
|
||||||
|
Loopbacks/BVIs are not returned """
|
||||||
|
ret = []
|
||||||
|
if not 'interfaces' in yaml:
|
||||||
|
return ret
|
||||||
|
for ifname, iface in yaml['interfaces'].items():
|
||||||
|
if is_phy(yaml, ifname):
|
||||||
|
ret.append(ifname)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
def is_phy(yaml, ifname):
|
||||||
|
""" Returns True if the ifname is the name of a physical network interface. """
|
||||||
|
|
||||||
|
ifname, iface = get_by_name(yaml, ifname)
|
||||||
|
if iface == None:
|
||||||
|
return False
|
||||||
|
if is_sub(yaml, ifname):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if bondethernet.is_bondethernet(yaml, ifname):
|
||||||
|
return False
|
||||||
|
if loopback.is_loopback(yaml, ifname):
|
||||||
|
return False
|
||||||
|
if bridgedomain.is_bvi(yaml, ifname):
|
||||||
|
return False
|
||||||
|
if vxlan_tunnel.is_vxlan_tunnel(yaml, ifname):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def get_interfaces(yaml):
|
def get_interfaces(yaml):
|
||||||
""" Return a list of all interface and sub-interface names """
|
""" Return a list of all interface and sub-interface names """
|
||||||
ret = []
|
ret = []
|
||||||
|
@ -27,6 +27,7 @@ def get_loopbacks(yaml):
|
|||||||
ret.append(ifname)
|
ret.append(ifname)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def get_by_name(yaml, ifname):
|
def get_by_name(yaml, ifname):
|
||||||
""" Return the loopback by name, if it exists. Return None otherwise. """
|
""" Return the loopback by name, if it exists. Return None otherwise. """
|
||||||
try:
|
try:
|
||||||
@ -37,6 +38,12 @@ def get_by_name(yaml, ifname):
|
|||||||
return None, None
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def is_loopback(yaml, ifname):
|
||||||
|
""" Returns True if the interface name is an existing loopback. """
|
||||||
|
ifname, iface = get_by_name(yaml, ifname)
|
||||||
|
return not iface == None
|
||||||
|
|
||||||
|
|
||||||
def validate_loopbacks(yaml):
|
def validate_loopbacks(yaml):
|
||||||
result = True
|
result = True
|
||||||
msgs = []
|
msgs = []
|
||||||
|
@ -23,3 +23,8 @@ class TestBondEthernetMethods(unittest.TestCase):
|
|||||||
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1"))
|
self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1"))
|
||||||
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0"))
|
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0"))
|
||||||
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100"))
|
self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100"))
|
||||||
|
|
||||||
|
def test_is_bondethernet(self):
|
||||||
|
self.assertTrue(bondethernet.is_bondethernet(self.cfg, "BondEthernet0"))
|
||||||
|
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist"))
|
||||||
|
self.assertFalse(bondethernet.is_bondethernet(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
|
@ -18,6 +18,20 @@ class TestBridgeDomainMethods(unittest.TestCase):
|
|||||||
self.assertIsNone(iface)
|
self.assertIsNone(iface)
|
||||||
self.assertIsNone(ifname)
|
self.assertIsNone(ifname)
|
||||||
|
|
||||||
|
def test_is_bridgedomain(self):
|
||||||
|
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd10"))
|
||||||
|
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd11"))
|
||||||
|
self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd12"))
|
||||||
|
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "bd-notexist"))
|
||||||
|
self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
|
|
||||||
|
def test_is_bvi(self):
|
||||||
|
self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi10"))
|
||||||
|
self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi11"))
|
||||||
|
self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi12"))
|
||||||
|
self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi-notexist"))
|
||||||
|
self.assertFalse(bridgedomain.is_bvi(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
|
|
||||||
def test_members(self):
|
def test_members(self):
|
||||||
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0"))
|
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100"))
|
self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100"))
|
||||||
|
@ -180,3 +180,14 @@ class TestInterfaceMethods(unittest.TestCase):
|
|||||||
|
|
||||||
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
|
ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201")
|
||||||
self.assertEqual(ifname, "GigabitEthernet1/0/1.200")
|
self.assertEqual(ifname, "GigabitEthernet1/0/1.200")
|
||||||
|
|
||||||
|
def test_get_phys(self):
|
||||||
|
phys = interface.get_phys(self.cfg)
|
||||||
|
print(phys)
|
||||||
|
self.assertEqual(len(phys), 6)
|
||||||
|
self.assertIn("GigabitEthernet1/0/0", phys)
|
||||||
|
self.assertNotIn("GigabitEthernet1/0/0.100", phys)
|
||||||
|
|
||||||
|
def test_is_phy(self):
|
||||||
|
self.assertTrue(interface.is_phy(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
|
self.assertFalse(interface.is_phy(self.cfg, "GigabitEthernet1/0/0.100"))
|
||||||
|
36
validator/test_vxlan_tunnel.py
Normal file
36
validator/test_vxlan_tunnel.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
import unittest
|
||||||
|
import yaml
|
||||||
|
import validator.vxlan_tunnel as vxlan_tunnel
|
||||||
|
|
||||||
|
class TestVXLANMethods(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
with open("unittest/test_vxlan_tunnel.yaml", "r") as f:
|
||||||
|
self.cfg = yaml.load(f, Loader = yaml.FullLoader)
|
||||||
|
|
||||||
|
def test_get_by_name(self):
|
||||||
|
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel0")
|
||||||
|
self.assertIsNotNone(iface)
|
||||||
|
self.assertEqual("vxlan_tunnel0", ifname)
|
||||||
|
|
||||||
|
ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel-noexist")
|
||||||
|
self.assertIsNone(ifname)
|
||||||
|
self.assertIsNone(iface)
|
||||||
|
|
||||||
|
def test_is_vxlan_tunnel(self):
|
||||||
|
self.assertTrue(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel0"))
|
||||||
|
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel-noexist"))
|
||||||
|
self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "GigabitEthernet1/0/0"))
|
||||||
|
|
||||||
|
def test_enumerators(self):
|
||||||
|
ifs = vxlan_tunnel.get_vxlan_tunnels(self.cfg)
|
||||||
|
self.assertEqual(len(ifs), 4)
|
||||||
|
self.assertIn("vxlan_tunnel0", ifs)
|
||||||
|
self.assertIn("vxlan_tunnel1", ifs)
|
||||||
|
self.assertIn("vxlan_tunnel2", ifs)
|
||||||
|
self.assertIn("vxlan_tunnel3", ifs)
|
||||||
|
self.assertNotIn("vxlan_tunnel-noexist", ifs)
|
||||||
|
|
||||||
|
def test_vni_unique(self):
|
||||||
|
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 100))
|
||||||
|
self.assertFalse(vxlan_tunnel.vni_unique(self.cfg, 101))
|
||||||
|
self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 102))
|
@ -27,7 +27,14 @@ def get_by_name(yaml, ifname):
|
|||||||
return ifname, yaml['vxlan_tunnels'][ifname]
|
return ifname, yaml['vxlan_tunnels'][ifname]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
return None
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
def is_vxlan_tunnel(yaml, ifname):
|
||||||
|
""" Returns True if the interface name is an existing VXLAN Tunnel. """
|
||||||
|
ifname, iface = get_by_name(yaml, ifname)
|
||||||
|
return not iface == None
|
||||||
|
|
||||||
|
|
||||||
def vni_unique(yaml, vni):
|
def vni_unique(yaml, vni):
|
||||||
""" Return True if the VNI is unique amongst all VXLANs """
|
""" Return True if the VNI is unique amongst all VXLANs """
|
||||||
@ -42,6 +49,17 @@ def vni_unique(yaml, vni):
|
|||||||
return ncount < 2
|
return ncount < 2
|
||||||
|
|
||||||
|
|
||||||
|
def get_vxlan_tunnels(yaml):
|
||||||
|
""" Returns a list of all VXLAN tunnel interface names. """
|
||||||
|
ret = []
|
||||||
|
if not 'vxlan_tunnels' in yaml:
|
||||||
|
return ret
|
||||||
|
|
||||||
|
for ifname, iface in yaml['vxlan_tunnels'].items():
|
||||||
|
ret.append(ifname)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def validate_vxlan_tunnels(yaml):
|
def validate_vxlan_tunnels(yaml):
|
||||||
result = True
|
result = True
|
||||||
msgs = []
|
msgs = []
|
||||||
|
Reference in New Issue
Block a user