Refactor VPPApi

VPPApiDumper() is its own file, preparing for VPPApiApplier() in an
upcoming commit. VPPApi() itself remains read-only. No need for an empty
__init__.py file.

Update vppcfg to use the correct vpp/dumper.py import
This commit is contained in:
Pim van Pelt
2022-04-10 14:47:37 +00:00
parent 9d60a01879
commit da7765569f
4 changed files with 146 additions and 164 deletions

View File

@ -1,24 +0,0 @@
#!/usr/bin/env python
#
# Copyright (c) 2022 Pim van Pelt
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -*- coding: utf-8 -*-
from __future__ import (
absolute_import,
division,
print_function,
)
import logging
from vpp.vppapi import VPPApi
from vpp.reconciler import Reconciler

143
vpp/dumper.py Normal file
View File

@ -0,0 +1,143 @@
'''
The functions in this file interact with the VPP API to retrieve certain
interface metadata and write it to a YAML file.
'''
from vpp.vppapi import VPPApi
import sys
import yaml
import config.bondethernet as bondethernet
class VPPApiDumper(VPPApi):
def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'):
VPPApi.__init__(self, address, clientname)
def write(self, outfile):
if outfile and outfile == '-':
fh = sys.stdout
outfile = "(stdout)"
else:
fh = open(outfile, 'w')
config = self.cache_to_config()
print(yaml.dump(config), file=fh)
if fh is not sys.stdout:
fh.close()
self.logger.info("Wrote YAML config to %s" % (outfile))
def cache_to_config(self):
config = {"loopbacks": {}, "bondethernets": {}, "interfaces": {}, "bridgedomains": {}, "vxlan_tunnels": {} }
for idx, bond_iface in self.cache['bondethernets'].items():
bond = {"description": ""}
if bond_iface.sw_if_index in self.cache['bondethernet_members']:
members = [self.cache['interfaces'][x].interface_name for x in self.cache['bondethernet_members'][bond_iface.sw_if_index]]
if len(members) > 0:
bond['interfaces'] = members
mode = bondethernet.int_to_mode(bond_iface.mode)
bond['mode'] = mode
if mode in ['xor', 'lacp']:
bond['load-balance'] = bondethernet.int_to_lb(bond_iface.lb)
iface = self.cache['interfaces'][bond_iface.sw_if_index]
bond['mac'] = str(iface.l2_address)
config['bondethernets'][iface.interface_name] = bond
for numtags in [ 0, 1, 2 ]:
for idx, iface in self.cache['interfaces'].items():
if iface.sub_number_of_tags != numtags:
continue
if iface.interface_dev_type=='Loopback':
if iface.sub_id > 0:
self.logger.warning("Refusing to export sub-interfaces of loopback devices (%s)" % iface.interface_name)
continue
loop = {"description": ""}
loop['mtu'] = iface.mtu[0]
loop['mac'] = str(iface.l2_address)
if iface.sw_if_index in self.cache['lcps']:
loop['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
loop['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
config['loopbacks'][iface.interface_name] = loop
elif iface.interface_dev_type in ['bond', 'VXLAN', 'dpdk']:
i = {"description": "" }
if iface.sw_if_index in self.cache['lcps']:
i['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
i['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
if iface.sw_if_index in self.cache['l2xcs']:
l2xc = self.cache['l2xcs'][iface.sw_if_index]
i['l2xc'] = self.cache['interfaces'][l2xc.tx_sw_if_index].interface_name
if not self.cache['interfaces'][idx].flags & 1: # IF_STATUS_API_FLAG_ADMIN_UP
i['state'] = 'down'
if iface.interface_dev_type == 'dpdk':
i['mac'] = str(iface.l2_address)
i['mtu'] = iface.mtu[0]
if iface.sub_number_of_tags == 0:
config['interfaces'][iface.interface_name] = i
continue
encap = {}
if iface.sub_if_flags&8:
encap['dot1ad'] = iface.sub_outer_vlan_id
else:
encap['dot1q'] = iface.sub_outer_vlan_id
if iface.sub_inner_vlan_id > 0:
encap['inner-dot1q'] = iface.sub_inner_vlan_id
encap['exact-match'] = bool(iface.sub_if_flags&16)
i['encapsulation'] = encap
sup_iface = self.cache['interfaces'][iface.sup_sw_if_index]
if iface.mtu[0] > 0:
i['mtu'] = iface.mtu[0]
else:
i['mtu'] = sup_iface.mtu[0]
if not 'sub-interfaces' in config['interfaces'][sup_iface.interface_name]:
config['interfaces'][sup_iface.interface_name]['sub-interfaces'] = {}
config['interfaces'][sup_iface.interface_name]['sub-interfaces'][iface.sub_id] = i
for idx, iface in self.cache['vxlan_tunnels'].items():
vpp_iface = self.cache['interfaces'][iface.sw_if_index]
vxlan = { "description": "",
"vni": int(iface.vni),
"local": str(iface.src_address),
"remote": str(iface.dst_address) }
config['vxlan_tunnels'][vpp_iface.interface_name] = vxlan
for idx, iface in self.cache['bridgedomains'].items():
# self.logger.info("%d: %s" % (idx, iface))
bridge_name = "bd%d" % idx
mtu = 1500
bridge = {"description": ""}
settings = {}
settings['learn'] = iface.learn
settings['unicast-flood'] = iface.flood
settings['unknown-unicast-flood'] = iface.uu_flood
settings['unicast-forward'] = iface.forward
settings['arp-termination'] = iface.arp_term
settings['arp-unicast-forward'] = iface.arp_ufwd
settings['mac-age-minutes'] = int(iface.mac_age)
bridge['settings'] = settings
bvi = None
if iface.bvi_sw_if_index != 2**32-1:
bvi = self.cache['interfaces'][iface.bvi_sw_if_index]
mtu = bvi.mtu[0]
bridge['bvi'] = bvi.interface_name
members = []
for member in iface.sw_if_details:
if bvi and bvi.interface_name == self.cache['interfaces'][member.sw_if_index].interface_name == bvi.interface_name:
continue
members.append(self.cache['interfaces'][member.sw_if_index].interface_name)
mtu = self.cache['interfaces'][member.sw_if_index].mtu[0]
if len(members) > 0:
bridge['interfaces'] = members
bridge['mtu'] = mtu
config['bridgedomains'][bridge_name] = bridge
return config

View File

@ -1,16 +1,13 @@
''' '''
The functions in this file interact with the VPP API to retrieve certain The functions in this file interact with the VPP API to retrieve certain
interface metadata. interface metadata. Its base class will never change state. See the
derived classes VPPApiDumper() and VPPApiApplier()
''' '''
from vpp_papi import VPPApiClient from vpp_papi import VPPApiClient
import os import os
import sys
import fnmatch import fnmatch
import logging import logging
import socket
import yaml
import config.bondethernet as bondethernet
class VPPApi(): class VPPApi():
def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'): def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'):
@ -250,137 +247,3 @@ class VPPApi():
if lcp.phy_sw_if_index == sw_if_index: if lcp.phy_sw_if_index == sw_if_index:
return lcp return lcp
return None return None
class VPPApiDumper(VPPApi):
def __init__(self, address='/run/vpp/api.sock', clientname='vppcfg'):
VPPApi.__init__(self, address, clientname)
def write(self, outfile):
if outfile and outfile == '-':
fh = sys.stdout
outfile = "(stdout)"
else:
fh = open(outfile, 'w')
config = self.cache_to_config()
print(yaml.dump(config), file=fh)
if fh is not sys.stdout:
fh.close()
self.logger.info("Wrote YAML config to %s" % (outfile))
def cache_to_config(self):
config = {"loopbacks": {}, "bondethernets": {}, "interfaces": {}, "bridgedomains": {}, "vxlan_tunnels": {} }
for idx, bond_iface in self.cache['bondethernets'].items():
bond = {"description": ""}
if bond_iface.sw_if_index in self.cache['bondethernet_members']:
members = [self.cache['interfaces'][x].interface_name for x in self.cache['bondethernet_members'][bond_iface.sw_if_index]]
if len(members) > 0:
bond['interfaces'] = members
mode = bondethernet.int_to_mode(bond_iface.mode)
bond['mode'] = mode
if mode in ['xor', 'lacp']:
bond['load-balance'] = bondethernet.int_to_lb(bond_iface.lb)
iface = self.cache['interfaces'][bond_iface.sw_if_index]
bond['mac'] = str(iface.l2_address)
config['bondethernets'][iface.interface_name] = bond
for numtags in [ 0, 1, 2 ]:
for idx, iface in self.cache['interfaces'].items():
if iface.sub_number_of_tags != numtags:
continue
if iface.interface_dev_type=='Loopback':
if iface.sub_id > 0:
self.logger.warning("Refusing to export sub-interfaces of loopback devices (%s)" % iface.interface_name)
continue
loop = {"description": ""}
loop['mtu'] = iface.mtu[0]
loop['mac'] = str(iface.l2_address)
if iface.sw_if_index in self.cache['lcps']:
loop['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
loop['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
config['loopbacks'][iface.interface_name] = loop
elif iface.interface_dev_type in ['bond', 'VXLAN', 'dpdk']:
i = {"description": "" }
if iface.sw_if_index in self.cache['lcps']:
i['lcp'] = self.cache['lcps'][iface.sw_if_index].host_if_name
if iface.sw_if_index in self.cache['interface_addresses']:
if len(self.cache['interface_addresses'][iface.sw_if_index]) > 0:
i['addresses'] = self.cache['interface_addresses'][iface.sw_if_index]
if iface.sw_if_index in self.cache['l2xcs']:
l2xc = self.cache['l2xcs'][iface.sw_if_index]
i['l2xc'] = self.cache['interfaces'][l2xc.tx_sw_if_index].interface_name
if not self.cache['interfaces'][idx].flags & 1: # IF_STATUS_API_FLAG_ADMIN_UP
i['state'] = 'down'
if iface.interface_dev_type == 'dpdk':
i['mac'] = str(iface.l2_address)
i['mtu'] = iface.mtu[0]
if iface.sub_number_of_tags == 0:
config['interfaces'][iface.interface_name] = i
continue
encap = {}
if iface.sub_if_flags&8:
encap['dot1ad'] = iface.sub_outer_vlan_id
else:
encap['dot1q'] = iface.sub_outer_vlan_id
if iface.sub_inner_vlan_id > 0:
encap['inner-dot1q'] = iface.sub_inner_vlan_id
encap['exact-match'] = bool(iface.sub_if_flags&16)
i['encapsulation'] = encap
sup_iface = self.cache['interfaces'][iface.sup_sw_if_index]
if iface.mtu[0] > 0:
i['mtu'] = iface.mtu[0]
else:
i['mtu'] = sup_iface.mtu[0]
if not 'sub-interfaces' in config['interfaces'][sup_iface.interface_name]:
config['interfaces'][sup_iface.interface_name]['sub-interfaces'] = {}
config['interfaces'][sup_iface.interface_name]['sub-interfaces'][iface.sub_id] = i
for idx, iface in self.cache['vxlan_tunnels'].items():
vpp_iface = self.cache['interfaces'][iface.sw_if_index]
vxlan = { "description": "",
"vni": int(iface.vni),
"local": str(iface.src_address),
"remote": str(iface.dst_address) }
config['vxlan_tunnels'][vpp_iface.interface_name] = vxlan
for idx, iface in self.cache['bridgedomains'].items():
# self.logger.info("%d: %s" % (idx, iface))
bridge_name = "bd%d" % idx
mtu = 1500
bridge = {"description": ""}
settings = {}
settings['learn'] = iface.learn
settings['unicast-flood'] = iface.flood
settings['unknown-unicast-flood'] = iface.uu_flood
settings['unicast-forward'] = iface.forward
settings['arp-termination'] = iface.arp_term
settings['arp-unicast-forward'] = iface.arp_ufwd
settings['mac-age-minutes'] = int(iface.mac_age)
bridge['settings'] = settings
bvi = None
if iface.bvi_sw_if_index != 2**32-1:
bvi = self.cache['interfaces'][iface.bvi_sw_if_index]
mtu = bvi.mtu[0]
bridge['bvi'] = bvi.interface_name
members = []
for member in iface.sw_if_details:
if bvi and bvi.interface_name == self.cache['interfaces'][member.sw_if_index].interface_name == bvi.interface_name:
continue
members.append(self.cache['interfaces'][member.sw_if_index].interface_name)
mtu = self.cache['interfaces'][member.sw_if_index].mtu[0]
if len(members) > 0:
bridge['interfaces'] = members
bridge['mtu'] = mtu
config['bridgedomains'][bridge_name] = bridge
return config

2
vppcfg
View File

@ -19,7 +19,7 @@ import yaml
import logging import logging
from config import Validator from config import Validator
from vpp.reconciler import Reconciler from vpp.reconciler import Reconciler
from vpp.vppapi import VPPApiDumper from vpp.dumper import VPPApiDumper
try: try:
import argparse import argparse