vpp-maglev/internal/vpp/lbsync.go
Pim van Pelt d3c5c86037 VPP load-balancer dataplane integration: state, sync, and global conf
This commit wires maglevd through to VPP's LB plugin end-to-end, using
locally-generated GoVPP bindings for the newer v2 API messages.

VPP binapi (vendored)
- New package internal/vpp/binapi/ containing lb, lb_types, ip_types, and
  interface_types, generated from a local VPP build (~/src/vpp) via a new
  'make vpp-binapi' target. GoVPP v0.12.0 upstream lacks the v2 messages we
  need (lb_conf_get, lb_add_del_vip_v2, lb_add_del_as_v2, lb_as_v2_dump,
  lb_as_set_weight), so we commit the generated output in-tree.
- All calls into the generated bindings go through our loggedChannel
  wrapper; every VPP API send/receive is recorded at DEBUG via slog
  (vpp-api-send / vpp-api-recv / vpp-api-send-multi / vpp-api-recv-multi)
  so the full wire-level trail is auditable. NewAPIChannel is unexported;
  callers must use c.apiChannel().

Read path: GetLBState{All,VIP}
- GetLBStateAll returns a full snapshot (global conf + every VIP with its
  attached application servers).
- GetLBStateVIP looks up a single VIP by (prefix, protocol, port) and
  returns (nil, nil) when the VIP doesn't exist in VPP. This is the
  efficient path for targeted updates on a busy LB.
- Helpers factored out: getLBConf, dumpAllVIPs, dumpASesForVIP, lookupVIP,
  vipFromDetails.

Write path: SyncLBState{All,VIP}
- SyncLBStateAll reconciles every configured frontend with VPP: creates
  missing VIPs, removes stale ones (with AS flush), and reconciles AS
  membership and weights within VIPs that exist on both sides.
- SyncLBStateVIP targets a single frontend by name. Never removes VIPs.
  Returns ErrFrontendNotFound (wrapped with the name) when the frontend
  isn't in config, so callers can use errors.Is.
- Shared reconcileVIP helper does the per-VIP AS diff; removeVIP is used
  only by the full-sync pass.
- LbAddDelVipV2 requests always set NewFlowsTableLength=1024. The .api
  default=1024 annotation is only applied by VAT/CLI parsers, not wire-
  level marshalling — sending 0 caused VPP to vec_validate with mask
  0xFFFFFFFF and OOM-panic.
- Pool semantics: backends in the primary (first) pool of a frontend get
  their configured weight; backends in secondary pools get weight 0. All
  backends are installed so higher layers can flip weights on failover
  without add/remove churn.
- Every individual change emits a DEBUG slog (vpp-lbsync-vip-add/del,
  vpp-lbsync-as-add/del, vpp-lbsync-as-weight). Start/done INFO logs
  carry a scope=all|vip label plus aggregate counts.

Global conf push: SetLBConf
- New SetLBConf(cfg) sends lb_conf with ipv4-src, ipv6-src, sticky-buckets,
  and flow-timeout. Called automatically on VPP (re)connect and after
  every config reload (via doReloadConfig). Results are cached on the
  Client so redundant pushes are silently skipped — only actual changes
  produce a vpp-lb-conf-set INFO log line.

Periodic drift reconciliation
- vpp.Client.lbSyncLoop runs in a goroutine tied to each VPP connection's
  lifetime. Its first tick is immediate (startup and post-reconnect
  sync quickly); subsequent ticks fire every vpp.lb.sync-interval from
  config (default 30s). Purpose: catch drift if something/someone
  modifies VPP state by hand. The loop uses a ConfigSource interface
  (satisfied by checker.Checker via its new Config() accessor) to avoid
  an import cycle with the checker package.

Config schema additions (maglev.vpp.lb)
- sync-interval: positive Go duration, default 30s.
- ipv4-src-address: REQUIRED. Used as the outer source for GRE4 encap
  to application servers. Missing this is a hard semantic error —
  maglevd --check exits 2 and the daemon refuses to start. VPP GRE
  needs a source address and every VIP we program uses GRE, so there
  is no meaningful config without it.
- ipv6-src-address: REQUIRED. Same treatment as ipv4-src-address.
- sticky-buckets-per-core: default 65536, must be a power of 2.
- flow-timeout: default 40s, must be a whole number of seconds in [1s, 120s].
- VPP validation runs at the end of convert() so structural errors in
  healthchecks/backends/frontends surface first — operators fix those,
  then get the VPP-specific requirements.

gRPC API
- New GetVPPLBState RPC returning VPPLBState: global conf + VIPs with
  ASes. Mirrors the read-path but strips fields irrelevant to our
  GRE-only deployment (srv_type, dscp, target_port).
- New SyncVPPLBState RPC with optional frontend_name. Unset → full sync
  (may remove stale VIPs). Set → single-VIP sync (never removes).
  Returns codes.NotFound for unknown frontends, codes.Unavailable when
  VPP integration is disabled or disconnected.

maglevc (CLI)
- New 'show vpp lbstate' command displaying the LB plugin state. VPP fields
  that are irrelevant to our GRE-only dataplane are suppressed. Per-AS lines use
  a key-value format ("address X  weight Y  flow-table-buckets Z")
  instead of a tabwriter column, which avoids the ANSI-color alignment
  issue we hit with mixed label/data rows.
- New 'sync vpp lbstate [<name>]' command. Without a name, triggers a
  full reconciliation; with a name, targets one frontend.
- Previous 'show vpp lb' renamed to 'show vpp lbstate' for consistency
  with the new sync command.
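
A rough sketch of the per-AS key-value line quoted above. asLine and its
bold flag are hypothetical; the point is only that self-describing pairs
need no column alignment, so ANSI escape bytes around a label cannot skew
the layout the way they do inside a tabwriter cell.

```go
package main

import "fmt"

// label optionally wraps a key in ANSI bold. The escape bytes are invisible
// on a terminal but would count towards the width of an aligned column.
func label(s string, bold bool) string {
	if !bold {
		return s
	}
	return "\x1b[1m" + s + "\x1b[0m"
}

// asLine renders one application server as key-value pairs separated by two
// spaces, following the format quoted in the commit message.
func asLine(address string, weight uint8, buckets uint32, bold bool) string {
	return fmt.Sprintf("%s %s  %s %d  %s %d",
		label("address", bold), address,
		label("weight", bold), weight,
		label("flow-table-buckets", bold), buckets)
}

func main() {
	fmt.Println(asLine("192.0.2.10", 100, 512, false))
	fmt.Println(asLine("2001:db8::10", 0, 0, true))
}
```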

Test fixtures
- validConfig and all ad-hoc config_test.go fixtures that reach the end
  of convert() now include the two required vpp.lb src addresses.
- tests/01-maglevd/maglevd-lab/maglev.yaml gains a vpp.lb section so the
  robot integration tests can still load the config.
- cmd/maglevc/tree_test.go gains expected paths for the new commands.

Docs
- config-guide.md: new 'vpp' section in the basic structure, detailed
  vpp.lb field reference, noting ipv4/ipv6 src addresses as REQUIRED
  (hard error) with no defaults; example config updated.
- user-guide.md: documented 'show vpp info', 'show vpp lbstate',
  'sync vpp lbstate [<name>]', new --vpp-api-addr and --vpp-stats-addr
  flags, the vpp-lb-conf-set log line, and corrected the pause/resume
  description to reflect that pause cancels the probe goroutine.
- debian/maglev.yaml: example config gains a vpp.lb block with src
  addresses and commented optional overrides.
2026-04-12 10:58:44 +02:00

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>

package vpp

import (
	"errors"
	"fmt"
	"log/slog"
	"net"

	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	ip_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/ip_types"
	lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
	lb_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb_types"
)

// ErrFrontendNotFound is returned by SyncLBStateVIP when the caller asks for
// a frontend name that does not exist in the config.
var ErrFrontendNotFound = errors.New("frontend not found in config")

// vipKey uniquely identifies a VPP LB VIP by its prefix, protocol, and port.
type vipKey struct {
	prefix   string // canonical CIDR form
	protocol uint8
	port     uint16
}

// desiredVIP is the sync's view of one VIP derived from the maglev config.
type desiredVIP struct {
	Prefix   *net.IPNet
	Protocol uint8 // 6=TCP, 17=UDP, 255=any
	Port     uint16
	ASes     map[string]desiredAS // keyed by AS IP string
}

// desiredAS is one application server to be installed under a VIP.
type desiredAS struct {
	Address net.IP
	Weight  uint8 // 0-100
}

// syncStats counts changes made to the dataplane during a sync run.
type syncStats struct {
	vipAdd   int
	vipDel   int
	asAdd    int
	asDel    int
	asWeight int
}

// SyncLBStateAll reconciles the full VPP load-balancer state with the given
// config. For every frontend in cfg:
//   - if the VIP does not exist in VPP, create it;
//   - for every pool backend, add the application server if missing, or
//     update its weight if different.
//
// VIPs and ASes present in VPP but absent from the config are removed.
// The sync stops at the first VPP API call that fails and returns that
// error, which may leave the dataplane partially reconciled.
func (c *Client) SyncLBStateAll(cfg *config.Config) error {
	if !c.IsConnected() {
		return errNotConnected
	}
	cur, err := c.GetLBStateAll()
	if err != nil {
		return fmt.Errorf("read VPP LB state: %w", err)
	}
	desired := desiredFromConfig(cfg)

	ch, err := c.apiChannel()
	if err != nil {
		return err
	}
	defer ch.Close()

	slog.Info("vpp-lbsync-start",
		"scope", "all",
		"vips-desired", len(desired),
		"vips-current", len(cur.VIPs))

	// Index both sides by (prefix, protocol, port).
	curByKey := make(map[vipKey]LBVIP, len(cur.VIPs))
	for _, v := range cur.VIPs {
		curByKey[makeVIPKey(v.Prefix, v.Protocol, v.Port)] = v
	}
	desByKey := make(map[vipKey]desiredVIP, len(desired))
	for _, d := range desired {
		desByKey[makeVIPKey(d.Prefix, d.Protocol, d.Port)] = d
	}

	var st syncStats

	// ---- pass 1: remove VIPs that are in VPP but not in config ----
	for k, v := range curByKey {
		if _, keep := desByKey[k]; keep {
			continue
		}
		if err := removeVIP(ch, v, &st); err != nil {
			return err
		}
	}

	// ---- pass 2: add/update VIPs that are in config ----
	for k, d := range desByKey {
		// curVIP is named to avoid shadowing the outer cur snapshot.
		curVIP, exists := curByKey[k]
		var curPtr *LBVIP
		if exists {
			curPtr = &curVIP
		}
		if err := reconcileVIP(ch, d, curPtr, &st); err != nil {
			return err
		}
	}

	slog.Info("vpp-lbsync-done",
		"scope", "all",
		"vip-added", st.vipAdd,
		"vip-removed", st.vipDel,
		"as-added", st.asAdd,
		"as-removed", st.asDel,
		"as-weight-updated", st.asWeight)
	return nil
}

// SyncLBStateVIP reconciles a single VIP (identified by frontend name) with
// the given config. Unlike SyncLBStateAll, it never removes VIPs: if the
// frontend is missing from cfg, SyncLBStateVIP returns ErrFrontendNotFound.
// This is the right tool for targeted updates on a busy load-balancer with
// many VIPs: only one VIP is read from VPP and only its ASes are modified.
func (c *Client) SyncLBStateVIP(cfg *config.Config, feName string) error {
	if !c.IsConnected() {
		return errNotConnected
	}
	fe, ok := cfg.Frontends[feName]
	if !ok {
		return fmt.Errorf("%q: %w", feName, ErrFrontendNotFound)
	}
	d := desiredFromFrontend(cfg, fe)

	cur, err := c.GetLBStateVIP(d.Prefix, d.Protocol, d.Port)
	if err != nil {
		return fmt.Errorf("read VPP VIP state: %w", err)
	}

	ch, err := c.apiChannel()
	if err != nil {
		return err
	}
	defer ch.Close()

	slog.Info("vpp-lbsync-start",
		"scope", "vip",
		"frontend", feName,
		"prefix", d.Prefix.String(),
		"protocol", protocolName(d.Protocol),
		"port", d.Port)

	var st syncStats
	if err := reconcileVIP(ch, d, cur, &st); err != nil {
		return err
	}

	slog.Info("vpp-lbsync-done",
		"scope", "vip",
		"frontend", feName,
		"vip-added", st.vipAdd,
		"as-added", st.asAdd,
		"as-removed", st.asDel,
		"as-weight-updated", st.asWeight)
	return nil
}

// reconcileVIP brings one VIP's state in VPP into alignment with the desired
// state. If cur is nil the VIP is added from scratch; otherwise ASes are
// added, removed, and reweighted individually. Stats are accumulated into st.
func reconcileVIP(ch *loggedChannel, d desiredVIP, cur *LBVIP, st *syncStats) error {
	if cur == nil {
		if err := addVIP(ch, d); err != nil {
			return err
		}
		st.vipAdd++
		for _, as := range d.ASes {
			if err := addAS(ch, d.Prefix, d.Protocol, d.Port, as); err != nil {
				return err
			}
			st.asAdd++
		}
		return nil
	}

	// VIP exists on both sides: reconcile ASes.
	curASes := make(map[string]LBAS, len(cur.ASes))
	for _, a := range cur.ASes {
		curASes[a.Address.String()] = a
	}

	// Remove ASes that are in VPP but not desired.
	for addr, a := range curASes {
		if _, keep := d.ASes[addr]; keep {
			continue
		}
		if err := delAS(ch, cur.Prefix, cur.Protocol, cur.Port, a.Address); err != nil {
			return err
		}
		st.asDel++
	}

	// Add new ASes, update weights on existing ones.
	for addr, a := range d.ASes {
		c, hit := curASes[addr]
		if !hit {
			if err := addAS(ch, d.Prefix, d.Protocol, d.Port, a); err != nil {
				return err
			}
			st.asAdd++
			continue
		}
		if c.Weight != a.Weight {
			if err := setASWeight(ch, d.Prefix, d.Protocol, d.Port, a); err != nil {
				return err
			}
			st.asWeight++
		}
	}
	return nil
}

// removeVIP flushes all ASes from a VIP and then deletes the VIP itself.
func removeVIP(ch *loggedChannel, v LBVIP, st *syncStats) error {
	for _, as := range v.ASes {
		if err := delAS(ch, v.Prefix, v.Protocol, v.Port, as.Address); err != nil {
			return err
		}
		st.asDel++
	}
	if err := delVIP(ch, v.Prefix, v.Protocol, v.Port); err != nil {
		return err
	}
	st.vipDel++
	return nil
}

// desiredFromConfig flattens every frontend in cfg into a desired VIP set.
func desiredFromConfig(cfg *config.Config) []desiredVIP {
	out := make([]desiredVIP, 0, len(cfg.Frontends))
	for _, fe := range cfg.Frontends {
		out = append(out, desiredFromFrontend(cfg, fe))
	}
	return out
}

// desiredFromFrontend builds the desired VIP for a single frontend.
//
// All backends across all pools of a frontend are merged into a single
// application-server list so VPP knows about every backend that could ever
// receive traffic. Weights are assigned as follows:
//
//   - primary (first) pool: the backend's configured weight
//   - any subsequent pool: weight 0 (backend is known but receives no traffic)
//
// This preserves the pool priority model: higher layers can later flip
// secondary-pool backends to non-zero weights on failover without needing to
// add/remove ASes in the dataplane. When the same backend appears in multiple
// pools, the first pool it appears in wins.
func desiredFromFrontend(cfg *config.Config, fe config.Frontend) desiredVIP {
	bits := 32
	if fe.Address.To4() == nil {
		bits = 128
	}
	d := desiredVIP{
		Prefix:   &net.IPNet{IP: fe.Address, Mask: net.CIDRMask(bits, bits)},
		Protocol: protocolFromConfig(fe.Protocol),
		Port:     fe.Port,
		ASes:     make(map[string]desiredAS),
	}
	for poolIdx, pool := range fe.Pools {
		for bName, pb := range pool.Backends {
			b, ok := cfg.Backends[bName]
			if !ok || !b.Enabled || b.Address == nil {
				continue
			}
			addr := b.Address.String()
			if _, already := d.ASes[addr]; already {
				continue
			}
			var w uint8
			if poolIdx == 0 {
				w = clampWeight(pb.Weight)
			} // secondary pools: weight 0 (default)
			d.ASes[addr] = desiredAS{Address: b.Address, Weight: w}
		}
	}
	return d
}

// ---- API call helpers ------------------------------------------------------

// defaultFlowsTableLength is sent as NewFlowsTableLength in lb_add_del_vip_v2.
// The .api file declares default=1024 but that default is only applied by VAT/
// the CLI parser, not when a raw message is marshalled over the socket. If we
// send 0, the plugin's vec_validate explodes (OOM / panic). Must be a power of
// two; 1024 matches the default that would have been applied via CLI.
const defaultFlowsTableLength = 1024

func addVIP(ch *loggedChannel, d desiredVIP) error {
	encap := encapForIP(d.Prefix.IP)
	req := &lb.LbAddDelVipV2{
		Pfx:                 ip_types.NewAddressWithPrefix(*d.Prefix),
		Protocol:            d.Protocol,
		Port:                d.Port,
		Encap:               encap,
		Type:                lb_types.LB_API_SRV_TYPE_CLUSTERIP,
		NewFlowsTableLength: defaultFlowsTableLength,
		IsDel:               false,
	}
	reply := &lb.LbAddDelVipV2Reply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		return fmt.Errorf("lb_add_del_vip_v2 add %s: %w", d.Prefix, err)
	}
	if reply.Retval != 0 {
		return fmt.Errorf("lb_add_del_vip_v2 add %s: retval=%d", d.Prefix, reply.Retval)
	}
	slog.Debug("vpp-lbsync-vip-add",
		"prefix", d.Prefix.String(),
		"protocol", protocolName(d.Protocol),
		"port", d.Port,
		"encap", encapName(encap))
	return nil
}

func delVIP(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16) error {
	req := &lb.LbAddDelVipV2{
		Pfx:      ip_types.NewAddressWithPrefix(*prefix),
		Protocol: protocol,
		Port:     port,
		IsDel:    true,
	}
	reply := &lb.LbAddDelVipV2Reply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		return fmt.Errorf("lb_add_del_vip_v2 del %s: %w", prefix, err)
	}
	if reply.Retval != 0 {
		return fmt.Errorf("lb_add_del_vip_v2 del %s: retval=%d", prefix, reply.Retval)
	}
	slog.Debug("vpp-lbsync-vip-del",
		"prefix", prefix.String(),
		"protocol", protocolName(protocol),
		"port", port)
	return nil
}

func addAS(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS) error {
	req := &lb.LbAddDelAsV2{
		Pfx:       ip_types.NewAddressWithPrefix(*prefix),
		Protocol:  protocol,
		Port:      port,
		AsAddress: ip_types.NewAddress(a.Address),
		Weight:    a.Weight,
		IsDel:     false,
	}
	reply := &lb.LbAddDelAsV2Reply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		return fmt.Errorf("lb_add_del_as_v2 add %s@%s: %w", a.Address, prefix, err)
	}
	if reply.Retval != 0 {
		return fmt.Errorf("lb_add_del_as_v2 add %s@%s: retval=%d", a.Address, prefix, reply.Retval)
	}
	slog.Debug("vpp-lbsync-as-add",
		"vip", prefix.String(),
		"protocol", protocolName(protocol),
		"port", port,
		"address", a.Address.String(),
		"weight", a.Weight)
	return nil
}

func delAS(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, addr net.IP) error {
	req := &lb.LbAddDelAsV2{
		Pfx:       ip_types.NewAddressWithPrefix(*prefix),
		Protocol:  protocol,
		Port:      port,
		AsAddress: ip_types.NewAddress(addr),
		IsDel:     true,
		IsFlush:   true,
	}
	reply := &lb.LbAddDelAsV2Reply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		return fmt.Errorf("lb_add_del_as_v2 del %s@%s: %w", addr, prefix, err)
	}
	if reply.Retval != 0 {
		return fmt.Errorf("lb_add_del_as_v2 del %s@%s: retval=%d", addr, prefix, reply.Retval)
	}
	slog.Debug("vpp-lbsync-as-del",
		"vip", prefix.String(),
		"protocol", protocolName(protocol),
		"port", port,
		"address", addr.String())
	return nil
}

func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS) error {
	req := &lb.LbAsSetWeight{
		Pfx:       ip_types.NewAddressWithPrefix(*prefix),
		Protocol:  protocol,
		Port:      port,
		AsAddress: ip_types.NewAddress(a.Address),
		Weight:    a.Weight,
	}
	reply := &lb.LbAsSetWeightReply{}
	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
		return fmt.Errorf("lb_as_set_weight %s@%s: %w", a.Address, prefix, err)
	}
	if reply.Retval != 0 {
		return fmt.Errorf("lb_as_set_weight %s@%s: retval=%d", a.Address, prefix, reply.Retval)
	}
	slog.Debug("vpp-lbsync-as-weight",
		"vip", prefix.String(),
		"protocol", protocolName(protocol),
		"port", port,
		"address", a.Address.String(),
		"weight", a.Weight)
	return nil
}

// ---- utility ---------------------------------------------------------------

func makeVIPKey(prefix *net.IPNet, protocol uint8, port uint16) vipKey {
	return vipKey{prefix: prefix.String(), protocol: protocol, port: port}
}

func protocolFromConfig(s string) uint8 {
	switch s {
	case "tcp":
		return 6
	case "udp":
		return 17
	}
	return 255 // any
}

func protocolName(p uint8) string {
	switch p {
	case 6:
		return "tcp"
	case 17:
		return "udp"
	case 255:
		return "any"
	}
	return fmt.Sprintf("%d", p)
}

func encapForIP(ip net.IP) lb_types.LbEncapType {
	if ip.To4() != nil {
		return lb_types.LB_API_ENCAP_TYPE_GRE4
	}
	return lb_types.LB_API_ENCAP_TYPE_GRE6
}

func encapName(e lb_types.LbEncapType) string {
	switch e {
	case lb_types.LB_API_ENCAP_TYPE_GRE4:
		return "gre4"
	case lb_types.LB_API_ENCAP_TYPE_GRE6:
		return "gre6"
	}
	return fmt.Sprintf("%d", e)
}

func clampWeight(w int) uint8 {
	if w < 0 {
		return 0
	}
	if w > 100 {
		return 100
	}
	return uint8(w)
}