Files
vppcfg/validator/__init__.py
2022-03-15 20:57:11 +00:00

127 lines
3.8 KiB
Python

#!/usr/bin/env python
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
import logging
try:
import yamale
except ImportError:
print("ERROR: install yamale manually: sudo pip install yamale")
sys.exit(-2)
from validator.loopback import validate_loopbacks
from validator.bondethernet import validate_bondethernets
from validator.interface import validate_interfaces
from validator.bridgedomain import validate_bridgedomains
from yamale.validators import DefaultValidators, Validator
import ipaddress
class IPInterfaceWithPrefixLength(Validator):
""" Custom IPAddress validator - takes IP/prefixlen as input:
192.0.2.1/29 or 2001:db8::1/64 are correct. The PrefixLength
is required, and must be a number (0-32 for IPv4 and 0-128 for
IPv6).
"""
tag = 'ip_interface'
def _is_valid(self, value):
try:
network = ipaddress.ip_interface(value)
except:
return False
if not isinstance(value, str):
return False
if not '/' in value:
return False
e = value.split('/')
if not len(e) == 2:
return False
if not e[1].isnumeric():
return False
return True
class NullHandler(logging.Handler):
def emit(self, record):
pass
class Validator(object):
def __init__(self, schema):
self.logger = logging.getLogger('vppcfg.validator')
self.logger.addHandler(NullHandler())
self.schema = schema
def validate(self, yaml):
ret_rv = True
ret_msgs = []
if not yaml:
return ret_rv, ret_msgs
if self.schema:
try:
self.logger.debug("Validating against schema %s" % self.schema)
validators = DefaultValidators.copy()
validators[IPInterfaceWithPrefixLength.tag] = IPInterfaceWithPrefixLength
schema = yamale.make_schema(self.schema, validators=validators)
data = yamale.make_data(content=str(yaml))
yamale.validate(schema, data)
self.logger.debug("Schema correctly validated by yamale")
except ValueError as e:
ret_rv = False
for result in e.results:
for error in result.errors:
ret_msgs.extend(['yamale: %s' % error])
return ret_rv, ret_msgs
else:
self.logger.warning("Schema validation disabled")
self.logger.debug("Validating Semantics...")
rv, msgs = validate_bondethernets(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_interfaces(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_loopbacks(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
rv, msgs = validate_bridgedomains(yaml)
if msgs:
ret_msgs.extend(msgs)
if not rv:
ret_rv = False
if ret_rv:
self.logger.debug("Semantics correctly validated")
return ret_rv, ret_msgs