acl: rework source/destination
For ACE 'source' and 'destination' is now possible to specify one of: - ipv4 or ipv6 address - ipv4 or ipv6 prefix - name of a prefixlist The validator resolves the src/dst network list, optionally filtering this with the desired 'family' (which defaults to 'any'). Errors are raised if the resulting src/dst network lists do not overlap, that is to say if all src entries are IPv4 but there are no IPv4 dst entries and vise-versa. * Update the example to have a 'trusted' prefixlist. * Update the unit tests to use the new error message(s).
This commit is contained in:
@ -40,28 +40,12 @@ def get_by_name(yaml, aclname):
|
||||
def hydrate_term(acl_term):
|
||||
"""Adds all defaults to an ACL term"""
|
||||
|
||||
# Figure out the address family
|
||||
if "family" not in acl_term:
|
||||
if "source" in acl_term and ":" in acl_term["source"]:
|
||||
acl_term["family"] = "ipv6"
|
||||
elif "destination" in acl_term and ":" in acl_term["destination"]:
|
||||
acl_term["family"] = "ipv6"
|
||||
elif "source" in acl_term and "." in acl_term["source"]:
|
||||
acl_term["family"] = "ipv4"
|
||||
elif "destination" in acl_term and "." in acl_term["destination"]:
|
||||
acl_term["family"] = "ipv4"
|
||||
else:
|
||||
acl_term["family"] = "any"
|
||||
|
||||
# Set source/destionation based on family, if they're omitted
|
||||
if acl_term["family"] == "ipv4" and "source" not in acl_term:
|
||||
acl_term["source"] = "0.0.0.0/0"
|
||||
if acl_term["family"] == "ipv4" and "destination" not in acl_term:
|
||||
acl_term["destination"] = "0.0.0.0/0"
|
||||
if acl_term["family"] == "ipv6" and "source" not in acl_term:
|
||||
acl_term["source"] = "::/0"
|
||||
if acl_term["family"] == "ipv6" and "destination" not in acl_term:
|
||||
acl_term["destination"] = "::/0"
|
||||
acl_term["family"] = "any"
|
||||
if "source" not in acl_term:
|
||||
acl_term["source"] = "any"
|
||||
if "destination" not in acl_term:
|
||||
acl_term["destination"] = "any"
|
||||
|
||||
if "protocol" not in acl_term or acl_term["protocol"] == "any":
|
||||
acl_term["protocol"] = 0
|
||||
@ -181,6 +165,13 @@ def get_network_list(yaml, network_string, want_ipv4=True, want_ipv6=True):
|
||||
ret = [ipn]
|
||||
return ret
|
||||
|
||||
if network_string == "any":
|
||||
if want_ipv4:
|
||||
ret.append(ipaddress.ip_network("0.0.0.0/0"))
|
||||
if want_ipv6:
|
||||
ret.append(ipaddress.ip_network("::/0"))
|
||||
return ret
|
||||
|
||||
return prefixlist.get_network_list(
|
||||
yaml, network_string, want_ipv4=want_ipv4, want_ipv6=want_ipv6
|
||||
)
|
||||
@ -211,6 +202,16 @@ def get_protocol(protostring):
|
||||
return None
|
||||
|
||||
|
||||
def network_list_has_family(network_list, version):
|
||||
"""Returns True if the given list of ip_network() elements has at least one
|
||||
element with the specified version, which can be either 4 or 6. Return False
|
||||
otherwise"""
|
||||
for m in network_list:
|
||||
if m.version == version:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def validate_acls(yaml):
|
||||
"""Validate the semantics of all YAML 'acls' entries"""
|
||||
result = True
|
||||
@ -230,25 +231,53 @@ def validate_acls(yaml):
|
||||
logger.debug(
|
||||
f"acl {aclname} term {terms} orig {orig_acl_term} hydrated {acl_term}"
|
||||
)
|
||||
if acl_term["family"] == "any":
|
||||
if "source" in acl_term:
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} family any cannot have source"
|
||||
)
|
||||
result = False
|
||||
if "destination" in acl_term:
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} family any cannot have destination"
|
||||
)
|
||||
result = False
|
||||
if acl_term["family"] == "ipv4":
|
||||
want_ipv4 = True
|
||||
want_ipv6 = False
|
||||
elif acl_term["family"] == "ipv6":
|
||||
want_ipv4 = False
|
||||
want_ipv6 = True
|
||||
else:
|
||||
src = ipaddress.ip_network(acl_term["source"])
|
||||
dst = ipaddress.ip_network(acl_term["destination"])
|
||||
if src.version != dst.version:
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} source and destination have different address family"
|
||||
)
|
||||
result = False
|
||||
want_ipv4 = True
|
||||
want_ipv6 = True
|
||||
|
||||
src_network_list = get_network_list(
|
||||
yaml, acl_term["source"], want_ipv4=want_ipv4, want_ipv6=want_ipv6
|
||||
)
|
||||
dst_network_list = get_network_list(
|
||||
yaml, acl_term["destination"], want_ipv4=want_ipv4, want_ipv6=want_ipv6
|
||||
)
|
||||
logger.debug(
|
||||
f"acl {aclname} term {terms} src: {src_network_list} dst: {dst_network_list}"
|
||||
)
|
||||
if len(src_network_list) == 0:
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} family {acl_term['family']} has no source"
|
||||
)
|
||||
result = False
|
||||
if len(dst_network_list) == 0:
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} family {acl_term['family']} has no destination"
|
||||
)
|
||||
result = False
|
||||
if len(dst_network_list) == 0 or len(src_network_list) == 0:
|
||||
## Pointless to continue if there's no src/dst at all
|
||||
continue
|
||||
|
||||
src_network_has_ipv4 = network_list_has_family(src_network_list, 4)
|
||||
dst_network_has_ipv4 = network_list_has_family(dst_network_list, 4)
|
||||
src_network_has_ipv6 = network_list_has_family(src_network_list, 6)
|
||||
dst_network_has_ipv6 = network_list_has_family(dst_network_list, 6)
|
||||
|
||||
if (
|
||||
src_network_has_ipv4 != dst_network_has_ipv4
|
||||
and src_network_has_ipv6 != dst_network_has_ipv6
|
||||
):
|
||||
msgs.append(
|
||||
f"acl {aclname} term {terms} source and destination family do not overlap"
|
||||
)
|
||||
result = False
|
||||
continue
|
||||
|
||||
proto = get_protocol(acl_term["protocol"])
|
||||
if proto is None:
|
||||
@ -266,8 +295,7 @@ def validate_acls(yaml):
|
||||
f"acl {aclname} term {terms} destination-port can only be specified for protocol tcp or udp"
|
||||
)
|
||||
result = False
|
||||
|
||||
if proto in [6, 17]:
|
||||
else:
|
||||
src_low_port, src_high_port = get_port_low_high(acl_term["source-port"])
|
||||
dst_low_port, dst_high_port = get_port_low_high(
|
||||
acl_term["destination-port"]
|
||||
@ -328,7 +356,7 @@ def validate_acls(yaml):
|
||||
f"acl {aclname} term {terms} icmp-type can only be specified for protocol icmp or icmp-ipv6"
|
||||
)
|
||||
result = False
|
||||
if proto in [1, 58]:
|
||||
else:
|
||||
icmp_code_low, icmp_code_high = get_icmp_low_high(acl_term["icmp-code"])
|
||||
icmp_type_low, icmp_type_high = get_icmp_low_high(acl_term["icmp-type"])
|
||||
if icmp_code_low > icmp_code_high:
|
||||
|
@ -152,3 +152,20 @@ class TestACLMethods(unittest.TestCase):
|
||||
l = acl.get_network_list(self.cfg, "pl-notexist")
|
||||
self.assertIsInstance(l, list)
|
||||
self.assertEquals(0, len(l))
|
||||
|
||||
def test_network_list_has_family(self):
|
||||
l = acl.get_network_list(self.cfg, "trusted")
|
||||
self.assertTrue(acl.network_list_has_family(l, 4))
|
||||
self.assertTrue(acl.network_list_has_family(l, 6))
|
||||
|
||||
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False)
|
||||
self.assertFalse(acl.network_list_has_family(l, 4))
|
||||
self.assertTrue(acl.network_list_has_family(l, 6))
|
||||
|
||||
l = acl.get_network_list(self.cfg, "trusted", want_ipv6=False)
|
||||
self.assertTrue(acl.network_list_has_family(l, 4))
|
||||
self.assertFalse(acl.network_list_has_family(l, 6))
|
||||
|
||||
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False, want_ipv6=False)
|
||||
self.assertFalse(acl.network_list_has_family(l, 4))
|
||||
self.assertFalse(acl.network_list_has_family(l, 6))
|
||||
|
Reference in New Issue
Block a user