Merge ACL plugin work so-far

This commit is contained in:
Pim van Pelt
2023-06-23 00:50:19 +02:00
18 changed files with 1390 additions and 0 deletions

View File

@ -369,3 +369,127 @@ interfaces:
dot1q: 200 dot1q: 200
exact-match: False exact-match: False
``` ```
### Prefix Lists
This construct allows to enumerate a list of IPv4 or IPv6 host addresses and/or networks. Each
prefixlist has a name which consists of anywhere between 1 and 56 characters, and it must start
with a letter. The prefixlist name `any` is reserved. The syntax is straight forward:
* ***description***: A string, no longer than 64 characters, and excluding the single quote '
and double quote ". This string is currently not used anywhere, and serves for enduser
documentation purposes.
* ***members***: A list of zero or more entries which can take the form:
* ***IPv4 Host***: an IPv4 address, eg. `192.0.2.1`
* ***IPv4 Prefix***: an IPv6 prefix, eg. `192.0.2.0/24`
* ***IPv6 Host***: an IPv4 address, eg. `2001:db8::1`
* ***IPv6 Prefix***: an IPv6 prefix, eg. `2001:db8::0/64`
***NOTE***: It is valid to have host addresses with prefixlen, for example `192.168.1.1/24`
in other words, the prefix can be either a network or a host.
A few examples:
```
prefixlists:
example:
description: "An example prefixlist with hosts and prefixes"
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
empty:
description: "An empty prefixlist"
members: []
```
### Access Control Lists
In VPP, a common firewall function is provided by the `acl-plugin`. The anatomy of this plugin
is as follows. First, an ACL consists of one or more Access Control Elements or `ACE`s. These
can match on IPv4 or IPv6 source/destination, an IP protocol, and then for TCP/UDP a range
of source- and destination ports, and for ICMP a range of ICMP type and codes. Any matching
packets then either perform an action of `permit` or `deny` (for stateless) or `permit+reflect`
(stateful). The full syntax is as follows:
* ***description***: A string, no longer than 64 characters, and excluding the single quote '
and double quote ". This string is currently not used anywhere, and serves for enduser
documentation purposes.
* ***terms***: A list of Access Control Elements:
* ***action***: What to do upon match, can be either `permit`, `deny` or `permit+reflect`.
This is the only required field.
* ***family***: Which IP address family to match, can be either `ipv4`, or `ipv6` or `any`,
which is the default. If `any` is used, this term will also operate on any source and
destination addresses, and it will emit two ACEs, one for each address family.
* ***source***: Either an IPv4 or IPv6 host (without prefixlen, eg. `192.0.2.1` or
`2001:db8::1`), an IPv4 or IPv6 prefix (with prefixlen, eg. `192.0.2.0/24` or
`2001:db8::/64`), or a reference to the name of an existing _prefixlist_ (eg. `trusted`).
If left empty, this means all IPv4 and IPv6 (ie. `[ 0.0.0.0/0, ::/0 ]`).
* ***destination***: Similar to `source`, but for the destination field of the packets.
* ***protocol***: The L4 protocol, can be either a numeric value (eg. `6`), or a symbolic
string value from `/etc/protocols` (eg. `tcp`). If omitted, only L3 matches are performed.
* ***source-port***: When `TCP` or `UDP` are specified, this field specified which source
port(s) are matched. It can be either a numeric value (eg. `80`), a symbolic string value
from `/etc/services` (eg. `www`), a numeric range with start and/or end ranges (eg. `-1024`
for all ports from 0-1024 inclusive; or `1024-` for all ports from 1024-65535 inclusive,
or an actual range `49152-65535`). The default keyword `any` is also permitted, which results
in range `0-65535`, and is the default if the field is not specified.
* ***destination-port***: Similar to `source-port` but for the destination port field in the
`TCP` or `UDP` header.
* ***icmp-type***: It can be either a numeric value (eg. `3`), a numeric range with start
and/or end ranges (eg. `-10` for all types from 0-10 inclusive; or `10-` for all types from
10-255 inclusive, or an actual range `10-15`). The default keyword `any` is also permitted,
which results in range `0-255`, and is the default if the field is not specified. This field
can only be specified if the `protocol` field is `icmp` (1) or `ipv6-icmp` (58).
* ***icmp-code***: Similar to `icmp-type` but for the ICMP code field. This field can only be
specified if the `protocol` field is `icmp` (1) or `ipv6-icmp` (58).
An example ACL with four ACE terms:
```
prefixlists:
example:
description: "An example prefixlist with hosts and prefixes"
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
acls:
acl01:
description: "Test ACL"
terms:
- description: "Allow a prefixlist, but only for IPv6"
family: ipv6
action: permit
source: example
- description: "Allow a specific IPv6 TCP flow"
action: permit
source: 2001:db8::/64
destination: 2001:db8:1::/64
protocol: tcp
destination-port: www
source-port: "1024-65535"
- description: "Allow IPv4 ICMP Destination Unreachable, any code"
family: ipv4
action: permit
protocol: icmp
icmp-type: 3
icmp-code: any
- description: "Deny any IPv4 or IPv6"
action: deny
```
One or more of these ACLs are then applied to an interface in either the `input` or the `output`
direction:
```
interfaces:
GigabitEthernet3/0/0:
acls:
input: acl01
output: [ acl02, acl03 ]
```
The configuration here is tolerant of either a singleton (a literal string referring to the one
ACL that must be applied), or a _list_ of strings to more than one ACL, in which case they will
be tested in order (with a first-match return value).

View File

@ -38,6 +38,8 @@ from .interface import validate_interfaces
from .bridgedomain import validate_bridgedomains from .bridgedomain import validate_bridgedomains
from .vxlan_tunnel import validate_vxlan_tunnels from .vxlan_tunnel import validate_vxlan_tunnels
from .tap import validate_taps from .tap import validate_taps
from .prefixlist import validate_prefixlists
from .acl import validate_acls
class IPInterfaceWithPrefixLength(validators.Validator): class IPInterfaceWithPrefixLength(validators.Validator):
@ -90,6 +92,8 @@ class Validator:
validate_bridgedomains, validate_bridgedomains,
validate_vxlan_tunnels, validate_vxlan_tunnels,
validate_taps, validate_taps,
validate_prefixlists,
validate_acls,
] ]
def validate(self, yaml): def validate(self, yaml):

373
vppcfg/config/acl.py Normal file
View File

@ -0,0 +1,373 @@
#
# Copyright (c) 2023 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" A vppcfg configuration module that validates acls """
import logging
import socket
import ipaddress
from . import prefixlist
def get_acls(yaml):
"""Return a list of all acls."""
ret = []
if "acls" in yaml:
for aclname, _acl in yaml["acls"].items():
ret.append(aclname)
return ret
def get_by_name(yaml, aclname):
"""Return the acl by name, if it exists. Return None otherwise."""
try:
if aclname in yaml["acls"]:
return aclname, yaml["acls"][aclname]
except KeyError:
pass
return None, None
def hydrate_term(acl_term):
"""Adds all defaults to an ACL term"""
if "family" not in acl_term:
acl_term["family"] = "any"
if "source" not in acl_term:
acl_term["source"] = "any"
if "destination" not in acl_term:
acl_term["destination"] = "any"
if "protocol" not in acl_term or acl_term["protocol"] == "any":
acl_term["protocol"] = 0
if "source-port" not in acl_term:
acl_term["source-port"] = "any"
if "destination-port" not in acl_term:
acl_term["destination-port"] = "any"
if "icmp-code" not in acl_term:
acl_term["icmp-code"] = "any"
if "icmp-type" not in acl_term:
acl_term["icmp-type"] = "any"
return acl_term
def get_icmp_low_high(icmpstring):
"""For a given icmp string, which can be either an integer or a range of
integers including start/stop being omitted, eg 0-255, 10- or -10, or the
string "any", return a tuple of (lowport, highport) or (None, None) upon
error"""
if isinstance(icmpstring, int):
return int(icmpstring), int(icmpstring)
if "any" == icmpstring:
return 0, 255
try:
icmp = int(icmpstring)
if icmp > 0:
return icmp, icmp
except:
pass
if icmpstring.startswith("-"):
icmp = int(icmpstring[1:])
return 0, icmp
if icmpstring.endswith("-"):
icmp = int(icmpstring[:-1])
return icmp, 255
try:
icmps = icmpstring.split("-")
return int(icmps[0]), int(icmps[1])
except:
pass
return None, None
def get_port_low_high(portstring):
"""For a given port string, which can be either an integer, a symbolic port name
in /etc/services, a range of integers including start/stop being omitted, eg
0-65535, 1024- or -1024, or the string "any", return a tuple of
(lowport, highport) or (None, None) upon error"""
if isinstance(portstring, int):
return int(portstring), int(portstring)
if "any" == portstring:
return 0, 65535
try:
port = int(portstring)
if port > 0:
return port, port
except:
pass
try:
port = socket.getservbyname(portstring)
return port, port
except:
pass
if portstring.startswith("-"):
port = int(portstring[1:])
return 0, port
if portstring.endswith("-"):
port = int(portstring[:-1])
return port, 65535
try:
ports = portstring.split("-")
return int(ports[0]), int(ports[1])
except:
pass
return None, None
def is_ip(ip_string):
"""Return True if the given ip_string is either an IPv4/IPv6 address or prefix."""
if not isinstance(ip_string, str):
return False
try:
ipn = ipaddress.ip_network(ip_string, strict=False)
return True
except:
pass
return False
def get_network_list(yaml, network_string, want_ipv4=True, want_ipv6=True):
"""Return the full list of source or destination address(es). This function resolves the
'source' or 'destination' field, which can either be an IP address, a Prefix, or the name
of a Prefix List. It returns a list of ip_network() objects, including prefix. IP addresses
will receive prefixlen /32 or /128. Optionally, want_ipv4 or want_ipv6 can be set to False
to filter the list."""
ret = []
if is_ip(network_string):
ipn = ipaddress.ip_network(network_string, strict=False)
if ipn.version == 4 and want_ipv4:
ret = [ipn]
if ipn.version == 6 and want_ipv6:
ret = [ipn]
return ret
if network_string == "any":
if want_ipv4:
ret.append(ipaddress.ip_network("0.0.0.0/0"))
if want_ipv6:
ret.append(ipaddress.ip_network("::/0"))
return ret
return prefixlist.get_network_list(
yaml, network_string, want_ipv4=want_ipv4, want_ipv6=want_ipv6
)
def get_protocol(protostring):
"""For a given protocol string, which can be either an integer or a symbolic port
name in /etc/protocols, return the protocol number as integer, or None if it cannot
be determined."""
if isinstance(protostring, int):
return int(protostring)
if "any" == protostring:
return 0
try:
proto = int(protostring)
if proto > 0:
return proto
except:
pass
try:
proto = socket.getprotobyname(protostring)
return proto
except:
pass
return None
def network_list_has_family(network_list, version):
"""Returns True if the given list of ip_network() elements has at least one
element with the specified version, which can be either 4 or 6. Return False
otherwise"""
for m in network_list:
if m.version == version:
return True
return False
def validate_acls(yaml):
"""Validate the semantics of all YAML 'acls' entries"""
result = True
msgs = []
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not "acls" in yaml:
return result, msgs
for aclname, acl in yaml["acls"].items():
terms = 0
for acl_term in acl["terms"]:
terms += 1
orig_acl_term = acl_term.copy()
acl_term = hydrate_term(acl_term)
logger.debug(
f"acl {aclname} term {terms} orig {orig_acl_term} hydrated {acl_term}"
)
if acl_term["family"] == "ipv4":
want_ipv4 = True
want_ipv6 = False
elif acl_term["family"] == "ipv6":
want_ipv4 = False
want_ipv6 = True
else:
want_ipv4 = True
want_ipv6 = True
src_network_list = get_network_list(
yaml, acl_term["source"], want_ipv4=want_ipv4, want_ipv6=want_ipv6
)
dst_network_list = get_network_list(
yaml, acl_term["destination"], want_ipv4=want_ipv4, want_ipv6=want_ipv6
)
logger.debug(
f"acl {aclname} term {terms} src: {src_network_list} dst: {dst_network_list}"
)
if len(src_network_list) == 0:
msgs.append(
f"acl {aclname} term {terms} family {acl_term['family']} has no source"
)
result = False
if len(dst_network_list) == 0:
msgs.append(
f"acl {aclname} term {terms} family {acl_term['family']} has no destination"
)
result = False
if len(dst_network_list) == 0 or len(src_network_list) == 0:
## Pointless to continue if there's no src/dst at all
continue
src_network_has_ipv4 = network_list_has_family(src_network_list, 4)
dst_network_has_ipv4 = network_list_has_family(dst_network_list, 4)
src_network_has_ipv6 = network_list_has_family(src_network_list, 6)
dst_network_has_ipv6 = network_list_has_family(dst_network_list, 6)
if (
src_network_has_ipv4 != dst_network_has_ipv4
and src_network_has_ipv6 != dst_network_has_ipv6
):
msgs.append(
f"acl {aclname} term {terms} source and destination family do not overlap"
)
result = False
continue
proto = get_protocol(acl_term["protocol"])
if proto is None:
msgs.append(f"acl {aclname} term {terms} could not understand protocol")
result = False
if not proto in [6, 17]:
if "source-port" in orig_acl_term:
msgs.append(
f"acl {aclname} term {terms} source-port can only be specified for protocol tcp or udp"
)
result = False
if "destination-port" in orig_acl_term:
msgs.append(
f"acl {aclname} term {terms} destination-port can only be specified for protocol tcp or udp"
)
result = False
else:
src_low_port, src_high_port = get_port_low_high(acl_term["source-port"])
dst_low_port, dst_high_port = get_port_low_high(
acl_term["destination-port"]
)
if src_low_port is None or src_high_port is None:
msgs.append(
f"acl {aclname} term {terms} could not understand source-port"
)
result = False
else:
if src_low_port > src_high_port:
msgs.append(
f"acl {aclname} term {terms} source-port low value is greater than high value"
)
result = False
if src_low_port < 0 or src_low_port > 65535:
msgs.append(
f"acl {aclname} term {terms} source-port low value is not between [0,65535]"
)
result = False
if src_high_port < 0 or src_high_port > 65535:
msgs.append(
f"acl {aclname} term {terms} source-port high value is not between [0,65535]"
)
result = False
if dst_low_port is None or dst_high_port is None:
msgs.append(
f"acl {aclname} term {terms} could not understand destination-port"
)
result = False
else:
if dst_low_port > dst_high_port:
msgs.append(
f"acl {aclname} term {terms} destination-port low value is greater than high value"
)
result = False
if dst_low_port < 0 or dst_low_port > 65535:
msgs.append(
f"acl {aclname} term {terms} destination-port low value is not between [0,65535]"
)
result = False
if dst_high_port < 0 or dst_high_port > 65535:
msgs.append(
f"acl {aclname} term {terms} destination-port high value is not between [0,65535]"
)
result = False
if not proto in [1, 58]:
if "icmp-code" in orig_acl_term:
msgs.append(
f"acl {aclname} term {terms} icmp-code can only be specified for protocol icmp or ipv6-icmp"
)
result = False
if "icmp-type" in orig_acl_term:
msgs.append(
f"acl {aclname} term {terms} icmp-type can only be specified for protocol icmp or ipv6-icmp"
)
result = False
else:
icmp_code_low, icmp_code_high = get_icmp_low_high(acl_term["icmp-code"])
icmp_type_low, icmp_type_high = get_icmp_low_high(acl_term["icmp-type"])
if icmp_code_low > icmp_code_high:
msgs.append(
f"acl {aclname} term {terms} icmp-code low value is greater than high value"
)
result = False
if icmp_type_low > icmp_type_high:
msgs.append(
f"acl {aclname} term {terms} icmp-type low value is greater than high value"
)
result = False
return result, msgs

126
vppcfg/config/prefixlist.py Normal file
View File

@ -0,0 +1,126 @@
#
# Copyright (c) 2023 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" A vppcfg configuration module that validates prefixlists """
import logging
import ipaddress
def get_prefixlists(yaml):
"""Return a list of all prefixlists."""
ret = []
if "prefixlists" in yaml:
for plname, _pl in yaml["prefixlists"].items():
ret.append(plname)
return ret
def get_by_name(yaml, plname):
"""Return the prefixlist by name, if it exists. Return None otherwise."""
try:
if plname in yaml["prefixlists"]:
return plname, yaml["prefixlists"][plname]
except KeyError:
pass
return None, None
def get_network_list(yaml, plname, want_ipv4=True, want_ipv6=True):
"""Returns a list of 0 or more ip_network elements, that represent the members
in a prefixlist of given name. Return the empty list if the prefixlist doesn't
exist. Optionally, want_ipv4 or want_ipv6 can be set to False to filter the list."""
ret = []
plname, plist = get_by_name(yaml, plname)
if not plist:
return ret
for member in plist["members"]:
ipn = ipaddress.ip_network(member, strict=False)
if ipn.version == 4 and want_ipv4:
ret.append(ipn)
if ipn.version == 6 and want_ipv6:
ret.append(ipn)
return ret
def count(yaml, plname):
"""Return the number of IPv4 and IPv6 entries in the prefixlist.
Returns 0, 0 if it doesn't exist"""
ipv4, ipv6 = 0, 0
plname, plist = get_by_name(yaml, plname)
if not plist:
return 0, 0
for member in plist["members"]:
ipn = ipaddress.ip_network(member, strict=False)
if ipn.version == 4:
ipv4 += 1
elif ipn.version == 6:
ipv6 += 1
return ipv4, ipv6
def count_ipv4(yaml, plname):
"""Return the number of IPv4 entries in the prefixlist."""
ipv4, _ = count(yaml, plname)
return ipv4
def count_ipv6(yaml, plname):
"""Return the number of IPv6 entries in the prefixlist."""
_, ipv6 = count(yaml, plname)
return ipv6
def has_ipv4(yaml, plname):
"""Return True if the prefixlist has at least one IPv4 entry."""
ipv4, _ = count(yaml, plname)
return ipv4 > 0
def has_ipv6(yaml, plname):
"""Return True if the prefixlist has at least one IPv6 entry."""
_, ipv6 = count(yaml, plname)
return ipv6 > 0
def is_empty(yaml, plname):
"""Return True if the prefixlist has no entries."""
ipv4, ipv6 = count(yaml, plname)
return ipv4 + ipv6 == 0
def validate_prefixlists(yaml):
"""Validate the semantics of all YAML 'prefixlists' entries"""
result = True
msgs = []
logger = logging.getLogger("vppcfg.config")
logger.addHandler(logging.NullHandler())
if not "prefixlists" in yaml:
return result, msgs
for plname, plist in yaml["prefixlists"].items():
logger.debug(f"prefixlist {plname}: {plist}")
if plname in ["any"]:
## Note: ACL 'source' and 'destination', when they are empty, will resolve
## to 'any', and can thus never refer to a prefixlist called 'any'.
msgs.append(f"prefixlist {plname} is a reserved name")
result = False
members = 0
for pl_member in plist["members"]:
members += 1
logger.debug(f"prefixlist {plname} member {members} is {pl_member}")
return result, msgs

171
vppcfg/config/test_acl.py Normal file
View File

@ -0,0 +1,171 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
""" Unit tests for taps """
import unittest
import yaml
from . import acl
from .unittestyaml import UnitTestYaml
class TestACLMethods(unittest.TestCase):
def setUp(self):
with UnitTestYaml("test_acl.yaml") as f:
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_acls(self):
acllist = acl.get_acls(self.cfg)
self.assertIsInstance(acllist, list)
self.assertEqual(2, len(acllist))
def test_get_by_name(self):
aclname, _acl = acl.get_by_name(self.cfg, "deny-all")
self.assertIsNotNone(_acl)
self.assertEqual("deny-all", aclname)
aclname, _acl = acl.get_by_name(self.cfg, "acl-noexist")
self.assertIsNone(aclname)
self.assertIsNone(_acl)
def test_get_port_low_high(self):
lo, hi = acl.get_port_low_high(80)
self.assertEqual(80, lo)
self.assertEqual(80, hi)
lo, hi = acl.get_port_low_high("80")
self.assertEqual(80, lo)
self.assertEqual(80, hi)
lo, hi = acl.get_port_low_high("www")
self.assertEqual(80, lo)
self.assertEqual(80, hi)
lo, hi = acl.get_port_low_high("any")
self.assertEqual(0, lo)
self.assertEqual(65535, hi)
lo, hi = acl.get_port_low_high("-1024")
self.assertEqual(0, lo)
self.assertEqual(1024, hi)
lo, hi = acl.get_port_low_high("1024-")
self.assertEqual(1024, lo)
self.assertEqual(65535, hi)
lo, hi = acl.get_port_low_high("1000-2000")
self.assertEqual(1000, lo)
self.assertEqual(2000, hi)
lo, hi = acl.get_port_low_high("0-65535")
self.assertEqual(0, lo)
self.assertEqual(65535, hi)
lo, hi = acl.get_port_low_high("bla")
self.assertIsNone(lo)
self.assertIsNone(hi)
lo, hi = acl.get_port_low_high("foo-bar")
self.assertIsNone(lo)
self.assertIsNone(hi)
def test_get_protocol(self):
proto = acl.get_protocol(1)
self.assertEqual(1, proto)
proto = acl.get_protocol("icmp")
self.assertEqual(1, proto)
proto = acl.get_protocol("unknown")
self.assertIsNone(proto)
def test_get_icmp_low_high(self):
lo, hi = acl.get_icmp_low_high(3)
self.assertEqual(3, lo)
self.assertEqual(3, hi)
lo, hi = acl.get_icmp_low_high("3")
self.assertEqual(3, lo)
self.assertEqual(3, hi)
lo, hi = acl.get_icmp_low_high("any")
self.assertEqual(0, lo)
self.assertEqual(255, hi)
lo, hi = acl.get_icmp_low_high("10-")
self.assertEqual(10, lo)
self.assertEqual(255, hi)
lo, hi = acl.get_icmp_low_high("-10")
self.assertEqual(0, lo)
self.assertEqual(10, hi)
lo, hi = acl.get_icmp_low_high("10-20")
self.assertEqual(10, lo)
self.assertEqual(20, hi)
def test_is_ip(self):
self.assertTrue(acl.is_ip("192.0.2.1"))
self.assertTrue(acl.is_ip("192.0.2.1/24"))
self.assertTrue(acl.is_ip("192.0.2.0/24"))
self.assertTrue(acl.is_ip("2001:db8::1"))
self.assertTrue(acl.is_ip("2001:db8::1/64"))
self.assertTrue(acl.is_ip("2001:db8::/64"))
self.assertFalse(acl.is_ip(True))
self.assertFalse(acl.is_ip("String"))
self.assertFalse(acl.is_ip([]))
self.assertFalse(acl.is_ip({}))
def test_get_network_list(self):
for s in ["192.0.2.1", "192.0.2.1/24", "2001:db8::1", "2001:db8::1/64"]:
l = acl.get_network_list(self.cfg, s)
self.assertIsInstance(l, list)
self.assertEquals(1, len(l))
n = l[0]
l = acl.get_network_list(self.cfg, "trusted")
self.assertIsInstance(l, list)
self.assertEquals(5, len(l))
l = acl.get_network_list(self.cfg, "trusted", want_ipv6=False)
self.assertIsInstance(l, list)
self.assertEquals(2, len(l))
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False)
self.assertIsInstance(l, list)
self.assertEquals(3, len(l))
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False, want_ipv6=False)
self.assertIsInstance(l, list)
self.assertEquals(0, len(l))
l = acl.get_network_list(self.cfg, "pl-notexist")
self.assertIsInstance(l, list)
self.assertEquals(0, len(l))
def test_network_list_has_family(self):
l = acl.get_network_list(self.cfg, "trusted")
self.assertTrue(acl.network_list_has_family(l, 4))
self.assertTrue(acl.network_list_has_family(l, 6))
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False)
self.assertFalse(acl.network_list_has_family(l, 4))
self.assertTrue(acl.network_list_has_family(l, 6))
l = acl.get_network_list(self.cfg, "trusted", want_ipv6=False)
self.assertTrue(acl.network_list_has_family(l, 4))
self.assertFalse(acl.network_list_has_family(l, 6))
l = acl.get_network_list(self.cfg, "trusted", want_ipv4=False, want_ipv6=False)
self.assertFalse(acl.network_list_has_family(l, 4))
self.assertFalse(acl.network_list_has_family(l, 6))

View File

@ -0,0 +1,100 @@
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
""" Unit tests for taps """
import unittest
import yaml
from . import prefixlist
from .unittestyaml import UnitTestYaml
class TestACLMethods(unittest.TestCase):
def setUp(self):
with UnitTestYaml("test_prefixlist.yaml") as f:
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
def test_get_prefixlists(self):
plist = prefixlist.get_prefixlists(self.cfg)
self.assertIsInstance(plist, list)
self.assertEqual(5, len(plist))
def test_get_by_name(self):
plname, _pl = prefixlist.get_by_name(self.cfg, "trusted")
self.assertIsNotNone(_pl)
self.assertEqual("trusted", plname)
plname, _pl = prefixlist.get_by_name(self.cfg, "pl-noexist")
self.assertIsNone(plname)
self.assertIsNone(_pl)
def test_count(self):
v4, v6 = prefixlist.count(self.cfg, "trusted")
self.assertEqual(2, v4)
self.assertEqual(3, v6)
v4, v6 = prefixlist.count(self.cfg, "empty")
self.assertEqual(0, v4)
self.assertEqual(0, v6)
v4, v6 = prefixlist.count(self.cfg, "pl-noexist")
self.assertEqual(0, v4)
self.assertEqual(0, v6)
def test_count_ipv4(self):
self.assertEqual(2, prefixlist.count_ipv4(self.cfg, "trusted"))
self.assertEqual(0, prefixlist.count_ipv4(self.cfg, "empty"))
self.assertEqual(0, prefixlist.count_ipv4(self.cfg, "pl-noexist"))
def test_count_ipv6(self):
self.assertEqual(3, prefixlist.count_ipv6(self.cfg, "trusted"))
self.assertEqual(0, prefixlist.count_ipv6(self.cfg, "empty"))
self.assertEqual(0, prefixlist.count_ipv6(self.cfg, "pl-noexist"))
def test_has_ipv4(self):
self.assertTrue(prefixlist.has_ipv4(self.cfg, "trusted"))
self.assertFalse(prefixlist.has_ipv4(self.cfg, "empty"))
self.assertFalse(prefixlist.has_ipv4(self.cfg, "pl-noexist"))
def test_has_ipv6(self):
self.assertTrue(prefixlist.has_ipv6(self.cfg, "trusted"))
self.assertFalse(prefixlist.has_ipv6(self.cfg, "empty"))
self.assertFalse(prefixlist.has_ipv6(self.cfg, "pl-noexist"))
def test_is_empty(self):
self.assertFalse(prefixlist.is_empty(self.cfg, "trusted"))
self.assertTrue(prefixlist.is_empty(self.cfg, "empty"))
self.assertTrue(prefixlist.is_empty(self.cfg, "pl-noexist"))
def test_get_network_list(self):
l = prefixlist.get_network_list(self.cfg, "trusted")
self.assertIsInstance(l, list)
self.assertEquals(5, len(l))
l = prefixlist.get_network_list(self.cfg, "trusted", want_ipv6=False)
self.assertIsInstance(l, list)
self.assertEquals(2, len(l))
l = prefixlist.get_network_list(self.cfg, "trusted", want_ipv4=False)
self.assertIsInstance(l, list)
self.assertEquals(3, len(l))
l = prefixlist.get_network_list(
self.cfg, "trusted", want_ipv4=False, want_ipv6=False
)
self.assertIsInstance(l, list)
self.assertEquals(0, len(l))
l = prefixlist.get_network_list(self.cfg, "pl-notexist")
self.assertIsInstance(l, list)
self.assertEquals(0, len(l))

View File

@ -123,3 +123,43 @@ taps:
name: vpp-tap101 name: vpp-tap101
mtu: 1500 mtu: 1500
bridge: br1 bridge: br1
prefixlists:
trusted:
description: "All IPv4 and IPv6 trusted networks"
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48
empty:
description: "An empty prefixlist"
members: []
acls:
acl01:
description: "Test ACL"
terms:
- description: "Allow a prefixlist"
action: permit
source: trusted
- description: "Allow a prefixlist for one family only"
family: ipv4
action: permit
source: trusted
- description: "Allow a specific IPv6 TCP flow"
action: permit
source: 2001:db8::/64
destination: 2001:db8:1::/64
protocol: tcp
destination-port: www
source-port: "1024-65535"
- description: "Allow IPv4 ICMP Destination Unreachable, any code"
family: ipv4
action: permit
protocol: icmp
icmp-type: 3
icmp-code: any
- description: "Deny any IPv4 or IPv6"
action: deny

View File

@ -4,6 +4,8 @@ loopbacks: map(include('loopback'),key=str(matches='loop[0-9]+'),required=False)
bridgedomains: map(include('bridgedomain'),key=str(matches='bd[0-9]+'),required=False) bridgedomains: map(include('bridgedomain'),key=str(matches='bd[0-9]+'),required=False)
vxlan_tunnels: map(include('vxlan'),key=str(matches='vxlan_tunnel[0-9]+'),required=False) vxlan_tunnels: map(include('vxlan'),key=str(matches='vxlan_tunnel[0-9]+'),required=False)
taps: map(include('tap'),key=str(matches='tap[0-9]+'),required=False) taps: map(include('tap'),key=str(matches='tap[0-9]+'),required=False)
prefixlists: map(include('prefixlist'),key=str(matches='[a-z][a-z0-9\-]+',min=1,max=64),required=False)
acls: map(include('acl'),key=str(matches='[a-z][a-z0-9\-]+',min=1,max=56),required=False)
--- ---
vxlan: vxlan:
description: str(exclude='\'"',len=64,required=False) description: str(exclude='\'"',len=64,required=False)
@ -82,3 +84,29 @@ tap:
namespace-create: bool(required=False) namespace-create: bool(required=False)
rx-ring-size: int(min=8,max=32768,required=False) rx-ring-size: int(min=8,max=32768,required=False)
tx-ring-size: int(min=8,max=32768,required=False) tx-ring-size: int(min=8,max=32768,required=False)
---
prefixlist:
description: str(exclude='\'"',len=64,required=False)
members: list(any(ip_interface(),ip()))
---
# Valid: 80 "www" "-1024" "1024-" "1024-65535", and "any"
acl-term-port-int-range-symbolic: any(int(min=1,max=65535),str(equals="any"),regex('^([1-9][0-9]*-|-[1-9][0-9]*|[1-9][0-9]*-[1-9][0-9]*)$'),regex('^[a-z][a-z0-9-]*$'))
---
# Valid: 80 "-245" "10-" "10-245", and "any"
acl-term-icmp-int-range: any(int(min=0,max=255),str(equals="any"),regex('^([0-9]+-|-[1-9][0-9]*|[0-9]*-[1-9][0-9]*)$'))
---
acl-term:
description: str(exclude='\'"',len=64,required=False)
action: enum('permit','deny','permit+reflect')
family: enum('ipv4','ipv6','any',required=False)
source: any(ip(),ip_interface(),str(min=1,max=56),required=False)
destination: any(ip(),ip_interface(),str(min=1,max=56),required=False)
protocol: any(int(min=1,max=255),regex('^[a-z][a-z0-9-]*$'),required=False)
source-port: include('acl-term-port-int-range-symbolic',required=False)
destination-port: include('acl-term-port-int-range-symbolic',required=False)
icmp-type: include('acl-term-icmp-int-range',required=False)
icmp-code: include('acl-term-icmp-int-range',required=False)
---
acl:
description: str(exclude='\'"',len=64,required=False)
terms: list(include('acl-term'), min=1, max=100, required=True)

View File

@ -0,0 +1,35 @@
prefixlists:
trusted:
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48
acls:
acl01:
description: "Test ACL #1"
terms:
- description: "Allow a Prefixlist"
action: permit
source: trusted
- description: "Allow a specific IPv6 TCP flow"
action: permit
source: 2001:db8::/64
destination: 2001:db8:1::/64
protocol: tcp
destination-port: www
source-port: "1024-65535"
- description: "Allow IPv4 ICMP Destination Unreachable, any code"
family: ipv4
action: permit
protocol: icmp
icmp-type: 3
icmp-code: any
- description: "Deny any IPv4 or IPv6"
action: deny
deny-all:
description: "Test ACL #2"
terms:
- action: deny

View File

@ -0,0 +1,27 @@
prefixlists:
trusted:
description: "All IPv4 and IPv6 trusted networks"
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48
deny-all:
description: "Default for IPv4 and IPv6"
members:
- 0.0.0.0/0
- ::/0
ipv4-only:
description: "Only contains IPv4"
members:
- 192.0.2.1
- 192.0.2.0/24
ipv6-only:
description: "Only contains IPv6"
members:
- 2001:db8::1
- 2001:db8::/64
empty:
description: "An empty list"
members: []

View File

@ -0,0 +1,50 @@
test:
description: "A bunch of ACLs that are wellformed"
errors:
count: 0
---
prefixlists:
trusted:
description: "Trusted IPv4 nd IPv6 hosts"
members:
- 192.0.2.1
- 192.0.2.0/24
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48
acls:
acl01:
description: "Test ACL"
terms:
- description: "Allow a prefixlist"
action: permit
source: trusted
- description: "Allow a specific IPv6 TCP flow"
action: permit
source: 2001:db8::/64
destination: 2001:db8:1::/64
protocol: tcp
destination-port: www
source-port: "1024-65535"
- description: "Allow IPv4 ICMP Destination Unreachable, any code"
family: ipv4
action: permit
protocol: icmp
icmp-type: 3
icmp-code: any
- description: "Using an IPv4 address is OK"
action: permit
source: 192.168.0.1
- description: "Using an IPv6 address is OK"
action: permit
destination: 2001:db8::1
- description: "Protocol using number"
action: permit
protocol: 1
- description: "Protocol using symbolic name"
action: permit
protocol: icmp
- description: "Deny any IPv4 or IPv6"
protocol: any
action: deny

View File

@ -0,0 +1,74 @@
test:
description: "Source and Destination must have the same address family"
errors:
expected:
- "acl .* term .* source and destination family do not overlap"
- "acl .* term .* family any has no (source|destination)"
count: 8
---
prefixlists:
v4only:
members:
- 192.0.2.1
- 192.0.2.0/24
v6only:
members:
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48
empty:
members: []
acls:
acl01:
terms:
- description: "Error, source is IPv4 and destination is IPv6"
source: 0.0.0.0/0
destination: ::/0
action: permit
- description: "Error, source prefixlist is IPv4 and destination prefixlist is IPv6"
source: v4only
destination: v6only
action: permit
- description: "Error, source prefixlist is IPv6 and destination is IPv4"
source: v6only
destination: 0.0.0.0/0
action: permit
- description: "Error, source is IPv6 and destination is IPv4"
source: ::/0
destination: 192.168.0.1
action: permit
- description: "Error, source is IPv4 and destination is IPv6"
source: 0.0.0.0/0
destination: 2001:db8::1
action: permit
- description: "Error, source is IPv6 and destination is IPv4"
source: ::/0
destination: 192.168.0.0/16
action: permit
- description: "Error, can never match an empty prefixlist"
source: empty
destination: 192.0.2.1
action: permit
- description: "Error, can never match an empty prefixlist"
source: 2001:db8::1
destination: empty
action: permit
- description: "OK"
source: ::/0
destination: 2001:db8::1
action: permit
- description: "OK"
source: 192.168.0.1
destination: 10.0.0.0/8
action: permit
- description: "OK"
source: v4only
action: permit
- description: "OK"
source: v6only
action: permit
- description: "OK"
source: v4only
destination: v4only
action: permit

View File

@ -0,0 +1,39 @@
test:
description: "Ways in which port ranges can fail"
errors:
expected:
- "acl .* term .* could not understand (source|destination)-port"
- "acl .* term .* (source|destination)-port low value is greater than high value"
- "acl .* term .* (source|destination)-port (low|high) value is not between \\[0,65535\\]"
- "acl .* term .* (source|destination)-port can only be specified for protocol tcp or udp"
count: 7
---
acls:
acl01:
terms:
- description: "Port is not known in /etc/services"
action: permit
protocol: tcp
source-port: "unknown"
- description: "Port is not known in /etc/services"
action: permit
destination-port: "unknown-range"
protocol: tcp
- description: "Low port is higher than High port"
action: permit
source-port: "20-10"
protocol: udp
- description: "High port is > 65535"
action: permit
source-port: "10-65536"
protocol: udp
- description: "High port is > 65535"
action: permit
protocol: tcp
destination-port: "10-65536"
- description: "ports are not allowed if protocol is not TCP or UDP"
action: permit
source-port: 80
- description: "ports are not allowed if protocol is not TCP or UDP"
action: permit
destination-port: 80-1024

View File

@ -0,0 +1,13 @@
test:
description: "Ways in which ACE protocol can fail"
errors:
expected:
- "acl .* term .* could not understand protocol"
count: 1
---
acls:
acl01:
terms:
- description: "Protocol is not known in /etc/protocols"
action: permit
protocol: "unknown"

View File

@ -0,0 +1,40 @@
test:
description: "Ways in which ICMP code and type can fail"
errors:
expected:
- "acl .* term .* icmp-(type|code) can only be specified for protocol icmp or ipv6-icmp"
- "acl .* term .* icmp-(type|code) low value is greater than high value"
count: 8
---
acls:
acl01:
terms:
- description: "code and type are not allowed if protocol is not icmp or icmp-ipv6"
action: permit
icmp-code: 1
icmp-type: 1
- description: "code and type are not allowed if protocol is not icmp or icmp-ipv6"
action: permit
protocol: udp
icmp-code: 1
icmp-type: 1
- description: "code and type are not allowed if protocol is not icmp or icmp-ipv6"
action: permit
protocol: tcp
icmp-code: 1
icmp-type: 1
- description: "Ranges invalid"
action: permit
protocol: icmp
icmp-code: 5-4
icmp-type: 20-10
- description: "OK"
action: permit
protocol: icmp
icmp-code: 1
icmp-type: 1
- description: "OK"
action: permit
protocol: ipv6-icmp
icmp-code: 1
icmp-type: 1

View File

@ -0,0 +1,18 @@
test:
description: "Some prefixlist names are reserved"
errors:
expected:
- "prefixlist any is a reserved name"
count: 1
---
prefixlists:
any:
description: "any is a reserved name"
members:
- 192.0.2.1
- 192.0.2.0/24
v6only:
members:
- 2001:db8::1
- 2001:db8::/64
- 2001:db8::/48

View File

@ -65,6 +65,8 @@ class Dumper(VPPApi):
"bridgedomains": {}, "bridgedomains": {},
"vxlan_tunnels": {}, "vxlan_tunnels": {},
"taps": {}, "taps": {},
"prefixlists": {},
"acls": {},
} }
for idx, bond_iface in self.cache["bondethernets"].items(): for idx, bond_iface in self.cache["bondethernets"].items():
bond = {"description": ""} bond = {"description": ""}
@ -248,5 +250,98 @@ class Dumper(VPPApi):
bridge["interfaces"] = members bridge["interfaces"] = members
bridge["mtu"] = mtu bridge["mtu"] = mtu
config["bridgedomains"][bridge_name] = bridge config["bridgedomains"][bridge_name] = bridge
for idx, acl in self.cache["acls"].items():
aclname = f"vppacl{acl.acl_index}"
descr = acl.tag.replace('"', "").replace("'", "")
if descr != acl.tag:
self.logger.warning(
f"acl tag {acl.tag} has invalid characters, stripping"
)
descr = "tag " + descr
config_acl = {"description": descr, "terms": []}
terms = 0
for acl_rule in acl.r:
terms += 1
action = "deny"
if acl_rule.is_permit == 1:
action = "permit"
elif acl_rule.is_permit == 2:
action = "permit+reflect"
config_term = {
"action": action,
"source": str(acl_rule.src_prefix),
"destination": str(acl_rule.dst_prefix),
}
if acl_rule.proto == 0:
pass
elif acl_rule.proto in [1, 58]:
if acl_rule.proto == 1:
config_term["protocol"] = "icmp"
else:
config_term["protocol"] = "ipv6-icmp"
maxval = acl_rule.srcport_or_icmptype_last
if maxval > 255:
self.logger.warning(
f"icmp type > 255 on acl {acl.acl_index} term {terms}"
)
maxval = 255
if acl_rule.srcport_or_icmptype_first == maxval:
config_term["icmp-type"] = int(
acl_rule.srcport_or_icmptype_first
)
else:
config_term[
"icmp-type"
] = f"{acl_rule.srcport_or_icmptype_first}-{maxval}"
maxval = acl_rule.dstport_or_icmpcode_last
if maxval > 255:
self.logger.warning(
f"icmp code > 255 on acl {acl.acl_index} term {terms}"
)
maxval = 255
if acl_rule.dstport_or_icmpcode_first == maxval:
config_term["icmp-code"] = int(
acl_rule.dstport_or_icmpcode_first
)
else:
config_term[
"icmp-code"
] = f"{acl_rule.dstport_or_icmpcode_first}-{maxval}"
elif acl_rule.proto in [6, 17]:
if acl_rule.proto == 6:
config_term["protocol"] = "tcp"
else:
config_term["protocol"] = "udp"
if (
acl_rule.srcport_or_icmptype_first
== acl_rule.srcport_or_icmptype_last
):
config_term["source-port"] = int(
acl_rule.srcport_or_icmptype_first
)
else:
config_term[
"source-port"
] = f"{acl_rule.srcport_or_icmptype_first}-{acl_rule.srcport_or_icmptype_last}"
if (
acl_rule.dstport_or_icmpcode_first
== acl_rule.dstport_or_icmpcode_last
):
config_term["destination-port"] = int(
acl_rule.dstport_or_icmpcode_first
)
else:
config_term[
"destination-port"
] = f"{acl_rule.dstport_or_icmpcode_first}-{acl_rule.dstport_or_icmpcode_last}"
else:
config_term["protocol"] = int(acl_rule.proto)
config_acl["terms"].append(config_term)
config["acls"][aclname] = config_acl
return config return config

View File

@ -120,12 +120,15 @@ class VPPApi:
"interfaces": {}, "interfaces": {},
"interface_addresses": {}, "interface_addresses": {},
"interface_mpls": {}, "interface_mpls": {},
"interface_acls": {},
"bondethernets": {}, "bondethernets": {},
"bondethernet_members": {}, "bondethernet_members": {},
"bridgedomains": {}, "bridgedomains": {},
"vxlan_tunnels": {}, "vxlan_tunnels": {},
"l2xcs": {}, "l2xcs": {},
"taps": {}, "taps": {},
"acls": {},
"acl_tags": {},
} }
return True return True
@ -197,6 +200,7 @@ class VPPApi:
if len(self.cache["interface_addresses"][iface.sw_if_index]) > 0: if len(self.cache["interface_addresses"][iface.sw_if_index]) > 0:
self.logger.warning(f"Not all addresses were removed on {ifname}") self.logger.warning(f"Not all addresses were removed on {ifname}")
del self.cache["interface_addresses"][iface.sw_if_index] del self.cache["interface_addresses"][iface.sw_if_index]
del self.cache["interface_acls"][iface.sw_if_index]
del self.cache["interface_names"][ifname] del self.cache["interface_names"][ifname]
## Use my_dict.pop('key', None), as it allows 'key' to be absent ## Use my_dict.pop('key', None), as it allows 'key' to be absent
@ -247,6 +251,14 @@ class VPPApi:
interface_dev_type="local", interface_dev_type="local",
tag="mock", tag="mock",
) )
self.cache["interface_acls"][idx] = self.vpp_messages[
"acl_interface_list_details"
].tuple(
sw_if_index=idx,
count=0,
n_input=0,
acls=[],
)
## Add mock PHYs ## Add mock PHYs
for ifname, iface in yaml_config["interfaces"].items(): for ifname, iface in yaml_config["interfaces"].items():
if not "device-type" in iface or iface["device-type"] not in ["dpdk"]: if not "device-type" in iface or iface["device-type"] not in ["dpdk"]:
@ -278,6 +290,14 @@ class VPPApi:
interface_dev_type=iface["device-type"], interface_dev_type=iface["device-type"],
tag="mock", tag="mock",
) )
self.cache["interface_acls"][idx] = self.vpp_messages[
"acl_interface_list_details"
].tuple(
sw_if_index=idx,
count=0,
n_input=0,
acls=[],
)
## Create interface_names and interface_address indexes ## Create interface_names and interface_address indexes
for idx, iface in self.cache["interfaces"].items(): for idx, iface in self.cache["interfaces"].items():
@ -341,6 +361,19 @@ class VPPApi:
f"MPLS state retrieval requires https://gerrit.fd.io/r/c/vpp/+/39022" f"MPLS state retrieval requires https://gerrit.fd.io/r/c/vpp/+/39022"
) )
try:
self.logger.debug("Retrieving ACLs")
api_response = self.vpp.api.acl_dump(acl_index=0xFFFFFFFF)
for acl in api_response:
self.cache["acls"][acl.acl_index] = acl
self.logger.debug("Retrieving interface ACLs")
api_response = self.vpp.api.acl_interface_list_dump()
for iface in api_response:
self.cache["interface_acls"][iface.sw_if_index] = iface
except AttributeError:
self.logger.warning(f"ACL API not found - missing plugin: {err}")
self.logger.debug("Retrieving bondethernets") self.logger.debug("Retrieving bondethernets")
api_response = self.vpp.api.sw_bond_interface_dump() api_response = self.vpp.api.sw_bond_interface_dump()
for iface in api_response: for iface in api_response: