acl: add dumper for acls

A reasonable attempt will be made to shorten the output of terms, but
due to the nature of the ACL plugin in VPP, all ACLs will be unrolled
into their individual ACEs (called 'terms').

- src/dst-port will only be emitted with UDP/TCP
- icmp-typc/code will only be emitted with ICMP/ICMPv6
- icmp-code/type and source/destination-ports ranges will be collapsed
  where appropriate.
- if protocol is 0, only L3 information will be emitted

NOTE: a bug in the VPP plugin will allow for ICMP 'sport' and 'dport'
upper value to be 16 bits. If an ACE is retrieved from the dataplane
regarding an ICMP or ICMPv6 (referring the 16 bit values to icmp type
and code), they will be truncated and a warning issued.
This commit is contained in:
Pim van Pelt
2023-01-16 17:12:48 +00:00
parent efef03ea42
commit 02ca2e22cd
2 changed files with 118 additions and 0 deletions

View File

@ -65,6 +65,8 @@ class Dumper(VPPApi):
"bridgedomains": {},
"vxlan_tunnels": {},
"taps": {},
"prefixlists": {},
"acls": {},
}
for idx, bond_iface in self.cache["bondethernets"].items():
bond = {"description": ""}
@ -246,5 +248,92 @@ class Dumper(VPPApi):
bridge["interfaces"] = members
bridge["mtu"] = mtu
config["bridgedomains"][bridge_name] = bridge
for idx, acl in self.cache["acls"].items():
aclname = f"vppacl{acl.acl_index}"
config_acl = {"description": "", "terms": []}
terms = 0
for acl_rule in acl.r:
terms += 1
action = "deny"
if acl_rule.is_permit == 1:
action = "permit"
elif acl_rule.is_permit == 2:
action = "permit+reflect"
config_term = {
"action": action,
"source": str(acl_rule.src_prefix),
"destination": str(acl_rule.dst_prefix),
}
if acl_rule.proto == 0:
pass
elif acl_rule.proto in [1, 58]:
if acl_rule.proto == 1:
config_term["protocol"] = "icmp"
else:
config_term["protocol"] = "ipv6-icmp"
maxval = acl_rule.srcport_or_icmptype_last
if maxval > 255:
self.logger.warning(
f"icmp type > 255 on acl {acl.acl_index} term {terms}"
)
maxval = 255
if acl_rule.srcport_or_icmptype_first == maxval:
config_term["icmp-type"] = int(
acl_rule.srcport_or_icmptype_first
)
else:
config_term[
"icmp-type"
] = f"{acl_rule.srcport_or_icmptype_first}-{maxval}"
maxval = acl_rule.dstport_or_icmpcode_last
if maxval > 255:
self.logger.warning(
f"icmp code > 255 on acl {acl.acl_index} term {terms}"
)
maxval = 255
if acl_rule.dstport_or_icmpcode_first == maxval:
config_term["icmp-code"] = int(
acl_rule.dstport_or_icmpcode_first
)
else:
config_term[
"icmp-code"
] = f"{acl_rule.dstport_or_icmpcode_first}-{maxval}"
elif acl_rule.proto in [6, 17]:
if acl_rule.proto == 6:
config_term["protocol"] = "tcp"
else:
config_term["protocol"] = "udp"
if (
acl_rule.srcport_or_icmptype_first
== acl_rule.srcport_or_icmptype_last
):
config_term["source-port"] = int(
acl_rule.srcport_or_icmptype_first
)
else:
config_term[
"source-port"
] = f"{acl_rule.srcport_or_icmptype_first}-{acl_rule.srcport_or_icmptype_last}"
if (
acl_rule.dstport_or_icmpcode_first
== acl_rule.dstport_or_icmpcode_last
):
config_term["destination-port"] = int(
acl_rule.dstport_or_icmpcode_first
)
else:
config_term[
"destination-port"
] = f"{acl_rule.dstport_or_icmpcode_first}-{acl_rule.dstport_or_icmpcode_last}"
else:
config_term["protocol"] = int(acl_rule.proto)
config_acl["terms"].append(config_term)
config["acls"][aclname] = config_acl
return config

View File

@ -119,12 +119,14 @@ class VPPApi:
"interface_names": {},
"interfaces": {},
"interface_addresses": {},
"interface_acls": {},
"bondethernets": {},
"bondethernet_members": {},
"bridgedomains": {},
"vxlan_tunnels": {},
"l2xcs": {},
"taps": {},
"acls": {},
}
return True
@ -196,6 +198,7 @@ class VPPApi:
if len(self.cache["interface_addresses"][iface.sw_if_index]) > 0:
self.logger.warning(f"Not all addresses were removed on {ifname}")
del self.cache["interface_addresses"][iface.sw_if_index]
del self.cache["interface_acls"][iface.sw_if_index]
del self.cache["interface_names"][ifname]
## Use my_dict.pop('key', None), as it allows 'key' to be absent
@ -246,6 +249,14 @@ class VPPApi:
interface_dev_type="local",
tag="mock",
)
self.cache["interface_acls"][idx] = self.vpp_messages[
"acl_interface_list_details"
].tuple(
sw_if_index=idx,
count=0,
n_input=0,
acls=[],
)
## Add mock PHYs
for ifname, iface in yaml_config["interfaces"].items():
if not "device-type" in iface or iface["device-type"] not in ["dpdk"]:
@ -277,6 +288,14 @@ class VPPApi:
interface_dev_type=iface["device-type"],
tag="mock",
)
self.cache["interface_acls"][idx] = self.vpp_messages[
"acl_interface_list_details"
].tuple(
sw_if_index=idx,
count=0,
n_input=0,
acls=[],
)
## Create interface_names and interface_address indexes
for idx, iface in self.cache["interfaces"].items():
@ -332,6 +351,16 @@ class VPPApi:
str(addr.prefix)
)
self.logger.debug("Retrieving ACLs")
api_response = self.vpp.api.acl_dump(acl_index=0xFFFFFFFF)
for acl in api_response:
self.cache["acls"][acl.acl_index] = acl
self.logger.debug("Retrieving interface ACLs")
api_response = self.vpp.api.acl_interface_list_dump()
for iface in api_response:
self.cache["interface_acls"][iface.sw_if_index] = iface
self.logger.debug("Retrieving bondethernets")
api_response = self.vpp.api.sw_bond_interface_dump()
for iface in api_response: