Add prefixlist (mixed IPv4 and IPv6, containing either IP addresses or prefixes + tests
This commit is contained in:
		| @@ -38,6 +38,7 @@ from .interface import validate_interfaces | ||||
| from .bridgedomain import validate_bridgedomains | ||||
| from .vxlan_tunnel import validate_vxlan_tunnels | ||||
| from .tap import validate_taps | ||||
| from .prefixlist import validate_prefixlists | ||||
| from .acl import validate_acls | ||||
|  | ||||
|  | ||||
| @@ -90,6 +91,7 @@ class Validator: | ||||
|             validate_bridgedomains, | ||||
|             validate_vxlan_tunnels, | ||||
|             validate_taps, | ||||
|             validate_prefixlists, | ||||
|             validate_acls, | ||||
|         ] | ||||
|  | ||||
|   | ||||
							
								
								
									
										104
									
								
								vppcfg/config/prefixlist.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										104
									
								
								vppcfg/config/prefixlist.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,104 @@ | ||||
| # | ||||
| # Copyright (c) 2023 Pim van Pelt | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at: | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| # | ||||
| """ A vppcfg configuration module that validates prefixlists """ | ||||
| import logging | ||||
| import socket | ||||
| import ipaddress | ||||
|  | ||||
|  | ||||
| def get_prefixlists(yaml): | ||||
|     """Return a list of all prefixlists.""" | ||||
|     ret = [] | ||||
|     if "prefixlists" in yaml: | ||||
|         for plname, _pl in yaml["prefixlists"].items(): | ||||
|             ret.append(plname) | ||||
|     return ret | ||||
|  | ||||
|  | ||||
| def get_by_name(yaml, plname): | ||||
|     """Return the prefixlist by name, if it exists. Return None otherwise.""" | ||||
|     try: | ||||
|         if plname in yaml["prefixlists"]: | ||||
|             return plname, yaml["prefixlists"][plname] | ||||
|     except KeyError: | ||||
|         pass | ||||
|     return None, None | ||||
|  | ||||
|  | ||||
| def count(yaml, plname): | ||||
|     """Return the number of IPv4 and IPv6 entries in the prefixlist. | ||||
|     Returns 0, 0 if it doesn't exist""" | ||||
|     v4, v6 = 0, 0 | ||||
|  | ||||
|     plname, pl = get_by_name(yaml, plname) | ||||
|     if not pl: | ||||
|         return 0, 0 | ||||
|     for m in pl["members"]: | ||||
|         ipn = ipaddress.ip_network(m, strict=False) | ||||
|         if ipn.version == 4: | ||||
|             v4 += 1 | ||||
|         elif ipn.version == 6: | ||||
|             v6 += 1 | ||||
|  | ||||
|     return v4, v6 | ||||
|  | ||||
|  | ||||
| def count_ipv4(yaml, plname): | ||||
|     """Return the number of IPv4 entries in the prefixlist.""" | ||||
|     v4, v6 = count(yaml, plname) | ||||
|     return v4 | ||||
|  | ||||
|  | ||||
| def count_ipv6(yaml, plname): | ||||
|     """Return the number of IPv6 entries in the prefixlist.""" | ||||
|     v4, v6 = count(yaml, plname) | ||||
|     return v6 | ||||
|  | ||||
|  | ||||
| def has_ipv4(yaml, plname): | ||||
|     """Return True if the prefixlist has at least one IPv4 entry.""" | ||||
|     v4, v6 = count(yaml, plname) | ||||
|     return v4 > 0 | ||||
|  | ||||
|  | ||||
| def has_ipv6(yaml, plname): | ||||
|     """Return True if the prefixlist has at least one IPv6 entry.""" | ||||
|     v4, v6 = count(yaml, plname) | ||||
|     return v6 > 0 | ||||
|  | ||||
|  | ||||
| def is_empty(yaml, plname): | ||||
|     """Return True if the prefixlist has no entries.""" | ||||
|     v4, v6 = count(yaml, plname) | ||||
|     return v4 + v6 == 0 | ||||
|  | ||||
|  | ||||
| def validate_prefixlists(yaml): | ||||
|     """Validate the semantics of all YAML 'prefixlists' entries""" | ||||
|     result = True | ||||
|     msgs = [] | ||||
|     logger = logging.getLogger("vppcfg.config") | ||||
|     logger.addHandler(logging.NullHandler()) | ||||
|  | ||||
|     if not "prefixlists" in yaml: | ||||
|         return result, msgs | ||||
|  | ||||
|     for plname, pl in yaml["prefixlists"].items(): | ||||
|         logger.debug(f"prefixlist {plname}: {pl}") | ||||
|         members = 0 | ||||
|         for pl_member in pl["members"]: | ||||
|             members += 1 | ||||
|             logger.debug(f"prefixlist {plname} member {members} is {pl_member}") | ||||
|  | ||||
|     return result, msgs | ||||
							
								
								
									
										77
									
								
								vppcfg/config/test_prefixlist.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								vppcfg/config/test_prefixlist.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,77 @@ | ||||
| # | ||||
| # Copyright (c) 2022 Pim van Pelt | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at: | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| # | ||||
| # -*- coding: utf-8 -*- | ||||
| """ Unit tests for taps """ | ||||
| import unittest | ||||
| import yaml | ||||
| from . import prefixlist | ||||
| from .unittestyaml import UnitTestYaml | ||||
|  | ||||
|  | ||||
| class TestACLMethods(unittest.TestCase): | ||||
|     def setUp(self): | ||||
|         with UnitTestYaml("test_prefixlist.yaml") as f: | ||||
|             self.cfg = yaml.load(f, Loader=yaml.FullLoader) | ||||
|  | ||||
|     def test_get_prefixlists(self): | ||||
|         plist = prefixlist.get_prefixlists(self.cfg) | ||||
|         self.assertIsInstance(plist, list) | ||||
|         self.assertEqual(5, len(plist)) | ||||
|  | ||||
|     def test_get_by_name(self): | ||||
|         plname, _pl = prefixlist.get_by_name(self.cfg, "trusted") | ||||
|         self.assertIsNotNone(_pl) | ||||
|         self.assertEqual("trusted", plname) | ||||
|  | ||||
|         plname, _pl = prefixlist.get_by_name(self.cfg, "pl-noexist") | ||||
|         self.assertIsNone(plname) | ||||
|         self.assertIsNone(_pl) | ||||
|  | ||||
|     def test_count(self): | ||||
|         v4, v6 = prefixlist.count(self.cfg, "trusted") | ||||
|         self.assertEqual(2, v4) | ||||
|         self.assertEqual(2, v6) | ||||
|  | ||||
|         v4, v6 = prefixlist.count(self.cfg, "empty") | ||||
|         self.assertEqual(0, v4) | ||||
|         self.assertEqual(0, v6) | ||||
|  | ||||
|         v4, v6 = prefixlist.count(self.cfg, "pl-noexist") | ||||
|         self.assertEqual(0, v4) | ||||
|         self.assertEqual(0, v6) | ||||
|  | ||||
|     def test_count_ipv4(self): | ||||
|         self.assertEqual(2, prefixlist.count_ipv4(self.cfg, "trusted")) | ||||
|         self.assertEqual(0, prefixlist.count_ipv4(self.cfg, "empty")) | ||||
|         self.assertEqual(0, prefixlist.count_ipv4(self.cfg, "pl-noexist")) | ||||
|  | ||||
|     def test_count_ipv6(self): | ||||
|         self.assertEqual(2, prefixlist.count_ipv6(self.cfg, "trusted")) | ||||
|         self.assertEqual(0, prefixlist.count_ipv6(self.cfg, "empty")) | ||||
|         self.assertEqual(0, prefixlist.count_ipv6(self.cfg, "pl-noexist")) | ||||
|  | ||||
|     def test_has_ipv4(self): | ||||
|         self.assertTrue(prefixlist.has_ipv4(self.cfg, "trusted")) | ||||
|         self.assertFalse(prefixlist.has_ipv4(self.cfg, "empty")) | ||||
|         self.assertFalse(prefixlist.has_ipv4(self.cfg, "pl-noexist")) | ||||
|  | ||||
|     def test_has_ipv6(self): | ||||
|         self.assertTrue(prefixlist.has_ipv6(self.cfg, "trusted")) | ||||
|         self.assertFalse(prefixlist.has_ipv6(self.cfg, "empty")) | ||||
|         self.assertFalse(prefixlist.has_ipv6(self.cfg, "pl-noexist")) | ||||
|  | ||||
|     def test_is_empty(self): | ||||
|         self.assertFalse(prefixlist.is_empty(self.cfg, "trusted")) | ||||
|         self.assertTrue(prefixlist.is_empty(self.cfg, "empty")) | ||||
|         self.assertTrue(prefixlist.is_empty(self.cfg, "pl-noexist")) | ||||
		Reference in New Issue
	
	Block a user