From b43d7903fd3766bf9ffab314c9d9ea8473f56c0b Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Thu, 24 Mar 2022 10:45:34 +0000 Subject: [PATCH] Add a few additional useful functions - is_*() returns True if the interface name is of a certain type is_bondethernet() is_loopback() is_bvi() is_bridgedomain() is_vxlan_tunnel() is_phy() - get_phys() by process of elimination, returns all interface names that are supposed to be physical network interfaces. Add unit tests for validator/vxlan_tunnel.py => Notable: while here, fix a bug in get_by_name() Add unit tests for all the is_*() and get_phys() functions. --- unittest/test_vxlan_tunnel.yaml | 24 ++++++++++++++++++++++ validator/bondethernet.py | 8 +++++++- validator/bridgedomain.py | 23 +++++++++++++++++++++ validator/interface.py | 36 +++++++++++++++++++++++++++++++++ validator/loopback.py | 7 +++++++ validator/test_bondethernet.py | 5 +++++ validator/test_bridgedomain.py | 14 +++++++++++++ validator/test_interface.py | 11 ++++++++++ validator/test_vxlan_tunnel.py | 36 +++++++++++++++++++++++++++++++++ validator/vxlan_tunnel.py | 20 +++++++++++++++++- 10 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 unittest/test_vxlan_tunnel.yaml create mode 100644 validator/test_vxlan_tunnel.py diff --git a/unittest/test_vxlan_tunnel.yaml b/unittest/test_vxlan_tunnel.yaml new file mode 100644 index 0000000..c6c0234 --- /dev/null +++ b/unittest/test_vxlan_tunnel.yaml @@ -0,0 +1,24 @@ +vxlan_tunnels: + vxlan_tunnel0: + description: "Correctly configured VXLAN" + local: 192.0.2.1 + remote: 192.0.2.2 + vni: 100 + + vxlan_tunnel1: + description: "VXLAN VNI overlaps with vxlan_tunnel2" + local: 2001:db8::1 + remote: 2001:db8::2 + vni: 101 + + vxlan_tunnel2: + description: "VXLAN VNI overlaps with vxlan_tunnel1" + local: 192.0.2.9 + remote: 192.0.2.10 + vni: 101 + + vxlan_tunnel3: + description: "VXLAN src/dst address family mismatch" + local: 192.0.2.17 + remote: 2001:db8:1::2 + vni: 102 diff --git a/validator/bondethernet.py b/validator/bondethernet.py index 464e252..dfaaa6e 100644 --- a/validator/bondethernet.py +++ b/validator/bondethernet.py @@ -29,8 +29,14 @@ def get_by_name(yaml, ifname): return None, None +def is_bondethernet(yaml, ifname): + """ Returns True if the interface name is an existing BondEthernet. """ + ifname, iface = get_by_name(yaml, ifname) + return not iface == None + + def is_bond_member(yaml, ifname): - """ Returns True if this interface is a member of a BondEthernet """ + """ Returns True if this interface is a member of a BondEthernet. """ if not 'bondethernets' in yaml: return False diff --git a/validator/bridgedomain.py b/validator/bridgedomain.py index e80120e..2f956bb 100644 --- a/validator/bridgedomain.py +++ b/validator/bridgedomain.py @@ -41,6 +41,29 @@ def get_by_name(yaml, ifname): return None, None +def is_bridgedomain(yaml, ifname): + """ Returns True if the name is an existing bridgedomain (ie bd[0-9]+). """ + ifname, iface = get_by_name(yaml, ifname) + return not iface == None + + +def is_bvi(yaml, ifname): + """ Returns True if the interface name is an existing BVI. """ + if not ifname.startswith("bvi"): + return False + idx = ifname[3:] + if not idx.isnumeric(): + return False + bridgename, bridge = get_by_name(yaml, "bd%d" % (int(idx))) + if not bridge: + return False + + ## If the BridgeDomain has an 'lcp', then it has a Bridge Virtual Interface + if 'lcp' in bridge: + return True + return False + + def get_bridge_interfaces(yaml): """ Returns a list of all interfaces that are bridgedomain members """ diff --git a/validator/interface.py b/validator/interface.py index 4435fab..0698062 100644 --- a/validator/interface.py +++ b/validator/interface.py @@ -14,6 +14,8 @@ import logging import validator.bondethernet as bondethernet import validator.bridgedomain as bridgedomain +import validator.loopback as loopback +import validator.vxlan_tunnel as vxlan_tunnel import validator.lcp as lcp import validator.address as address @@ -237,6 +239,40 @@ def get_encapsulation(yaml, ifname): } +def get_phys(yaml): + """ Return a list of all toplevel (ie. non-sub) interfaces which are + assumed to be physical network cards, eg TenGigabitEthernet1/0/0. Note + that derived/created interfaces such as Tunnels, BondEthernets and + Loopbacks/BVIs are not returned """ + ret = [] + if not 'interfaces' in yaml: + return ret + for ifname, iface in yaml['interfaces'].items(): + if is_phy(yaml, ifname): + ret.append(ifname) + return ret + + +def is_phy(yaml, ifname): + """ Returns True if the ifname is the name of a physical network interface. """ + + ifname, iface = get_by_name(yaml, ifname) + if iface == None: + return False + if is_sub(yaml, ifname): + return False + + if bondethernet.is_bondethernet(yaml, ifname): + return False + if loopback.is_loopback(yaml, ifname): + return False + if bridgedomain.is_bvi(yaml, ifname): + return False + if vxlan_tunnel.is_vxlan_tunnel(yaml, ifname): + return False + return True + + def get_interfaces(yaml): """ Return a list of all interface and sub-interface names """ ret = [] diff --git a/validator/loopback.py b/validator/loopback.py index eee97af..dd235c8 100644 --- a/validator/loopback.py +++ b/validator/loopback.py @@ -27,6 +27,7 @@ def get_loopbacks(yaml): ret.append(ifname) return ret + def get_by_name(yaml, ifname): """ Return the loopback by name, if it exists. Return None otherwise. """ try: @@ -37,6 +38,12 @@ def get_by_name(yaml, ifname): return None, None +def is_loopback(yaml, ifname): + """ Returns True if the interface name is an existing loopback. """ + ifname, iface = get_by_name(yaml, ifname) + return not iface == None + + def validate_loopbacks(yaml): result = True msgs = [] diff --git a/validator/test_bondethernet.py b/validator/test_bondethernet.py index 92ce93f..881e2b9 100644 --- a/validator/test_bondethernet.py +++ b/validator/test_bondethernet.py @@ -23,3 +23,8 @@ class TestBondEthernetMethods(unittest.TestCase): self.assertTrue(bondethernet.is_bond_member(self.cfg, "GigabitEthernet1/0/1")) self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0")) self.assertFalse(bondethernet.is_bond_member(self.cfg, "GigabitEthernet2/0/0.100")) + + def test_is_bondethernet(self): + self.assertTrue(bondethernet.is_bondethernet(self.cfg, "BondEthernet0")) + self.assertFalse(bondethernet.is_bondethernet(self.cfg, "BondEthernet-notexist")) + self.assertFalse(bondethernet.is_bondethernet(self.cfg, "GigabitEthernet1/0/0")) diff --git a/validator/test_bridgedomain.py b/validator/test_bridgedomain.py index 52b58b0..35fd10f 100644 --- a/validator/test_bridgedomain.py +++ b/validator/test_bridgedomain.py @@ -18,6 +18,20 @@ class TestBridgeDomainMethods(unittest.TestCase): self.assertIsNone(iface) self.assertIsNone(ifname) + def test_is_bridgedomain(self): + self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd10")) + self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd11")) + self.assertTrue(bridgedomain.is_bridgedomain(self.cfg, "bd12")) + self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "bd-notexist")) + self.assertFalse(bridgedomain.is_bridgedomain(self.cfg, "GigabitEthernet1/0/0")) + + def test_is_bvi(self): + self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi10")) + self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi11")) + self.assertTrue(bridgedomain.is_bvi(self.cfg, "bvi12")) + self.assertFalse(bridgedomain.is_bvi(self.cfg, "bvi-notexist")) + self.assertFalse(bridgedomain.is_bvi(self.cfg, "GigabitEthernet1/0/0")) + def test_members(self): self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet1/0/0")) self.assertTrue(bridgedomain.is_bridge_interface(self.cfg, "GigabitEthernet2/0/0.100")) diff --git a/validator/test_interface.py b/validator/test_interface.py index 93824f3..d4160ab 100644 --- a/validator/test_interface.py +++ b/validator/test_interface.py @@ -180,3 +180,14 @@ class TestInterfaceMethods(unittest.TestCase): ifname, iface = interface.get_qinx_parent_by_name(self.cfg, "GigabitEthernet1/0/1.201") self.assertEqual(ifname, "GigabitEthernet1/0/1.200") + + def test_get_phys(self): + phys = interface.get_phys(self.cfg) + print(phys) + self.assertEqual(len(phys), 6) + self.assertIn("GigabitEthernet1/0/0", phys) + self.assertNotIn("GigabitEthernet1/0/0.100", phys) + + def test_is_phy(self): + self.assertTrue(interface.is_phy(self.cfg, "GigabitEthernet1/0/0")) + self.assertFalse(interface.is_phy(self.cfg, "GigabitEthernet1/0/0.100")) diff --git a/validator/test_vxlan_tunnel.py b/validator/test_vxlan_tunnel.py new file mode 100644 index 0000000..1d7a09c --- /dev/null +++ b/validator/test_vxlan_tunnel.py @@ -0,0 +1,36 @@ +import unittest +import yaml +import validator.vxlan_tunnel as vxlan_tunnel + +class TestVXLANMethods(unittest.TestCase): + def setUp(self): + with open("unittest/test_vxlan_tunnel.yaml", "r") as f: + self.cfg = yaml.load(f, Loader = yaml.FullLoader) + + def test_get_by_name(self): + ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel0") + self.assertIsNotNone(iface) + self.assertEqual("vxlan_tunnel0", ifname) + + ifname, iface = vxlan_tunnel.get_by_name(self.cfg, "vxlan_tunnel-noexist") + self.assertIsNone(ifname) + self.assertIsNone(iface) + + def test_is_vxlan_tunnel(self): + self.assertTrue(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel0")) + self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "vxlan_tunnel-noexist")) + self.assertFalse(vxlan_tunnel.is_vxlan_tunnel(self.cfg, "GigabitEthernet1/0/0")) + + def test_enumerators(self): + ifs = vxlan_tunnel.get_vxlan_tunnels(self.cfg) + self.assertEqual(len(ifs), 4) + self.assertIn("vxlan_tunnel0", ifs) + self.assertIn("vxlan_tunnel1", ifs) + self.assertIn("vxlan_tunnel2", ifs) + self.assertIn("vxlan_tunnel3", ifs) + self.assertNotIn("vxlan_tunnel-noexist", ifs) + + def test_vni_unique(self): + self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 100)) + self.assertFalse(vxlan_tunnel.vni_unique(self.cfg, 101)) + self.assertTrue(vxlan_tunnel.vni_unique(self.cfg, 102)) diff --git a/validator/vxlan_tunnel.py b/validator/vxlan_tunnel.py index 48df5b7..6438b51 100644 --- a/validator/vxlan_tunnel.py +++ b/validator/vxlan_tunnel.py @@ -27,7 +27,14 @@ def get_by_name(yaml, ifname): return ifname, yaml['vxlan_tunnels'][ifname] except: pass - return None + return None, None + + +def is_vxlan_tunnel(yaml, ifname): + """ Returns True if the interface name is an existing VXLAN Tunnel. """ + ifname, iface = get_by_name(yaml, ifname) + return not iface == None + def vni_unique(yaml, vni): """ Return True if the VNI is unique amongst all VXLANs """ @@ -42,6 +49,17 @@ def vni_unique(yaml, vni): return ncount < 2 +def get_vxlan_tunnels(yaml): + """ Returns a list of all VXLAN tunnel interface names. """ + ret = [] + if not 'vxlan_tunnels' in yaml: + return ret + + for ifname, iface in yaml['vxlan_tunnels'].items(): + ret.append(ifname) + return ret + + def validate_vxlan_tunnels(yaml): result = True msgs = []