VPP reconciler: event-driven sync, pool failover, bug fixes

This commit wires the checker's state machine through to the VPP dataplane:
every backend state transition flows through a single code path that
recomputes the effective per-backend weight (with pool failover) and pushes
the result to VPP. Along the way several latent bugs in the state machine
and the sync path were fixed.

internal/vpp/reconciler.go (new)
- New Reconciler type subscribes to checker.Checker events and, on every
  transition, calls Client.SyncLBStateVIP for the affected frontend. This
  is the ONLY place in the codebase where backend state changes cause VPP
  calls — the "single path" discipline requested during design.
- Defines an EventSource interface (checker.Checker satisfies it) so the
  dependency direction stays vpp → checker; the checker never imports vpp.
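- For reference, the event-driven wiring condensed from the new file (the
  full listing is in the reconciler.go diff below):

    type EventSource interface {
        Subscribe() (<-chan checker.Event, func())
    }

    func (r *Reconciler) Run(ctx context.Context) {
        ch, unsub := r.events.Subscribe()
        defer unsub()
        for {
            select {
            case <-ctx.Done():
                return
            case ev, ok := <-ch:
                if !ok {
                    return
                }
                r.handle(ev) // per event: Client.SyncLBStateVIP(cfg, ev.FrontendName)
            }
        }
    }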

internal/vpp/client.go
- Renamed ConfigSource → StateSource. The interface now has two methods:
  Config() and BackendState(name) — the reconciler and the desired-state
  builder both need live health state to compute effective weights.
- SetConfigSource → SetStateSource; internal cfgSrc field → stateSrc.
- New getStateSource() helper for internal locked access.
- lbSyncLoop still uses the state source for its periodic drift
  reconciliation; it's fully idempotent and runs the same code path as
  event-driven syncs.
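- For reference, the renamed interface as it now stands (quoted from the
  client.go diff below):

    type StateSource interface {
        Config() *config.Config
        BackendState(name string) (health.State, bool)
    }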

internal/vpp/lbsync.go
- desiredAS grows a Flush bool so the mapping function can signal "on
  transition to weight 0, flush existing flow-table entries".
- asFromBackend is now the single source of truth for the state →
  (weight, flush) rule. Documented with a full truth table. Takes an
  activePool parameter so it can distinguish "up in active pool" from
  "up but standby".
- activePoolIndex(fe, states) implements priority failover: it returns the
  index of the first pool containing any StateUp backend. pool[0] wins
  when at least one of its members is up; pool[1] takes over when pool[0]
  has no up members; and so on. Defaults to 0 (unobservable, since every
  backend maps to weight 0 when nothing is up anywhere). Both functions
  are sketched in Go right after this list.
- desiredFromFrontend snapshots backend states once, computes activePool,
  then walks every backend through asFromBackend. No more filtering on
  b.Enabled — disabled backends stay in the desired set so they keep
  their AS entry in VPP with weight=0. The previous filter caused delAS
  on disable, which destroyed the entry and broke enable afterwards.
- EffectiveWeights(fe, src) exported helper that returns the per-pool
  per-backend weight map for one frontend. Used by the gRPC GetFrontend
  handler and robot tests to observe failover without touching VPP.
- reconcileVIP computes flush at the weight-change call site:
    flush = desired.Flush && cur.Weight > 0 && desired.Weight == 0
  This ensures only the *transition* to disabled flushes sessions —
  steady-state syncs with already-zero weight skip the call entirely.
- setASWeight now plumbs IsFlush into lb_as_set_weight.
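- For reference, the two pure functions condensed from the lbsync.go diff
  below (EffectiveWeights wraps the same pair and returns a
  map[int]map[string]uint8 keyed by pool index and backend name):

    func activePoolIndex(fe config.Frontend, states map[string]health.State) int {
        for i, pool := range fe.Pools {
            for bName := range pool.Backends {
                if states[bName] == health.StateUp {
                    return i
                }
            }
        }
        return 0
    }

    func asFromBackend(poolIdx, activePool int, state health.State, cfgWeight int) (weight uint8, flush bool) {
        switch state {
        case health.StateUp:
            if poolIdx == activePool {
                return clampWeight(cfgWeight), false // serving
            }
            return 0, false // up, but in a standby pool
        case health.StateDisabled:
            return 0, true // weight 0 and flush existing flows
        default:
            return 0, false // unknown/down/paused: weight 0, drain naturally
        }
    }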

internal/vpp/lbsync_test.go (new)
- TestAsFromBackend: 15 cases locking down the truth table, including
  failover scenarios (up in standby pool, up promoted in pool[1]).
- TestActivePoolIndex: 8 cases covering pool[0]-has-up, pool[0]-all-down,
  all-disabled, all-paused, all-unknown, nothing-up-anywhere, and
  three-tier failover.
- TestDesiredFromFrontendFailover: 5 end-to-end scenarios wiring a fake
  StateSource through desiredFromFrontend and asserting the final
  per-IP weight map. Exercises the complete pipeline without VPP.

internal/checker/checker.go
- Added BackendState(name) (health.State, bool) — a small read-locked
  accessor that satisfies vpp.StateSource. The checker is otherwise unchanged.
- EnableBackend rewritten to reuse the existing worker (parallel to
  ResumeBackend). The old code called startWorker, which constructed a
  brand-new Backend via health.New and threw away the transition
  history; the resulting 'backend-transition' log entry showed a bogus
  from=unknown,to=unknown. It now calls w.backend.Enable() to record a
  proper disabled→unknown transition and launches a fresh probe goroutine.
- Static (no-healthcheck) backends now fire their synthetic 'always up'
  pass on the first iteration of runProbe instead of sleeping 30s
  first. Previously static backends sat in StateUnknown for 30s after
  startup — useless for deterministic testing and surprising for
  operators. The fix is a simple first-iteration flag.
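- For reference, the first-iteration tweak in runProbe amounts to (from the
  checker.go diff below):

    if entry.HealthCheck == "" {
        if first {
            sleepFor = 0 // fire the synthetic pass immediately
        } else {
            sleepFor = 30 * time.Second
        }
    }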

internal/health/state.go
- New Enable(maxHistory) method parallel to Disable. Transitions the
  backend from whatever state it's in (typically StateDisabled) to
  StateUnknown, resets the health counter to rise-1 so the expedited
  resolution kicks in on the first probe result, and emits a transition
  with code 'enabled'.
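- The method in full (it is only three statements; see the state.go diff
  below):

    func (b *Backend) Enable(maxHistory int) Transition {
        b.transition(StateUnknown, ProbeResult{Code: "enabled"}, maxHistory)
        b.Counter.Health = b.Counter.Rise - 1 // expedited resolution on the next probe
        return b.Transitions[0]
    }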

proto/maglev.proto
- PoolBackendInfo gains effective_weight: the state-aware weight that
  would be programmed into VPP (distinct from the configured weight in
  the YAML). Exposed via GetFrontend.
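- The message now reads:

    message PoolBackendInfo {
      string name = 1;
      int32 weight = 2;            // configured weight from YAML (0-100)
      int32 effective_weight = 3;  // state-aware weight after pool-failover logic
    }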

internal/grpcapi/server.go
- frontendToProto takes a vpp.StateSource, computes effective weights
  via vpp.EffectiveWeights, and populates PoolBackendInfo.EffectiveWeight.
- GetFrontend and SetFrontendPoolBackendWeight updated to pass the
  checker in.

cmd/maglevc/commands.go
- 'show frontends <name>' now renders every pool backend row as
    <name>  weight <cfg>  effective <eff>  [disabled]?
  so both values are always visible. The VPP-style key/value format
  avoids the ANSI-alignment pitfall we hit earlier and makes the output
  regex-parseable for robot tests.
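- Illustrative output for the failover-vip frontend while static-primary is
  disabled (spacing approximate; the exact padding comes from the existing
  label helpers):

    static-primary  weight 100  effective 0  [disabled]
    static-fallback  weight 100  effective 100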

cmd/maglevd/main.go
- Construct and start the Reconciler alongside the VPP client. Two
  extra lines, no other changes to startup.
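- Concretely, inside the existing if *vppAPIAddr != "" block:

    reconciler := vpp.NewReconciler(vppClient, chkr, chkr)
    go reconciler.Run(ctx)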

tests/01-maglevd/maglevd-lab/maglev.yaml
- Two new static backends (static-primary, static-fallback) and a new
  failover-vip frontend with one backend per pool. No healthcheck, so
  the state machine resolves them to 'up' immediately via the synthetic
  pass. Used by the failover robot tests.
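- Trimmed from the yaml diff below, the new frontend looks like:

    failover-vip:
      address: 192.0.2.2
      protocol: tcp
      port: 80
      pools:
      - name: primary
        backends:
          static-primary: {}
      - name: fallback
        backends:
          static-fallback: {}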

tests/01-maglevd/01-healthcheck.robot
- Three new test cases exercising pool failover end-to-end:
  1. primary up, secondary standby (initial state)
  2. disable primary → fallback takes over (effective weight flips)
  3. enable primary → fallback steps back
  All run without VPP: they scrape 'maglevc show frontends <name>' and
  regex-match the effective weight in the output. Deterministic and
  fast (~2s total) because the static backends don't probe.
- Two helper keywords: Static Backend Should Be Up and
  Effective Weight Should Be.
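- The core of the second keyword, quoted from the robot diff below:

    Effective Weight Should Be
        [Arguments]    ${frontend}    ${backend}    ${expected}
        ${output} =    Maglevc    show frontends ${frontend}
        Should Match Regexp    ${output}
        ...    ${backend}\\s+weight\\s+\\d+\\s+effective\\s+${expected}\\b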

Net result: 16/16 robot tests pass. Backend state transitions now
flow through a single documented path (checker event → reconciler →
SyncLBStateVIP → desiredFromFrontend → asFromBackend → reconcileVIP →
setASWeight), and the pool-failover, enable-after-disable, and
static-backend-startup bugs are all fixed.

commit 0049c2ae73 (parent d3c5c86037)
Date: 2026-04-12 12:39:57 +02:00
14 changed files with 714 additions and 72 deletions

cmd/maglevc/commands.go:

@@ -413,15 +413,18 @@ func runShowFrontend(ctx context.Context, client grpcapi.MaglevClient, args []st
 			if beErr == nil && !beInfo.Enabled {
 				suffix = " [disabled]"
 			}
-			weightStr := ""
-			if pb.Weight != 100 {
-				weightStr = fmt.Sprintf(" %s %d", label("weight"), pb.Weight)
-			}
+			// Show both the configured weight (from YAML) and the
+			// state-aware effective weight (what gets programmed into VPP
+			// after pool-failover logic). Format matches the VPP-style
+			// key-value line so robot tests can parse it with a regex.
+			metaStr := fmt.Sprintf(" %s %d %s %d",
+				label("weight"), pb.Weight,
+				label("effective"), pb.EffectiveWeight)
 			if i == 0 {
 				bePad := strings.Repeat(" ", poolLblWidth-len("backends"))
-				fmt.Printf("%s%s%s%s%s%s%s\n", poolIndent, label("backends"), bePad, poolSep, pb.Name, weightStr, suffix)
+				fmt.Printf("%s%s%s%s%s%s%s\n", poolIndent, label("backends"), bePad, poolSep, pb.Name, metaStr, suffix)
 			} else {
-				fmt.Printf("%s%s%s%s\n", contIndent, pb.Name, weightStr, suffix)
+				fmt.Printf("%s%s%s%s\n", contIndent, pb.Name, metaStr, suffix)
 			}
 		}
 	}

cmd/maglevd/main.go:

@@ -99,8 +99,13 @@ func run() error {
 	var vppClient *vpp.Client
 	if *vppAPIAddr != "" {
 		vppClient = vpp.New(*vppAPIAddr, *vppStatsAddr)
-		vppClient.SetConfigSource(chkr)
+		vppClient.SetStateSource(chkr)
 		go vppClient.Run(ctx)
+		// The reconciler subscribes to checker events and pushes per-VIP
+		// syncs into VPP on every backend state transition. This is the
+		// single place where transitions translate into dataplane changes.
+		reconciler := vpp.NewReconciler(vppClient, chkr, chkr)
+		go reconciler.Run(ctx)
 	}

 	// ---- gRPC server --------------------------------------------------------

internal/checker/checker.go:

@@ -161,6 +161,19 @@ func (c *Checker) Config() *config.Config {
 	return c.cfg
 }

+// BackendState returns the current health state of a backend. Returns
+// (StateUnknown, false) when the backend has no worker. Satisfies
+// vpp.StateSource.
+func (c *Checker) BackendState(name string) (health.State, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	w, ok := c.workers[name]
+	if !ok {
+		return health.StateUnknown, false
+	}
+	return w.backend.State, true
+}
+
 // ListFrontends returns the names of all configured frontends.
 func (c *Checker) ListFrontends() []string {
 	c.mu.RLock()
@@ -382,8 +395,9 @@ func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) {
 	return BackendSnapshot{Health: w.backend, Config: w.entry}, true
 }

-// EnableBackend re-enables a previously disabled backend. A fresh probe
-// goroutine is started and the backend re-enters StateUnknown.
+// EnableBackend re-enables a previously disabled backend. The existing
+// Backend struct is reused — its transition history is preserved — and a
+// fresh probe goroutine is launched. The backend re-enters StateUnknown.
 func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -394,23 +408,26 @@ func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) {
 	if w.entry.Enabled {
 		return BackendSnapshot{Health: w.backend, Config: w.entry}, true
 	}
-	entry := w.entry
-	entry.Enabled = true
+	w.entry.Enabled = true
 	if b, ok := c.cfg.Backends[name]; ok {
 		b.Enabled = true
 		c.cfg.Backends[name] = b
 	}
 	maxHistory := c.cfg.HealthChecker.TransitionHistory
-	hc := c.cfg.HealthChecks[entry.HealthCheck]
-	c.startWorker(c.runCtx, name, entry, hc, 0, 1, maxHistory)
-	nw := c.workers[name]
-	t := nw.backend.Transitions[0]
+	t := w.backend.Enable(maxHistory)
 	slog.Info("backend-transition", "backend", name,
 		"from", t.From.String(),
 		"to", t.To.String(),
 	)
-	c.emitForBackend(name, nw.backend.Address, t, c.cfg.Frontends)
-	return BackendSnapshot{Health: nw.backend, Config: nw.entry}, true
+	c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
+
+	// Launch a fresh probe goroutine with a new cancellable context,
+	// keeping the existing worker and its transition history.
+	wCtx, cancel := context.WithCancel(c.runCtx)
+	w.cancel = cancel
+	w.wakeCh = make(chan struct{}, 1)
+	go c.runProbe(wCtx, name, 0, 1)
+	return BackendSnapshot{Health: w.backend, Config: w.entry}, true
 }

 // ---- internal --------------------------------------------------------------
@@ -455,6 +472,7 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
 		}
 	}

+	first := true
 	for {
 		c.mu.RLock()
 		w, ok := c.workers[name]
@@ -469,7 +487,15 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
 		wakeCh := w.wakeCh
 		var sleepFor time.Duration
 		if entry.HealthCheck == "" {
-			sleepFor = 30 * time.Second
+			// Static (no-healthcheck) backends: the first iteration fires
+			// the synthetic pass immediately so the backend reaches "up"
+			// without delay; subsequent iterations idle at 30s since there's
+			// nothing to do anyway.
+			if first {
+				sleepFor = 0
+			} else {
+				sleepFor = 30 * time.Second
+			}
 		} else {
 			sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval)
 		}
@@ -481,6 +507,7 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
 		case <-time.After(sleepFor):
 		case <-wakeCh:
 		}
+		first = false

 		var result health.ProbeResult
 		if entry.HealthCheck == "" {

Generated Go bindings for proto/maglev.proto:

@@ -1214,11 +1214,12 @@ func (x *ListFrontendsResponse) GetFrontendNames() []string {
 }

 type PoolBackendInfo struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
 	Name  string                 `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
-	Weight        int32 `protobuf:"varint,2,opt,name=weight,proto3" json:"weight,omitempty"`
-	unknownFields protoimpl.UnknownFields
-	sizeCache     protoimpl.SizeCache
+	Weight          int32 `protobuf:"varint,2,opt,name=weight,proto3" json:"weight,omitempty"` // configured weight from YAML (0-100)
+	EffectiveWeight int32 `protobuf:"varint,3,opt,name=effective_weight,json=effectiveWeight,proto3" json:"effective_weight,omitempty"` // state-aware weight after pool-failover logic
+	unknownFields   protoimpl.UnknownFields
+	sizeCache       protoimpl.SizeCache
 }

 func (x *PoolBackendInfo) Reset() {
@@ -1265,6 +1266,13 @@ func (x *PoolBackendInfo) GetWeight() int32 {
 	return 0
 }

+func (x *PoolBackendInfo) GetEffectiveWeight() int32 {
+	if x != nil {
+		return x.EffectiveWeight
+	}
+	return 0
+}
+
 type PoolInfo struct {
 	state protoimpl.MessageState `protogen:"open.v1"`
 	Name  string                 `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
@@ -2318,10 +2326,11 @@ const file_proto_maglev_proto_rawDesc = "" +
 	"\b_backendB\v\n" +
 	"\t_frontend\">\n" +
 	"\x15ListFrontendsResponse\x12%\n" +
-	"\x0efrontend_names\x18\x01 \x03(\tR\rfrontendNames\"=\n" +
+	"\x0efrontend_names\x18\x01 \x03(\tR\rfrontendNames\"h\n" +
 	"\x0fPoolBackendInfo\x12\x12\n" +
 	"\x04name\x18\x01 \x01(\tR\x04name\x12\x16\n" +
-	"\x06weight\x18\x02 \x01(\x05R\x06weight\"S\n" +
+	"\x06weight\x18\x02 \x01(\x05R\x06weight\x12)\n" +
+	"\x10effective_weight\x18\x03 \x01(\x05R\x0feffectiveWeight\"S\n" +
 	"\bPoolInfo\x12\x12\n" +
 	"\x04name\x18\x01 \x01(\tR\x04name\x123\n" +
 	"\bbackends\x18\x02 \x03(\v2\x17.maglev.PoolBackendInfoR\bbackends\"\xb6\x01\n" +

internal/grpcapi/server.go:

@@ -48,7 +48,7 @@ func (s *Server) GetFrontend(_ context.Context, req *GetFrontendRequest) (*Front
 	if !ok {
 		return nil, status.Errorf(codes.NotFound, "frontend %q not found", req.Name)
 	}
-	return frontendToProto(req.Name, fe), nil
+	return frontendToProto(req.Name, fe, s.checker), nil
 }

 // ListBackends returns the names of all active backends.
@@ -110,7 +110,7 @@ func (s *Server) SetFrontendPoolBackendWeight(_ context.Context, req *SetWeightR
 	if err != nil {
 		return nil, status.Errorf(codes.NotFound, "%v", err)
 	}
-	return frontendToProto(req.Frontend, fe), nil
+	return frontendToProto(req.Frontend, fe, s.checker), nil
 }

 // ListHealthChecks returns the names of all configured health checks.
@@ -370,14 +370,18 @@ func ipStringOrEmpty(ip net.IP) string {

 // ---- conversion helpers ----------------------------------------------------

-func frontendToProto(name string, fe config.Frontend) *FrontendInfo {
+func frontendToProto(name string, fe config.Frontend, src vpp.StateSource) *FrontendInfo {
+	// Compute the state-aware effective weights once; these reflect the
+	// pool-failover logic and what would be programmed into VPP.
+	eff := vpp.EffectiveWeights(fe, src)
 	pools := make([]*PoolInfo, 0, len(fe.Pools))
-	for _, p := range fe.Pools {
+	for poolIdx, p := range fe.Pools {
 		pi := &PoolInfo{Name: p.Name}
 		for bName, pb := range p.Backends {
 			pi.Backends = append(pi.Backends, &PoolBackendInfo{
 				Name:   bName,
 				Weight: int32(pb.Weight),
+				EffectiveWeight: int32(eff[poolIdx][bName]),
 			})
 		}
 		pools = append(pools, pi)

internal/health/state.go:

@@ -206,6 +206,15 @@ func (b *Backend) Disable(maxHistory int) Transition {
 	return b.Transitions[0]
 }

+// Enable transitions a disabled backend back to StateUnknown, resetting the
+// counter so the first probe result resolves state (rise-1 preload gives
+// 1-pass → Up, 1-fail → Down). Returns the transition.
+func (b *Backend) Enable(maxHistory int) Transition {
+	b.transition(StateUnknown, ProbeResult{Code: "enabled"}, maxHistory)
+	b.Counter.Health = b.Counter.Rise - 1
+	return b.Transitions[0]
+}
+
 // Remove transitions the backend to StateRemoved. Returns the transition.
 // After this call no further probe results are accepted.
 func (b *Backend) Remove(maxHistory int) Transition {

Generated govpp bindings for the VPP lb plugin (internal/vpp/binapi/lb):

@@ -23,7 +23,7 @@ const _ = api.GoVppAPIPackageIsVersion2
 const (
 	APIFile    = "lb"
 	APIVersion = "1.2.0"
-	VersionCrc = 0x853a5710
+	VersionCrc = 0xac602d7b
 )

 // Add an application server for a given VIP
@@ -753,6 +753,7 @@ func (m *LbAsDump) Unmarshal(b []byte) error {
 // - port - destination port.
 // - as_address - The application server address.
 // - weight - new bucket weight 0-100.
+// - is_flush - The sessions related to this AS should be flushed.
 //
 // LbAsSetWeight defines message 'lb_as_set_weight'.
 type LbAsSetWeight struct {
@@ -761,11 +762,12 @@ type LbAsSetWeight struct {
 	Port      uint16           `binapi:"u16,name=port" json:"port,omitempty"`
 	AsAddress ip_types.Address `binapi:"address,name=as_address" json:"as_address,omitempty"`
 	Weight    uint8            `binapi:"u8,name=weight" json:"weight,omitempty"`
+	IsFlush   bool             `binapi:"bool,name=is_flush" json:"is_flush,omitempty"`
 }

 func (m *LbAsSetWeight) Reset()               { *m = LbAsSetWeight{} }
 func (*LbAsSetWeight) GetMessageName() string { return "lb_as_set_weight" }
-func (*LbAsSetWeight) GetCrcString() string   { return "2c72979e" }
+func (*LbAsSetWeight) GetCrcString() string   { return "2d89bdbd" }
 func (*LbAsSetWeight) GetMessageType() api.MessageType {
 	return api.RequestMessage
 }
@@ -782,6 +784,7 @@ func (m *LbAsSetWeight) Size() (size int) {
 	size += 1      // m.AsAddress.Af
 	size += 1 * 16 // m.AsAddress.Un
 	size += 1      // m.Weight
+	size += 1      // m.IsFlush
 	return size
 }
@@ -797,6 +800,7 @@ func (m *LbAsSetWeight) Marshal(b []byte) ([]byte, error) {
 	buf.EncodeUint8(uint8(m.AsAddress.Af))
 	buf.EncodeBytes(m.AsAddress.Un.XXX_UnionData[:], 16)
 	buf.EncodeUint8(m.Weight)
+	buf.EncodeBool(m.IsFlush)
 	return buf.Bytes(), nil
 }
@@ -809,6 +813,7 @@ func (m *LbAsSetWeight) Unmarshal(b []byte) error {
 	m.AsAddress.Af = ip_types.AddressFamily(buf.DecodeUint8())
 	copy(m.AsAddress.Un.XXX_UnionData[:], buf.DecodeBytes(16))
 	m.Weight = buf.DecodeUint8()
+	m.IsFlush = buf.DecodeBool()
 	return nil
 }
@@ -1368,7 +1373,7 @@ func file_lb_binapi_init() {
 	api.RegisterMessage((*LbAddDelVipV2Reply)(nil), "lb_add_del_vip_v2_reply_e8d4e804")
 	api.RegisterMessage((*LbAsDetails)(nil), "lb_as_details_8d24c29e")
 	api.RegisterMessage((*LbAsDump)(nil), "lb_as_dump_1063f819")
-	api.RegisterMessage((*LbAsSetWeight)(nil), "lb_as_set_weight_2c72979e")
+	api.RegisterMessage((*LbAsSetWeight)(nil), "lb_as_set_weight_2d89bdbd")
 	api.RegisterMessage((*LbAsSetWeightReply)(nil), "lb_as_set_weight_reply_e8d4e804")
 	api.RegisterMessage((*LbAsV2Details)(nil), "lb_as_v2_details_90064aae")
 	api.RegisterMessage((*LbAsV2Dump)(nil), "lb_as_v2_dump_1063f819")

internal/vpp/client.go:

@@ -18,14 +18,17 @@ import (
 	"go.fd.io/govpp/core"

 	"git.ipng.ch/ipng/vpp-maglev/internal/config"
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
 	lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
 )

-// ConfigSource provides a snapshot of the current maglev config to the VPP
-// sync loop. checker.Checker satisfies this interface via its Config() method.
-// Decoupling via an interface avoids an import cycle with the checker package.
-type ConfigSource interface {
+// StateSource provides a live view of the running config and the current
+// health state of each backend. checker.Checker satisfies this interface via
+// its Config() and BackendState() methods. Decoupling via an interface avoids
+// an import cycle with the checker package.
+type StateSource interface {
 	Config() *config.Config
+	BackendState(name string) (health.State, bool)
 }

 const retryInterval = 5 * time.Second
@@ -54,17 +57,26 @@ type Client struct {
 	statsConn   *core.StatsConnection
 	statsClient adapter.StatsAPI // raw adapter for DumpStats
 	info        Info             // populated on successful connect
-	cfgSrc      ConfigSource     // optional; enables periodic LB sync
+	stateSrc    StateSource      // optional; enables periodic LB sync
 	lastLBConf  *lb.LbConf       // cached last-pushed lb_conf (dedup)
 }

-// SetConfigSource attaches a live config source. When set, the VPP client
-// runs a periodic SyncLBStateAll loop (at the interval from cfg.VPP.LB.SyncInterval)
-// for as long as the VPP connection is up. Must be called before Run.
-func (c *Client) SetConfigSource(src ConfigSource) {
+// SetStateSource attaches a live config + health state source. When set, the
+// VPP client runs a periodic SyncLBStateAll loop (at the interval from
+// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and
+// state-aware weights are used throughout the sync path. Must be called
+// before Run.
+func (c *Client) SetStateSource(src StateSource) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.cfgSrc = src
+	c.stateSrc = src
+}
+
+// getStateSource returns the registered state source under the mutex.
+func (c *Client) getStateSource() StateSource {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.stateSrc
 }

 // New creates a Client for the given socket paths.
@@ -117,7 +129,7 @@ func (c *Client) Run(ctx context.Context) {
 		// running config. On startup this is the initial set; on reconnect
 		// (VPP restart) VPP has forgotten everything, so we set it again.
 		c.mu.Lock()
-		src := c.cfgSrc
+		src := c.stateSrc
 		c.mu.Unlock()
 		if src != nil {
 			if cfg := src.Config(); cfg != nil {
@@ -151,11 +163,9 @@ func (c *Client) Run(ctx context.Context) {
 // reconciliation). Subsequent runs fire every cfg.VPP.LB.SyncInterval.
 // Exits when ctx is cancelled.
 func (c *Client) lbSyncLoop(ctx context.Context) {
-	c.mu.Lock()
-	src := c.cfgSrc
-	c.mu.Unlock()
+	src := c.getStateSource()
 	if src == nil {
-		return // no config source registered; nothing to sync
+		return // no state source registered; nothing to sync
 	}

 	// next-run timestamp starts at "now" so the first tick is immediate.

internal/vpp/lbsync.go:

@@ -9,6 +9,7 @@ import (
 	"net"

 	"git.ipng.ch/ipng/vpp-maglev/internal/config"
+	"git.ipng.ch/ipng/vpp-maglev/internal/health"
 	ip_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/ip_types"
 	lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb"
 	lb_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb_types"
@@ -37,6 +38,7 @@ type desiredVIP struct {
 type desiredAS struct {
 	Address net.IP
 	Weight  uint8 // 0-100
+	Flush   bool  // if true, drop existing flows when transitioning to weight 0
 }

 // syncStats counts changes made to the dataplane during a sync run.
@@ -60,12 +62,16 @@ func (c *Client) SyncLBStateAll(cfg *config.Config) error {
 	if !c.IsConnected() {
 		return errNotConnected
 	}
+	src := c.getStateSource()
+	if src == nil {
+		return fmt.Errorf("no state source configured")
+	}
 	cur, err := c.GetLBStateAll()
 	if err != nil {
 		return fmt.Errorf("read VPP LB state: %w", err)
 	}
-	desired := desiredFromConfig(cfg)
+	desired := desiredFromConfig(cfg, src)

 	ch, err := c.apiChannel()
 	if err != nil {
@@ -131,11 +137,15 @@ func (c *Client) SyncLBStateVIP(cfg *config.Config, feName string) error {
 	if !c.IsConnected() {
 		return errNotConnected
 	}
+	src := c.getStateSource()
+	if src == nil {
+		return fmt.Errorf("no state source configured")
+	}
 	fe, ok := cfg.Frontends[feName]
 	if !ok {
 		return fmt.Errorf("%q: %w", feName, ErrFrontendNotFound)
 	}
-	d := desiredFromFrontend(cfg, fe)
+	d := desiredFromFrontend(cfg, fe, src)

 	cur, err := c.GetLBStateVIP(d.Prefix, d.Protocol, d.Port)
 	if err != nil {
@@ -215,7 +225,12 @@ func reconcileVIP(ch *loggedChannel, d desiredVIP, cur *LBVIP, st *syncStats) er
 			continue
 		}
 		if c.Weight != a.Weight {
-			if err := setASWeight(ch, d.Prefix, d.Protocol, d.Port, a); err != nil {
+			// Flush only on the transition from serving traffic (cur > 0) to
+			// zero, and only when the desired state explicitly asks for it
+			// (i.e. the backend was disabled, not merely drained). Steady-
+			// state syncs where weight doesn't change never re-flush.
+			flush := a.Flush && c.Weight > 0 && a.Weight == 0
+			if err := setASWeight(ch, d.Prefix, d.Protocol, d.Port, a, flush); err != nil {
 				return err
 			}
 			st.asWeight++
@@ -240,10 +255,12 @@ func removeVIP(ch *loggedChannel, v LBVIP, st *syncStats) error {
 }

 // desiredFromConfig flattens every frontend in cfg into a desired VIP set.
-func desiredFromConfig(cfg *config.Config) []desiredVIP {
+// src provides the per-backend health state so weights and flush hints
+// reflect the current runtime state, not just the static config.
+func desiredFromConfig(cfg *config.Config, src StateSource) []desiredVIP {
 	out := make([]desiredVIP, 0, len(cfg.Frontends))
 	for _, fe := range cfg.Frontends {
-		out = append(out, desiredFromFrontend(cfg, fe))
+		out = append(out, desiredFromFrontend(cfg, fe, src))
 	}
 	return out
 }
@@ -252,16 +269,13 @@ func desiredFromConfig(cfg *config.Config, src StateSource) []desiredVIP {
 //
 // All backends across all pools of a frontend are merged into a single
 // application-server list so VPP knows about every backend that could ever
-// receive traffic. Weights are assigned as follows:
+// receive traffic. The per-AS weight and flush hint are computed by
+// asFromBackend from three inputs: (pool index, backend health state,
+// configured pool weight).
 //
-//   - primary (first) pool: the backend's configured weight
-//   - any subsequent pool: weight 0 (backend is known but receives no traffic)
-//
-// This preserves the pool priority model: higher layers can later flip
-// secondary-pool backends to non-zero weights on failover without needing to
-// add/remove ASes in the dataplane. When the same backend appears in multiple
-// pools, the first pool it appears in wins.
-func desiredFromFrontend(cfg *config.Config, fe config.Frontend) desiredVIP {
+// When the same backend appears in multiple pools, the first pool it
+// appears in wins.
+func desiredFromFrontend(cfg *config.Config, fe config.Frontend, src StateSource) desiredVIP {
 	bits := 32
 	if fe.Address.To4() == nil {
 		bits = 128
@@ -272,26 +286,138 @@ func desiredFromFrontend(cfg *config.Config, fe config.Frontend, src StateSource
 		Port:     fe.Port,
 		ASes:     make(map[string]desiredAS),
 	}

+	// Snapshot backend states once so the active-pool computation and the
+	// per-backend weight assignment see a consistent view.
+	states := make(map[string]health.State)
+	for _, pool := range fe.Pools {
+		for bName := range pool.Backends {
+			if s, ok := src.BackendState(bName); ok {
+				states[bName] = s
+			} else {
+				states[bName] = health.StateUnknown
+			}
+		}
+	}
+	activePool := activePoolIndex(fe, states)
+
 	for poolIdx, pool := range fe.Pools {
 		for bName, pb := range pool.Backends {
 			b, ok := cfg.Backends[bName]
-			if !ok || !b.Enabled || b.Address == nil {
+			if !ok || b.Address == nil {
 				continue
 			}
+			// Disabled backends (either via operator action or config) are
+			// kept in the desired set so they stay installed in VPP with
+			// weight=0 — they must not be deleted, otherwise a subsequent
+			// enable has to re-add them and existing flow-table state (if
+			// any) is lost. The state machine drives what weight to set
+			// via asFromBackend; we never filter on b.Enabled here.
 			addr := b.Address.String()
 			if _, already := d.ASes[addr]; already {
 				continue
 			}
-			var w uint8
-			if poolIdx == 0 {
-				w = clampWeight(pb.Weight)
-			} // secondary pools: weight 0 (default)
-			d.ASes[addr] = desiredAS{Address: b.Address, Weight: w}
+			w, flush := asFromBackend(poolIdx, activePool, states[bName], pb.Weight)
+			d.ASes[addr] = desiredAS{
+				Address: b.Address,
+				Weight:  w,
+				Flush:   flush,
+			}
 		}
 	}
 	return d
 }
+
+// EffectiveWeights returns the current effective VPP weight for every backend
+// in every pool of fe, keyed by poolIdx and backend name. It runs the same
+// failover + state-aware weight calculation that the sync path uses, but
+// produces a plain map instead of desiredVIP — intended for observability
+// (e.g. the GetFrontend gRPC handler) and for robot-testing the failover
+// logic without needing a running VPP instance.
+//
+// The returned map layout is: result[poolIdx][backendName] = effective weight.
+func EffectiveWeights(fe config.Frontend, src StateSource) map[int]map[string]uint8 {
+	states := make(map[string]health.State)
+	for _, pool := range fe.Pools {
+		for bName := range pool.Backends {
+			if s, ok := src.BackendState(bName); ok {
+				states[bName] = s
+			} else {
+				states[bName] = health.StateUnknown
+			}
+		}
+	}
+	activePool := activePoolIndex(fe, states)
+
+	out := make(map[int]map[string]uint8, len(fe.Pools))
+	for poolIdx, pool := range fe.Pools {
+		out[poolIdx] = make(map[string]uint8, len(pool.Backends))
+		for bName, pb := range pool.Backends {
+			w, _ := asFromBackend(poolIdx, activePool, states[bName], pb.Weight)
+			out[poolIdx][bName] = w
+		}
+	}
+	return out
+}
+
+// activePoolIndex returns the index of the first pool in fe that contains at
+// least one backend currently in StateUp. This is the priority-failover
+// selector: pool[0] is the primary, pool[1] is the first fallback, and so on.
+// As long as pool[0] has any up backend, it stays active. When every pool[0]
+// backend leaves StateUp (down, paused, disabled, unknown), pool[1] is
+// promoted — and so on for further fallback tiers. When no pool has any up
+// backend, returns 0 (the return value is unobservable in that case since
+// every backend maps to weight 0 regardless of the active pool).
+func activePoolIndex(fe config.Frontend, states map[string]health.State) int {
+	for i, pool := range fe.Pools {
+		for bName := range pool.Backends {
+			if states[bName] == health.StateUp {
+				return i
+			}
+		}
+	}
+	return 0
+}
+
+// asFromBackend is the pure mapping from (pool index, active pool, backend
+// state, config weight) to the desired VPP AS weight and flush hint. This is
+// the single source of truth for the state → dataplane rule — every LB change
+// flows through this function.
+//
+// A backend gets its configured weight iff it is up AND belongs to the
+// currently-active pool. Every other case yields weight 0. The only
+// state that produces flush=true is disabled.
+//
+//	state      in active pool    not in active pool    flush
+//	--------   --------------    ------------------    -----
+//	unknown    0                 0                     no
+//	up         configured        0 (standby)           no
+//	down       0                 0                     no
+//	paused     0                 0                     no
+//	disabled   0                 0                     yes
+//	removed    handled separately (AS deleted via delAS)
+//
+// Flush semantics: flush=true means "if the AS currently has a non-zero
+// weight in VPP, drop its existing flow-table entries when setting weight
+// to 0". The reconciler only acts on flush when transitioning (current
+// weight > 0), so steady-state syncs never re-flush. Failover demotion
+// (e.g. pool[1] up→standby when pool[0] recovers) does NOT flush — we
+// let those sessions drain naturally.
+func asFromBackend(poolIdx, activePool int, state health.State, cfgWeight int) (weight uint8, flush bool) {
+	switch state {
+	case health.StateUp:
+		if poolIdx == activePool {
+			return clampWeight(cfgWeight), false
		}
+		return 0, false
+	case health.StateDisabled:
+		return 0, true
+	default:
+		// unknown, down, paused: off, drain existing flows naturally.
+		return 0, false
+	}
+}

 // ---- API call helpers ------------------------------------------------------

 // defaultFlowsTableLength is sent as NewFlowsTableLength in lb_add_del_vip_v2.
@@ -397,13 +523,14 @@ func delAS(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, ad
 	return nil
 }

-func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS) error {
+func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS, flush bool) error {
 	req := &lb.LbAsSetWeight{
 		Pfx:       ip_types.NewAddressWithPrefix(*prefix),
 		Protocol:  protocol,
 		Port:      port,
 		AsAddress: ip_types.NewAddress(a.Address),
 		Weight:    a.Weight,
+		IsFlush:   flush,
 	}
 	reply := &lb.LbAsSetWeightReply{}
 	if err := ch.SendRequest(req).ReceiveReply(reply); err != nil {
@@ -417,7 +544,8 @@ func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint
 		"protocol", protocolName(protocol),
 		"port", port,
 		"address", a.Address.String(),
-		"weight", a.Weight)
+		"weight", a.Weight,
+		"flush", flush)
 	return nil
 }

internal/vpp/lbsync_test.go (new file, 273 lines):

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package vpp
import (
"net"
"testing"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
)
// TestAsFromBackend locks down the state → (weight, flush) truth table.
// This is the single source of truth for how maglevd decides what to
// program into VPP for each backend state. If this test needs updating
// the behavior has deliberately changed.
func TestAsFromBackend(t *testing.T) {
cases := []struct {
name string
poolIdx int
activePool int
state health.State
cfgWeight int
wantWeight uint8
wantFlush bool
}{
// up in active pool → configured weight, no flush
{"up active w100", 0, 0, health.StateUp, 100, 100, false},
{"up active w50", 0, 0, health.StateUp, 50, 50, false},
{"up active w0", 0, 0, health.StateUp, 0, 0, false},
{"up active clamp-high", 0, 0, health.StateUp, 150, 100, false},
{"up active clamp-low", 0, 0, health.StateUp, -5, 0, false},
// up in non-active pool → standby (weight 0), no flush
{"up standby pool0 active=1", 0, 1, health.StateUp, 100, 0, false},
{"up standby pool1 active=0", 1, 0, health.StateUp, 100, 0, false},
{"up standby pool2 active=0", 2, 0, health.StateUp, 100, 0, false},
// up in secondary, promoted because pool[1] is now active
{"up failover pool1 active=1", 1, 1, health.StateUp, 100, 100, false},
// unknown → off, drain
{"unknown pool0 active=0", 0, 0, health.StateUnknown, 100, 0, false},
{"unknown pool1 active=0", 1, 0, health.StateUnknown, 100, 0, false},
// down → off, drain (probe might be wrong)
{"down pool0 active=0", 0, 0, health.StateDown, 100, 0, false},
{"down pool1 active=1", 1, 1, health.StateDown, 100, 0, false},
// paused → off, drain (graceful maintenance)
{"paused pool0 active=0", 0, 0, health.StatePaused, 100, 0, false},
// disabled → off, flush (hard stop)
{"disabled pool0 active=0", 0, 0, health.StateDisabled, 100, 0, true},
{"disabled pool1 active=1", 1, 1, health.StateDisabled, 100, 0, true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
w, f := asFromBackend(tc.poolIdx, tc.activePool, tc.state, tc.cfgWeight)
if w != tc.wantWeight {
t.Errorf("weight: got %d, want %d", w, tc.wantWeight)
}
if f != tc.wantFlush {
t.Errorf("flush: got %v, want %v", f, tc.wantFlush)
}
})
}
}
// TestActivePoolIndex locks down the priority-failover selector: the first
// pool containing at least one up backend is the active pool. Default 0.
func TestActivePoolIndex(t *testing.T) {
mkFE := func(pools ...[]string) config.Frontend {
out := make([]config.Pool, len(pools))
for i, p := range pools {
out[i] = config.Pool{Name: "p", Backends: map[string]config.PoolBackend{}}
for _, name := range p {
out[i].Backends[name] = config.PoolBackend{Weight: 100}
}
}
return config.Frontend{Pools: out}
}
cases := []struct {
name string
fe config.Frontend
states map[string]health.State
want int
}{
{
name: "pool0 has up, pool1 standby",
fe: mkFE([]string{"a", "b"}, []string{"c", "d"}),
states: map[string]health.State{"a": health.StateUp, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp},
want: 0,
},
{
name: "pool0 all down, pool1 has up → failover",
fe: mkFE([]string{"a", "b"}, []string{"c", "d"}),
states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp},
want: 1,
},
{
name: "pool0 all disabled, pool1 has up → failover",
fe: mkFE([]string{"a", "b"}, []string{"c"}),
states: map[string]health.State{"a": health.StateDisabled, "b": health.StateDisabled, "c": health.StateUp},
want: 1,
},
{
name: "pool0 all paused, pool1 has up → failover",
fe: mkFE([]string{"a"}, []string{"c"}),
states: map[string]health.State{"a": health.StatePaused, "c": health.StateUp},
want: 1,
},
{
name: "pool0 all unknown (startup), pool1 up → pool1",
fe: mkFE([]string{"a"}, []string{"c"}),
states: map[string]health.State{"a": health.StateUnknown, "c": health.StateUp},
want: 1,
},
{
name: "nothing up anywhere → default 0",
fe: mkFE([]string{"a"}, []string{"c"}),
states: map[string]health.State{"a": health.StateDown, "c": health.StateDown},
want: 0,
},
{
name: "1 up in pool0 is enough",
fe: mkFE([]string{"a", "b", "c"}, []string{"d"}),
states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp},
want: 0,
},
{
name: "three tiers, pool0 and pool1 both empty → pool2",
fe: mkFE([]string{"a"}, []string{"b"}, []string{"c"}),
states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp},
want: 2,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := activePoolIndex(tc.fe, tc.states)
if got != tc.want {
t.Errorf("got pool %d, want pool %d", got, tc.want)
}
})
}
}
// fakeStateSource implements StateSource from a static map.
type fakeStateSource struct {
cfg *config.Config
states map[string]health.State
}
func (f *fakeStateSource) Config() *config.Config { return f.cfg }
func (f *fakeStateSource) BackendState(name string) (health.State, bool) {
s, ok := f.states[name]
return s, ok
}
// TestDesiredFromFrontendFailover is the end-to-end integration test for
// priority-failover: given a frontend with two pools, the desired weights
// flip between pools based on which has any up backends.
func TestDesiredFromFrontendFailover(t *testing.T) {
ip := func(s string) net.IP { return net.ParseIP(s).To4() }
cfg := &config.Config{
Backends: map[string]config.Backend{
"p1": {Address: ip("10.0.0.1"), Enabled: true},
"p2": {Address: ip("10.0.0.2"), Enabled: true},
"s1": {Address: ip("10.0.0.11"), Enabled: true},
"s2": {Address: ip("10.0.0.12"), Enabled: true},
},
}
fe := config.Frontend{
Address: ip("192.0.2.1"),
Protocol: "tcp",
Port: 80,
Pools: []config.Pool{
{Name: "primary", Backends: map[string]config.PoolBackend{
"p1": {Weight: 100},
"p2": {Weight: 100},
}},
{Name: "fallback", Backends: map[string]config.PoolBackend{
"s1": {Weight: 100},
"s2": {Weight: 100},
}},
},
}
tests := []struct {
name string
states map[string]health.State
want map[string]uint8 // backend IP → expected weight
}{
{
name: "primary all up → primary serves, secondary standby",
states: map[string]health.State{
"p1": health.StateUp, "p2": health.StateUp,
"s1": health.StateUp, "s2": health.StateUp,
},
want: map[string]uint8{
"10.0.0.1": 100, "10.0.0.2": 100,
"10.0.0.11": 0, "10.0.0.12": 0,
},
},
{
name: "primary 1 up → primary still serves",
states: map[string]health.State{
"p1": health.StateDown, "p2": health.StateUp,
"s1": health.StateUp, "s2": health.StateUp,
},
want: map[string]uint8{
"10.0.0.1": 0, "10.0.0.2": 100,
"10.0.0.11": 0, "10.0.0.12": 0,
},
},
{
name: "primary all down → failover to secondary",
states: map[string]health.State{
"p1": health.StateDown, "p2": health.StateDown,
"s1": health.StateUp, "s2": health.StateUp,
},
want: map[string]uint8{
"10.0.0.1": 0, "10.0.0.2": 0,
"10.0.0.11": 100, "10.0.0.12": 100,
},
},
{
name: "primary all disabled → failover",
states: map[string]health.State{
"p1": health.StateDisabled, "p2": health.StateDisabled,
"s1": health.StateUp, "s2": health.StateUp,
},
want: map[string]uint8{
"10.0.0.1": 0, "10.0.0.2": 0,
"10.0.0.11": 100, "10.0.0.12": 100,
},
},
{
name: "everything down → all zero, no serving",
states: map[string]health.State{
"p1": health.StateDown, "p2": health.StateDown,
"s1": health.StateDown, "s2": health.StateDown,
},
want: map[string]uint8{
"10.0.0.1": 0, "10.0.0.2": 0,
"10.0.0.11": 0, "10.0.0.12": 0,
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
src := &fakeStateSource{cfg: cfg, states: tc.states}
d := desiredFromFrontend(cfg, fe, src)
for addr, wantW := range tc.want {
got, ok := d.ASes[addr]
if !ok {
t.Errorf("%s: missing from desired set", addr)
continue
}
if got.Weight != wantW {
t.Errorf("%s: weight got %d, want %d", addr, got.Weight, wantW)
}
}
if len(d.ASes) != len(tc.want) {
t.Errorf("got %d ASes, want %d", len(d.ASes), len(tc.want))
}
})
}
}

internal/vpp/reconciler.go (new file, 97 lines):

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package vpp
import (
"context"
"errors"
"log/slog"
"git.ipng.ch/ipng/vpp-maglev/internal/checker"
)
// EventSource is the subset of checker.Checker that Reconciler needs.
// Decoupling via an interface keeps the dependency direction
// vpp → checker (checker never imports vpp).
type EventSource interface {
Subscribe() (<-chan checker.Event, func())
}
// Reconciler bridges checker state transitions to VPP dataplane changes.
// It subscribes to the checker's event channel and, for every transition,
// runs SyncLBStateVIP for the frontend the backend belongs to. This is
// the ONLY place in the codebase where backend state transitions cause
// VPP calls — every LB change flows through Client.SyncLBStateVIP.
//
// The reconciler carries no state of its own. Idempotency is guaranteed
// by SyncLBStateVIP itself (diff-based, driven by the pure asFromBackend
// mapping in lbsync.go).
type Reconciler struct {
client *Client
events EventSource
stateSrc StateSource
}
// NewReconciler creates a Reconciler. client is the VPP client, events is
// the checker (or anything that implements Subscribe), and stateSrc provides
// the live config for SyncLBStateVIP calls. All three are normally the
// checker/vpp client pair constructed at daemon startup.
func NewReconciler(client *Client, events EventSource, stateSrc StateSource) *Reconciler {
return &Reconciler{client: client, events: events, stateSrc: stateSrc}
}
// Run subscribes to the checker and loops until ctx is cancelled. Each
// received event fires a single-VIP sync for the frontend the transitioned
// backend belongs to.
func (r *Reconciler) Run(ctx context.Context) {
ch, unsub := r.events.Subscribe()
defer unsub()
slog.Info("vpp-reconciler-start")
defer slog.Info("vpp-reconciler-stop")
for {
select {
case <-ctx.Done():
return
case ev, ok := <-ch:
if !ok {
return
}
r.handle(ev)
}
}
}
// handle reconciles one event. Operates only on events that carry a
// frontend name (the checker emits one event per frontend that references
// the backend, so a backend shared across multiple frontends produces
// multiple events and all relevant VIPs are reconciled).
func (r *Reconciler) handle(ev checker.Event) {
if ev.FrontendName == "" {
return
}
cfg := r.stateSrc.Config()
if cfg == nil {
return
}
slog.Debug("vpp-reconciler-event",
"frontend", ev.FrontendName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String())
if err := r.client.SyncLBStateVIP(cfg, ev.FrontendName); err != nil {
if errors.Is(err, ErrFrontendNotFound) {
// Frontend was removed between the event being emitted and
// us handling it; a periodic SyncLBStateAll will clean it up.
return
}
slog.Warn("vpp-reconciler-error",
"frontend", ev.FrontendName,
"backend", ev.BackendName,
"from", ev.Transition.From.String(),
"to", ev.Transition.To.String(),
"err", err)
}
}

proto/maglev.proto:

@@ -150,7 +150,8 @@ message ListFrontendsResponse {
 message PoolBackendInfo {
   string name = 1;
-  int32 weight = 2;
+  int32 weight = 2;            // configured weight from YAML (0-100)
+  int32 effective_weight = 3;  // state-aware weight after pool-failover logic
 }

 message PoolInfo {

tests/01-maglevd/01-healthcheck.robot:

@@ -114,6 +114,37 @@ Prometheus reports transition counters
     Should Match Regexp    ${output}    maglev_backend_transitions_total\\{backend="nginx1",from="unknown",to="up"\\}\\s+[1-9]

+# ---- pool failover tests ----------------------------------------------------
+#
+# These tests use the static failover-vip frontend defined in maglev.yaml:
+# one backend in the primary pool (static-primary) and one in the fallback
+# pool (static-fallback). Both have no healthcheck, so they're always in
+# state=up. Because the effective weight is computed from the pool-failover
+# logic (and not from probes), these tests are deterministic and don't
+# depend on timing or a running VPP.
+
+Failover: primary up, secondary standby
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-primary
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-fallback
+    Effective Weight Should Be    failover-vip    static-primary    100
+    Effective Weight Should Be    failover-vip    static-fallback    0
+
+Failover: disable primary → fallback takes over
+    Maglevc    set backend static-primary disable
+    Backend Should Have State    static-primary    disabled
+    Effective Weight Should Be    failover-vip    static-primary    0
+    Effective Weight Should Be    failover-vip    static-fallback    100
+
+Failover: enable primary → fallback steps back
+    Maglevc    set backend static-primary enable
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-primary
+    Effective Weight Should Be    failover-vip    static-primary    100
+    Effective Weight Should Be    failover-vip    static-fallback    0
+
 *** Keywords ***
 Setup Suite
     ${arch} =    Run    go env GOARCH
@@ -166,3 +197,23 @@ Scrape Metrics
     ...    curl -sf ${METRICS_URL}
     Should Be Equal As Integers    ${rc}    0
     RETURN    ${output}
+
+Static Backend Should Be Up
+    [Documentation]    Like Backend Should Be Up but for backends without a
+    ...    healthcheck (hop straight to up via the synthetic-pass path).
+    [Arguments]    ${name}
+    ${output} =    Maglevc    show backends ${name}
+    Should Match Regexp    ${output}    state\\s+up
+
+Effective Weight Should Be
+    [Documentation]    Parse 'show frontends <fe>' output for the named
+    ...    backend and assert its effective weight matches the expected value.
+    ...    Backend rows have the form:
+    ...        [backends ]<name>  weight <cfg>  effective <eff>
+    ...    so we match on <name> followed by 'weight N effective E' anywhere
+    ...    on a single line.
+    [Arguments]    ${frontend}    ${backend}    ${expected}
+    ${output} =    Maglevc    show frontends ${frontend}
+    Should Match Regexp    ${output}
+    ...    ${backend}\\s+weight\\s+\\d+\\s+effective\\s+${expected}\\b
+    ...    backend ${backend}: expected effective weight ${expected} in:\n${output}

tests/01-maglevd/maglevd-lab/maglev.yaml:

@@ -31,6 +31,13 @@ maglev:
     nginx3:
       address: 172.20.30.13
       healthcheck: http-check
+    # Static (no-healthcheck) backends used by the failover test. These
+    # skip the probe loop entirely and go to state=up immediately, giving
+    # the test deterministic state without any timing.
+    static-primary:
+      address: 192.0.2.10
+    static-fallback:
+      address: 192.0.2.11

   frontends:
     http-vip:
@@ -46,3 +53,16 @@ maglev:
       - name: fallback
         backends:
           nginx3: {}
+
+    failover-vip:
+      description: "Static failover test VIP"
+      address: 192.0.2.2
+      protocol: tcp
+      port: 80
+      pools:
+      - name: primary
+        backends:
+          static-primary: {}
+      - name: fallback
+        backends:
+          static-fallback: {}