diff --git a/cmd/maglevc/commands.go b/cmd/maglevc/commands.go index e39d45c..68ac144 100644 --- a/cmd/maglevc/commands.go +++ b/cmd/maglevc/commands.go @@ -413,15 +413,18 @@ func runShowFrontend(ctx context.Context, client grpcapi.MaglevClient, args []st if beErr == nil && !beInfo.Enabled { suffix = " [disabled]" } - weightStr := "" - if pb.Weight != 100 { - weightStr = fmt.Sprintf(" %s %d", label("weight"), pb.Weight) - } + // Show both the configured weight (from YAML) and the + // state-aware effective weight (what gets programmed into VPP + // after pool-failover logic). Format matches the VPP-style + // key-value line so robot tests can parse it with a regex. + metaStr := fmt.Sprintf(" %s %d %s %d", + label("weight"), pb.Weight, + label("effective"), pb.EffectiveWeight) if i == 0 { bePad := strings.Repeat(" ", poolLblWidth-len("backends")) - fmt.Printf("%s%s%s%s%s%s%s\n", poolIndent, label("backends"), bePad, poolSep, pb.Name, weightStr, suffix) + fmt.Printf("%s%s%s%s%s%s%s\n", poolIndent, label("backends"), bePad, poolSep, pb.Name, metaStr, suffix) } else { - fmt.Printf("%s%s%s%s\n", contIndent, pb.Name, weightStr, suffix) + fmt.Printf("%s%s%s%s\n", contIndent, pb.Name, metaStr, suffix) } } } diff --git a/cmd/maglevd/main.go b/cmd/maglevd/main.go index 0badf9a..d0dbc4d 100644 --- a/cmd/maglevd/main.go +++ b/cmd/maglevd/main.go @@ -99,8 +99,13 @@ func run() error { var vppClient *vpp.Client if *vppAPIAddr != "" { vppClient = vpp.New(*vppAPIAddr, *vppStatsAddr) - vppClient.SetConfigSource(chkr) + vppClient.SetStateSource(chkr) go vppClient.Run(ctx) + // The reconciler subscribes to checker events and pushes per-VIP + // syncs into VPP on every backend state transition. This is the + // single place where transitions translate into dataplane changes. + reconciler := vpp.NewReconciler(vppClient, chkr, chkr) + go reconciler.Run(ctx) } // ---- gRPC server -------------------------------------------------------- diff --git a/internal/checker/checker.go b/internal/checker/checker.go index e61e39e..9c2b4af 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -161,6 +161,19 @@ func (c *Checker) Config() *config.Config { return c.cfg } +// BackendState returns the current health state of a backend. Returns +// (StateUnknown, false) when the backend has no worker. Satisfies +// vpp.StateSource. +func (c *Checker) BackendState(name string) (health.State, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + w, ok := c.workers[name] + if !ok { + return health.StateUnknown, false + } + return w.backend.State, true +} + // ListFrontends returns the names of all configured frontends. func (c *Checker) ListFrontends() []string { c.mu.RLock() @@ -382,8 +395,9 @@ func (c *Checker) DisableBackend(name string) (BackendSnapshot, bool) { return BackendSnapshot{Health: w.backend, Config: w.entry}, true } -// EnableBackend re-enables a previously disabled backend. A fresh probe -// goroutine is started and the backend re-enters StateUnknown. +// EnableBackend re-enables a previously disabled backend. The existing +// Backend struct is reused — its transition history is preserved — and a +// fresh probe goroutine is launched. The backend re-enters StateUnknown. 
func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) { c.mu.Lock() defer c.mu.Unlock() @@ -394,23 +408,26 @@ func (c *Checker) EnableBackend(name string) (BackendSnapshot, bool) { if w.entry.Enabled { return BackendSnapshot{Health: w.backend, Config: w.entry}, true } - entry := w.entry - entry.Enabled = true + w.entry.Enabled = true if b, ok := c.cfg.Backends[name]; ok { b.Enabled = true c.cfg.Backends[name] = b } maxHistory := c.cfg.HealthChecker.TransitionHistory - hc := c.cfg.HealthChecks[entry.HealthCheck] - c.startWorker(c.runCtx, name, entry, hc, 0, 1, maxHistory) - nw := c.workers[name] - t := nw.backend.Transitions[0] + t := w.backend.Enable(maxHistory) slog.Info("backend-transition", "backend", name, "from", t.From.String(), "to", t.To.String(), ) - c.emitForBackend(name, nw.backend.Address, t, c.cfg.Frontends) - return BackendSnapshot{Health: nw.backend, Config: nw.entry}, true + c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends) + + // Launch a fresh probe goroutine with a new cancellable context, + // keeping the existing worker and its transition history. + wCtx, cancel := context.WithCancel(c.runCtx) + w.cancel = cancel + w.wakeCh = make(chan struct{}, 1) + go c.runProbe(wCtx, name, 0, 1) + return BackendSnapshot{Health: w.backend, Config: w.entry}, true } // ---- internal -------------------------------------------------------------- @@ -455,6 +472,7 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) { } } + first := true for { c.mu.RLock() w, ok := c.workers[name] @@ -469,7 +487,15 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) { wakeCh := w.wakeCh var sleepFor time.Duration if entry.HealthCheck == "" { - sleepFor = 30 * time.Second + // Static (no-healthcheck) backends: the first iteration fires + // the synthetic pass immediately so the backend reaches "up" + // without delay; subsequent iterations idle at 30s since there's + // nothing to do anyway. 
+ if first { + sleepFor = 0 + } else { + sleepFor = 30 * time.Second + } } else { sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval) } @@ -481,6 +507,7 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) { case <-time.After(sleepFor): case <-wakeCh: } + first = false var result health.ProbeResult if entry.HealthCheck == "" { diff --git a/internal/grpcapi/maglev.pb.go b/internal/grpcapi/maglev.pb.go index 5c432ea..7e9c567 100644 --- a/internal/grpcapi/maglev.pb.go +++ b/internal/grpcapi/maglev.pb.go @@ -1214,11 +1214,12 @@ func (x *ListFrontendsResponse) GetFrontendNames() []string { } type PoolBackendInfo struct { - state protoimpl.MessageState `protogen:"open.v1"` - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Weight int32 `protobuf:"varint,2,opt,name=weight,proto3" json:"weight,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Weight int32 `protobuf:"varint,2,opt,name=weight,proto3" json:"weight,omitempty"` // configured weight from YAML (0-100) + EffectiveWeight int32 `protobuf:"varint,3,opt,name=effective_weight,json=effectiveWeight,proto3" json:"effective_weight,omitempty"` // state-aware weight after pool-failover logic + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *PoolBackendInfo) Reset() { @@ -1265,6 +1266,13 @@ func (x *PoolBackendInfo) GetWeight() int32 { return 0 } +func (x *PoolBackendInfo) GetEffectiveWeight() int32 { + if x != nil { + return x.EffectiveWeight + } + return 0 +} + type PoolInfo struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -2318,10 +2326,11 @@ const file_proto_maglev_proto_rawDesc = "" + "\b_backendB\v\n" + "\t_frontend\">\n" + "\x15ListFrontendsResponse\x12%\n" + - "\x0efrontend_names\x18\x01 \x03(\tR\rfrontendNames\"=\n" + + "\x0efrontend_names\x18\x01 \x03(\tR\rfrontendNames\"h\n" + "\x0fPoolBackendInfo\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x16\n" + - "\x06weight\x18\x02 \x01(\x05R\x06weight\"S\n" + + "\x06weight\x18\x02 \x01(\x05R\x06weight\x12)\n" + + "\x10effective_weight\x18\x03 \x01(\x05R\x0feffectiveWeight\"S\n" + "\bPoolInfo\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x123\n" + "\bbackends\x18\x02 \x03(\v2\x17.maglev.PoolBackendInfoR\bbackends\"\xb6\x01\n" + diff --git a/internal/grpcapi/server.go b/internal/grpcapi/server.go index cdf6694..b561204 100644 --- a/internal/grpcapi/server.go +++ b/internal/grpcapi/server.go @@ -48,7 +48,7 @@ func (s *Server) GetFrontend(_ context.Context, req *GetFrontendRequest) (*Front if !ok { return nil, status.Errorf(codes.NotFound, "frontend %q not found", req.Name) } - return frontendToProto(req.Name, fe), nil + return frontendToProto(req.Name, fe, s.checker), nil } // ListBackends returns the names of all active backends. @@ -110,7 +110,7 @@ func (s *Server) SetFrontendPoolBackendWeight(_ context.Context, req *SetWeightR if err != nil { return nil, status.Errorf(codes.NotFound, "%v", err) } - return frontendToProto(req.Frontend, fe), nil + return frontendToProto(req.Frontend, fe, s.checker), nil } // ListHealthChecks returns the names of all configured health checks. 
@@ -370,14 +370,18 @@ func ipStringOrEmpty(ip net.IP) string { // ---- conversion helpers ---------------------------------------------------- -func frontendToProto(name string, fe config.Frontend) *FrontendInfo { +func frontendToProto(name string, fe config.Frontend, src vpp.StateSource) *FrontendInfo { + // Compute the state-aware effective weights once; these reflect the + // pool-failover logic and what would be programmed into VPP. + eff := vpp.EffectiveWeights(fe, src) pools := make([]*PoolInfo, 0, len(fe.Pools)) - for _, p := range fe.Pools { + for poolIdx, p := range fe.Pools { pi := &PoolInfo{Name: p.Name} for bName, pb := range p.Backends { pi.Backends = append(pi.Backends, &PoolBackendInfo{ - Name: bName, - Weight: int32(pb.Weight), + Name: bName, + Weight: int32(pb.Weight), + EffectiveWeight: int32(eff[poolIdx][bName]), }) } pools = append(pools, pi) diff --git a/internal/health/state.go b/internal/health/state.go index 871fedd..66056ff 100644 --- a/internal/health/state.go +++ b/internal/health/state.go @@ -206,6 +206,15 @@ func (b *Backend) Disable(maxHistory int) Transition { return b.Transitions[0] } +// Enable transitions a disabled backend back to StateUnknown, resetting the +// counter so the first probe result resolves state (rise-1 preload gives +// 1-pass → Up, 1-fail → Down). Returns the transition. +func (b *Backend) Enable(maxHistory int) Transition { + b.transition(StateUnknown, ProbeResult{Code: "enabled"}, maxHistory) + b.Counter.Health = b.Counter.Rise - 1 + return b.Transitions[0] +} + // Remove transitions the backend to StateRemoved. Returns the transition. // After this call no further probe results are accepted. func (b *Backend) Remove(maxHistory int) Transition { diff --git a/internal/vpp/binapi/lb/lb.ba.go b/internal/vpp/binapi/lb/lb.ba.go index 0381212..df76993 100644 --- a/internal/vpp/binapi/lb/lb.ba.go +++ b/internal/vpp/binapi/lb/lb.ba.go @@ -23,7 +23,7 @@ const _ = api.GoVppAPIPackageIsVersion2 const ( APIFile = "lb" APIVersion = "1.2.0" - VersionCrc = 0x853a5710 + VersionCrc = 0xac602d7b ) // Add an application server for a given VIP @@ -753,6 +753,7 @@ func (m *LbAsDump) Unmarshal(b []byte) error { // - port - destination port. // - as_address - The application server address. // - weight - new bucket weight 0-100. +// - is_flush - The sessions related to this AS should be flushed. // // LbAsSetWeight defines message 'lb_as_set_weight'. 
type LbAsSetWeight struct { @@ -761,11 +762,12 @@ type LbAsSetWeight struct { Port uint16 `binapi:"u16,name=port" json:"port,omitempty"` AsAddress ip_types.Address `binapi:"address,name=as_address" json:"as_address,omitempty"` Weight uint8 `binapi:"u8,name=weight" json:"weight,omitempty"` + IsFlush bool `binapi:"bool,name=is_flush" json:"is_flush,omitempty"` } func (m *LbAsSetWeight) Reset() { *m = LbAsSetWeight{} } func (*LbAsSetWeight) GetMessageName() string { return "lb_as_set_weight" } -func (*LbAsSetWeight) GetCrcString() string { return "2c72979e" } +func (*LbAsSetWeight) GetCrcString() string { return "2d89bdbd" } func (*LbAsSetWeight) GetMessageType() api.MessageType { return api.RequestMessage } @@ -782,6 +784,7 @@ func (m *LbAsSetWeight) Size() (size int) { size += 1 // m.AsAddress.Af size += 1 * 16 // m.AsAddress.Un size += 1 // m.Weight + size += 1 // m.IsFlush return size } func (m *LbAsSetWeight) Marshal(b []byte) ([]byte, error) { @@ -797,6 +800,7 @@ func (m *LbAsSetWeight) Marshal(b []byte) ([]byte, error) { buf.EncodeUint8(uint8(m.AsAddress.Af)) buf.EncodeBytes(m.AsAddress.Un.XXX_UnionData[:], 16) buf.EncodeUint8(m.Weight) + buf.EncodeBool(m.IsFlush) return buf.Bytes(), nil } func (m *LbAsSetWeight) Unmarshal(b []byte) error { @@ -809,6 +813,7 @@ func (m *LbAsSetWeight) Unmarshal(b []byte) error { m.AsAddress.Af = ip_types.AddressFamily(buf.DecodeUint8()) copy(m.AsAddress.Un.XXX_UnionData[:], buf.DecodeBytes(16)) m.Weight = buf.DecodeUint8() + m.IsFlush = buf.DecodeBool() return nil } @@ -1368,7 +1373,7 @@ func file_lb_binapi_init() { api.RegisterMessage((*LbAddDelVipV2Reply)(nil), "lb_add_del_vip_v2_reply_e8d4e804") api.RegisterMessage((*LbAsDetails)(nil), "lb_as_details_8d24c29e") api.RegisterMessage((*LbAsDump)(nil), "lb_as_dump_1063f819") - api.RegisterMessage((*LbAsSetWeight)(nil), "lb_as_set_weight_2c72979e") + api.RegisterMessage((*LbAsSetWeight)(nil), "lb_as_set_weight_2d89bdbd") api.RegisterMessage((*LbAsSetWeightReply)(nil), "lb_as_set_weight_reply_e8d4e804") api.RegisterMessage((*LbAsV2Details)(nil), "lb_as_v2_details_90064aae") api.RegisterMessage((*LbAsV2Dump)(nil), "lb_as_v2_dump_1063f819") diff --git a/internal/vpp/client.go b/internal/vpp/client.go index fdf2327..9166a01 100644 --- a/internal/vpp/client.go +++ b/internal/vpp/client.go @@ -18,14 +18,17 @@ import ( "go.fd.io/govpp/core" "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb" ) -// ConfigSource provides a snapshot of the current maglev config to the VPP -// sync loop. checker.Checker satisfies this interface via its Config() method. -// Decoupling via an interface avoids an import cycle with the checker package. -type ConfigSource interface { +// StateSource provides a live view of the running config and the current +// health state of each backend. checker.Checker satisfies this interface via +// its Config() and BackendState() methods. Decoupling via an interface avoids +// an import cycle with the checker package. 
+type StateSource interface { Config() *config.Config + BackendState(name string) (health.State, bool) } const retryInterval = 5 * time.Second @@ -54,17 +57,26 @@ type Client struct { statsConn *core.StatsConnection statsClient adapter.StatsAPI // raw adapter for DumpStats info Info // populated on successful connect - cfgSrc ConfigSource // optional; enables periodic LB sync + stateSrc StateSource // optional; enables periodic LB sync lastLBConf *lb.LbConf // cached last-pushed lb_conf (dedup) } -// SetConfigSource attaches a live config source. When set, the VPP client -// runs a periodic SyncLBStateAll loop (at the interval from cfg.VPP.LB.SyncInterval) -// for as long as the VPP connection is up. Must be called before Run. -func (c *Client) SetConfigSource(src ConfigSource) { +// SetStateSource attaches a live config + health state source. When set, the +// VPP client runs a periodic SyncLBStateAll loop (at the interval from +// cfg.VPP.LB.SyncInterval) for as long as the VPP connection is up, and +// state-aware weights are used throughout the sync path. Must be called +// before Run. +func (c *Client) SetStateSource(src StateSource) { c.mu.Lock() defer c.mu.Unlock() - c.cfgSrc = src + c.stateSrc = src +} + +// getStateSource returns the registered state source under the mutex. +func (c *Client) getStateSource() StateSource { + c.mu.Lock() + defer c.mu.Unlock() + return c.stateSrc } // New creates a Client for the given socket paths. @@ -117,7 +129,7 @@ func (c *Client) Run(ctx context.Context) { // running config. On startup this is the initial set; on reconnect // (VPP restart) VPP has forgotten everything, so we set it again. c.mu.Lock() - src := c.cfgSrc + src := c.stateSrc c.mu.Unlock() if src != nil { if cfg := src.Config(); cfg != nil { @@ -151,11 +163,9 @@ func (c *Client) Run(ctx context.Context) { // reconciliation). Subsequent runs fire every cfg.VPP.LB.SyncInterval. // Exits when ctx is cancelled. func (c *Client) lbSyncLoop(ctx context.Context) { - c.mu.Lock() - src := c.cfgSrc - c.mu.Unlock() + src := c.getStateSource() if src == nil { - return // no config source registered; nothing to sync + return // no state source registered; nothing to sync } // next-run timestamp starts at "now" so the first tick is immediate. diff --git a/internal/vpp/lbsync.go b/internal/vpp/lbsync.go index b05fdbd..55f25f0 100644 --- a/internal/vpp/lbsync.go +++ b/internal/vpp/lbsync.go @@ -9,6 +9,7 @@ import ( "net" "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" ip_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/ip_types" lb "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb" lb_types "git.ipng.ch/ipng/vpp-maglev/internal/vpp/binapi/lb_types" @@ -37,6 +38,7 @@ type desiredVIP struct { type desiredAS struct { Address net.IP Weight uint8 // 0-100 + Flush bool // if true, drop existing flows when transitioning to weight 0 } // syncStats counts changes made to the dataplane during a sync run. 
@@ -60,12 +62,16 @@ func (c *Client) SyncLBStateAll(cfg *config.Config) error {
 	if !c.IsConnected() {
 		return errNotConnected
 	}
+	src := c.getStateSource()
+	if src == nil {
+		return fmt.Errorf("no state source configured")
+	}
 	cur, err := c.GetLBStateAll()
 	if err != nil {
 		return fmt.Errorf("read VPP LB state: %w", err)
 	}
-	desired := desiredFromConfig(cfg)
+	desired := desiredFromConfig(cfg, src)
 
 	ch, err := c.apiChannel()
 	if err != nil {
@@ -131,11 +137,15 @@ func (c *Client) SyncLBStateVIP(cfg *config.Config, feName string) error {
 	if !c.IsConnected() {
 		return errNotConnected
 	}
+	src := c.getStateSource()
+	if src == nil {
+		return fmt.Errorf("no state source configured")
+	}
 	fe, ok := cfg.Frontends[feName]
 	if !ok {
 		return fmt.Errorf("%q: %w", feName, ErrFrontendNotFound)
 	}
-	d := desiredFromFrontend(cfg, fe)
+	d := desiredFromFrontend(cfg, fe, src)
 
 	cur, err := c.GetLBStateVIP(d.Prefix, d.Protocol, d.Port)
 	if err != nil {
@@ -215,7 +225,12 @@ func reconcileVIP(ch *loggedChannel, d desiredVIP, cur *LBVIP, st *syncStats) er
 			continue
 		}
 		if c.Weight != a.Weight {
-			if err := setASWeight(ch, d.Prefix, d.Protocol, d.Port, a); err != nil {
+			// Flush only on the transition from serving traffic (cur > 0) to
+			// zero, and only when the desired state explicitly asks for it
+			// (i.e. the backend was disabled, not merely drained). Steady-
+			// state syncs where weight doesn't change never re-flush.
+			flush := a.Flush && c.Weight > 0 && a.Weight == 0
+			if err := setASWeight(ch, d.Prefix, d.Protocol, d.Port, a, flush); err != nil {
 				return err
 			}
 			st.asWeight++
@@ -240,10 +255,12 @@ func removeVIP(ch *loggedChannel, v LBVIP, st *syncStats) error {
 }
 
 // desiredFromConfig flattens every frontend in cfg into a desired VIP set.
-func desiredFromConfig(cfg *config.Config) []desiredVIP {
+// src provides the per-backend health state so weights and flush hints
+// reflect the current runtime state, not just the static config.
+func desiredFromConfig(cfg *config.Config, src StateSource) []desiredVIP {
 	out := make([]desiredVIP, 0, len(cfg.Frontends))
 	for _, fe := range cfg.Frontends {
-		out = append(out, desiredFromFrontend(cfg, fe))
+		out = append(out, desiredFromFrontend(cfg, fe, src))
 	}
 	return out
 }
@@ -252,16 +269,13 @@ func desiredFromConfig(cfg *config.Config) []desiredVIP {
 //
 // All backends across all pools of a frontend are merged into a single
 // application-server list so VPP knows about every backend that could ever
-// receive traffic. Weights are assigned as follows:
+// receive traffic. The per-AS weight and flush hint are computed by
+// asFromBackend from four inputs: (pool index, active pool index,
+// backend health state, configured pool weight).
 //
-//   - primary (first) pool: the backend's configured weight
-//   - any subsequent pool: weight 0 (backend is known but receives no traffic)
-//
-// This preserves the pool priority model: higher layers can later flip
-// secondary-pool backends to non-zero weights on failover without needing to
-// add/remove ASes in the dataplane. When the same backend appears in multiple
-// pools, the first pool it appears in wins.
-func desiredFromFrontend(cfg *config.Config, fe config.Frontend) desiredVIP {
+// When the same backend appears in multiple pools, the first pool it
+// appears in wins.
+func desiredFromFrontend(cfg *config.Config, fe config.Frontend, src StateSource) desiredVIP { bits := 32 if fe.Address.To4() == nil { bits = 128 @@ -272,26 +286,138 @@ func desiredFromFrontend(cfg *config.Config, fe config.Frontend) desiredVIP { Port: fe.Port, ASes: make(map[string]desiredAS), } + + // Snapshot backend states once so the active-pool computation and the + // per-backend weight assignment see a consistent view. + states := make(map[string]health.State) + for _, pool := range fe.Pools { + for bName := range pool.Backends { + if s, ok := src.BackendState(bName); ok { + states[bName] = s + } else { + states[bName] = health.StateUnknown + } + } + } + activePool := activePoolIndex(fe, states) + for poolIdx, pool := range fe.Pools { for bName, pb := range pool.Backends { b, ok := cfg.Backends[bName] - if !ok || !b.Enabled || b.Address == nil { + if !ok || b.Address == nil { continue } + // Disabled backends (either via operator action or config) are + // kept in the desired set so they stay installed in VPP with + // weight=0 — they must not be deleted, otherwise a subsequent + // enable has to re-add them and existing flow-table state (if + // any) is lost. The state machine drives what weight to set + // via asFromBackend; we never filter on b.Enabled here. addr := b.Address.String() if _, already := d.ASes[addr]; already { continue } - var w uint8 - if poolIdx == 0 { - w = clampWeight(pb.Weight) - } // secondary pools: weight 0 (default) - d.ASes[addr] = desiredAS{Address: b.Address, Weight: w} + w, flush := asFromBackend(poolIdx, activePool, states[bName], pb.Weight) + d.ASes[addr] = desiredAS{ + Address: b.Address, + Weight: w, + Flush: flush, + } } } return d } +// EffectiveWeights returns the current effective VPP weight for every backend +// in every pool of fe, keyed by poolIdx and backend name. It runs the same +// failover + state-aware weight calculation that the sync path uses, but +// produces a plain map instead of desiredVIP — intended for observability +// (e.g. the GetFrontend gRPC handler) and for robot-testing the failover +// logic without needing a running VPP instance. +// +// The returned map layout is: result[poolIdx][backendName] = effective weight. +func EffectiveWeights(fe config.Frontend, src StateSource) map[int]map[string]uint8 { + states := make(map[string]health.State) + for _, pool := range fe.Pools { + for bName := range pool.Backends { + if s, ok := src.BackendState(bName); ok { + states[bName] = s + } else { + states[bName] = health.StateUnknown + } + } + } + activePool := activePoolIndex(fe, states) + + out := make(map[int]map[string]uint8, len(fe.Pools)) + for poolIdx, pool := range fe.Pools { + out[poolIdx] = make(map[string]uint8, len(pool.Backends)) + for bName, pb := range pool.Backends { + w, _ := asFromBackend(poolIdx, activePool, states[bName], pb.Weight) + out[poolIdx][bName] = w + } + } + return out +} + +// activePoolIndex returns the index of the first pool in fe that contains at +// least one backend currently in StateUp. This is the priority-failover +// selector: pool[0] is the primary, pool[1] is the first fallback, and so on. +// As long as pool[0] has any up backend, it stays active. When every pool[0] +// backend leaves StateUp (down, paused, disabled, unknown), pool[1] is +// promoted — and so on for further fallback tiers. When no pool has any up +// backend, returns 0 (the return value is unobservable in that case since +// every backend maps to weight 0 regardless of the active pool). 
+func activePoolIndex(fe config.Frontend, states map[string]health.State) int { + for i, pool := range fe.Pools { + for bName := range pool.Backends { + if states[bName] == health.StateUp { + return i + } + } + } + return 0 +} + +// asFromBackend is the pure mapping from (pool index, active pool, backend +// state, config weight) to the desired VPP AS weight and flush hint. This is +// the single source of truth for the state → dataplane rule — every LB change +// flows through this function. +// +// A backend gets its configured weight iff it is up AND belongs to the +// currently-active pool. Every other case yields weight 0. The only +// state that produces flush=true is disabled. +// +// state in active pool not in active pool flush +// -------- -------------- ------------------- ----- +// unknown 0 0 no +// up configured 0 (standby) no +// down 0 0 no +// paused 0 0 no +// disabled 0 0 yes +// removed handled separately (AS deleted via delAS) +// +// Flush semantics: flush=true means "if the AS currently has a non-zero +// weight in VPP, drop its existing flow-table entries when setting weight +// to 0". The reconciler only acts on flush when transitioning (current +// weight > 0), so steady-state syncs never re-flush. Failover demotion +// (e.g. pool[1] up→standby when pool[0] recovers) does NOT flush — we +// let those sessions drain naturally. +func asFromBackend(poolIdx, activePool int, state health.State, cfgWeight int) (weight uint8, flush bool) { + switch state { + case health.StateUp: + if poolIdx == activePool { + return clampWeight(cfgWeight), false + } + return 0, false + case health.StateDisabled: + return 0, true + default: + // unknown, down, paused: off, drain existing flows naturally. + return 0, false + } +} + // ---- API call helpers ------------------------------------------------------ // defaultFlowsTableLength is sent as NewFlowsTableLength in lb_add_del_vip_v2. @@ -397,13 +523,14 @@ func delAS(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, ad return nil } -func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS) error { +func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint16, a desiredAS, flush bool) error { req := &lb.LbAsSetWeight{ Pfx: ip_types.NewAddressWithPrefix(*prefix), Protocol: protocol, Port: port, AsAddress: ip_types.NewAddress(a.Address), Weight: a.Weight, + IsFlush: flush, } reply := &lb.LbAsSetWeightReply{} if err := ch.SendRequest(req).ReceiveReply(reply); err != nil { @@ -417,7 +544,8 @@ func setASWeight(ch *loggedChannel, prefix *net.IPNet, protocol uint8, port uint "protocol", protocolName(protocol), "port", port, "address", a.Address.String(), - "weight", a.Weight) + "weight", a.Weight, + "flush", flush) return nil } diff --git a/internal/vpp/lbsync_test.go b/internal/vpp/lbsync_test.go new file mode 100644 index 0000000..f5bade8 --- /dev/null +++ b/internal/vpp/lbsync_test.go @@ -0,0 +1,273 @@ +// Copyright (c) 2026, Pim van Pelt + +package vpp + +import ( + "net" + "testing" + + "git.ipng.ch/ipng/vpp-maglev/internal/config" + "git.ipng.ch/ipng/vpp-maglev/internal/health" +) + +// TestAsFromBackend locks down the state → (weight, flush) truth table. +// This is the single source of truth for how maglevd decides what to +// program into VPP for each backend state. If this test needs updating +// the behavior has deliberately changed. 
+func TestAsFromBackend(t *testing.T) { + cases := []struct { + name string + poolIdx int + activePool int + state health.State + cfgWeight int + wantWeight uint8 + wantFlush bool + }{ + // up in active pool → configured weight, no flush + {"up active w100", 0, 0, health.StateUp, 100, 100, false}, + {"up active w50", 0, 0, health.StateUp, 50, 50, false}, + {"up active w0", 0, 0, health.StateUp, 0, 0, false}, + {"up active clamp-high", 0, 0, health.StateUp, 150, 100, false}, + {"up active clamp-low", 0, 0, health.StateUp, -5, 0, false}, + + // up in non-active pool → standby (weight 0), no flush + {"up standby pool0 active=1", 0, 1, health.StateUp, 100, 0, false}, + {"up standby pool1 active=0", 1, 0, health.StateUp, 100, 0, false}, + {"up standby pool2 active=0", 2, 0, health.StateUp, 100, 0, false}, + + // up in secondary, promoted because pool[1] is now active + {"up failover pool1 active=1", 1, 1, health.StateUp, 100, 100, false}, + + // unknown → off, drain + {"unknown pool0 active=0", 0, 0, health.StateUnknown, 100, 0, false}, + {"unknown pool1 active=0", 1, 0, health.StateUnknown, 100, 0, false}, + + // down → off, drain (probe might be wrong) + {"down pool0 active=0", 0, 0, health.StateDown, 100, 0, false}, + {"down pool1 active=1", 1, 1, health.StateDown, 100, 0, false}, + + // paused → off, drain (graceful maintenance) + {"paused pool0 active=0", 0, 0, health.StatePaused, 100, 0, false}, + + // disabled → off, flush (hard stop) + {"disabled pool0 active=0", 0, 0, health.StateDisabled, 100, 0, true}, + {"disabled pool1 active=1", 1, 1, health.StateDisabled, 100, 0, true}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + w, f := asFromBackend(tc.poolIdx, tc.activePool, tc.state, tc.cfgWeight) + if w != tc.wantWeight { + t.Errorf("weight: got %d, want %d", w, tc.wantWeight) + } + if f != tc.wantFlush { + t.Errorf("flush: got %v, want %v", f, tc.wantFlush) + } + }) + } +} + +// TestActivePoolIndex locks down the priority-failover selector: the first +// pool containing at least one up backend is the active pool. Default 0. 
+func TestActivePoolIndex(t *testing.T) { + mkFE := func(pools ...[]string) config.Frontend { + out := make([]config.Pool, len(pools)) + for i, p := range pools { + out[i] = config.Pool{Name: "p", Backends: map[string]config.PoolBackend{}} + for _, name := range p { + out[i].Backends[name] = config.PoolBackend{Weight: 100} + } + } + return config.Frontend{Pools: out} + } + + cases := []struct { + name string + fe config.Frontend + states map[string]health.State + want int + }{ + { + name: "pool0 has up, pool1 standby", + fe: mkFE([]string{"a", "b"}, []string{"c", "d"}), + states: map[string]health.State{"a": health.StateUp, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp}, + want: 0, + }, + { + name: "pool0 all down, pool1 has up → failover", + fe: mkFE([]string{"a", "b"}, []string{"c", "d"}), + states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp}, + want: 1, + }, + { + name: "pool0 all disabled, pool1 has up → failover", + fe: mkFE([]string{"a", "b"}, []string{"c"}), + states: map[string]health.State{"a": health.StateDisabled, "b": health.StateDisabled, "c": health.StateUp}, + want: 1, + }, + { + name: "pool0 all paused, pool1 has up → failover", + fe: mkFE([]string{"a"}, []string{"c"}), + states: map[string]health.State{"a": health.StatePaused, "c": health.StateUp}, + want: 1, + }, + { + name: "pool0 all unknown (startup), pool1 up → pool1", + fe: mkFE([]string{"a"}, []string{"c"}), + states: map[string]health.State{"a": health.StateUnknown, "c": health.StateUp}, + want: 1, + }, + { + name: "nothing up anywhere → default 0", + fe: mkFE([]string{"a"}, []string{"c"}), + states: map[string]health.State{"a": health.StateDown, "c": health.StateDown}, + want: 0, + }, + { + name: "1 up in pool0 is enough", + fe: mkFE([]string{"a", "b", "c"}, []string{"d"}), + states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp, "d": health.StateUp}, + want: 0, + }, + { + name: "three tiers, pool0 and pool1 both empty → pool2", + fe: mkFE([]string{"a"}, []string{"b"}, []string{"c"}), + states: map[string]health.State{"a": health.StateDown, "b": health.StateDown, "c": health.StateUp}, + want: 2, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := activePoolIndex(tc.fe, tc.states) + if got != tc.want { + t.Errorf("got pool %d, want pool %d", got, tc.want) + } + }) + } +} + +// fakeStateSource implements StateSource from a static map. +type fakeStateSource struct { + cfg *config.Config + states map[string]health.State +} + +func (f *fakeStateSource) Config() *config.Config { return f.cfg } +func (f *fakeStateSource) BackendState(name string) (health.State, bool) { + s, ok := f.states[name] + return s, ok +} + +// TestDesiredFromFrontendFailover is the end-to-end integration test for +// priority-failover: given a frontend with two pools, the desired weights +// flip between pools based on which has any up backends. 
+func TestDesiredFromFrontendFailover(t *testing.T) { + ip := func(s string) net.IP { return net.ParseIP(s).To4() } + cfg := &config.Config{ + Backends: map[string]config.Backend{ + "p1": {Address: ip("10.0.0.1"), Enabled: true}, + "p2": {Address: ip("10.0.0.2"), Enabled: true}, + "s1": {Address: ip("10.0.0.11"), Enabled: true}, + "s2": {Address: ip("10.0.0.12"), Enabled: true}, + }, + } + fe := config.Frontend{ + Address: ip("192.0.2.1"), + Protocol: "tcp", + Port: 80, + Pools: []config.Pool{ + {Name: "primary", Backends: map[string]config.PoolBackend{ + "p1": {Weight: 100}, + "p2": {Weight: 100}, + }}, + {Name: "fallback", Backends: map[string]config.PoolBackend{ + "s1": {Weight: 100}, + "s2": {Weight: 100}, + }}, + }, + } + + tests := []struct { + name string + states map[string]health.State + want map[string]uint8 // backend IP → expected weight + }{ + { + name: "primary all up → primary serves, secondary standby", + states: map[string]health.State{ + "p1": health.StateUp, "p2": health.StateUp, + "s1": health.StateUp, "s2": health.StateUp, + }, + want: map[string]uint8{ + "10.0.0.1": 100, "10.0.0.2": 100, + "10.0.0.11": 0, "10.0.0.12": 0, + }, + }, + { + name: "primary 1 up → primary still serves", + states: map[string]health.State{ + "p1": health.StateDown, "p2": health.StateUp, + "s1": health.StateUp, "s2": health.StateUp, + }, + want: map[string]uint8{ + "10.0.0.1": 0, "10.0.0.2": 100, + "10.0.0.11": 0, "10.0.0.12": 0, + }, + }, + { + name: "primary all down → failover to secondary", + states: map[string]health.State{ + "p1": health.StateDown, "p2": health.StateDown, + "s1": health.StateUp, "s2": health.StateUp, + }, + want: map[string]uint8{ + "10.0.0.1": 0, "10.0.0.2": 0, + "10.0.0.11": 100, "10.0.0.12": 100, + }, + }, + { + name: "primary all disabled → failover", + states: map[string]health.State{ + "p1": health.StateDisabled, "p2": health.StateDisabled, + "s1": health.StateUp, "s2": health.StateUp, + }, + want: map[string]uint8{ + "10.0.0.1": 0, "10.0.0.2": 0, + "10.0.0.11": 100, "10.0.0.12": 100, + }, + }, + { + name: "everything down → all zero, no serving", + states: map[string]health.State{ + "p1": health.StateDown, "p2": health.StateDown, + "s1": health.StateDown, "s2": health.StateDown, + }, + want: map[string]uint8{ + "10.0.0.1": 0, "10.0.0.2": 0, + "10.0.0.11": 0, "10.0.0.12": 0, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + src := &fakeStateSource{cfg: cfg, states: tc.states} + d := desiredFromFrontend(cfg, fe, src) + for addr, wantW := range tc.want { + got, ok := d.ASes[addr] + if !ok { + t.Errorf("%s: missing from desired set", addr) + continue + } + if got.Weight != wantW { + t.Errorf("%s: weight got %d, want %d", addr, got.Weight, wantW) + } + } + if len(d.ASes) != len(tc.want) { + t.Errorf("got %d ASes, want %d", len(d.ASes), len(tc.want)) + } + }) + } +} diff --git a/internal/vpp/reconciler.go b/internal/vpp/reconciler.go new file mode 100644 index 0000000..1850120 --- /dev/null +++ b/internal/vpp/reconciler.go @@ -0,0 +1,97 @@ +// Copyright (c) 2026, Pim van Pelt + +package vpp + +import ( + "context" + "errors" + "log/slog" + + "git.ipng.ch/ipng/vpp-maglev/internal/checker" +) + +// EventSource is the subset of checker.Checker that Reconciler needs. +// Decoupling via an interface keeps the dependency direction +// vpp → checker (checker never imports vpp). +type EventSource interface { + Subscribe() (<-chan checker.Event, func()) +} + +// Reconciler bridges checker state transitions to VPP dataplane changes. 
+// It subscribes to the checker's event channel and, for every transition,
+// runs SyncLBStateVIP for the frontend the backend belongs to. This is
+// the ONLY place in the codebase where backend state transitions cause
+// VPP calls — every LB change flows through Client.SyncLBStateVIP.
+//
+// The reconciler carries no state of its own. Idempotency is guaranteed
+// by SyncLBStateVIP itself (diff-based, driven by the pure asFromBackend
+// mapping in lbsync.go).
+type Reconciler struct {
+	client   *Client
+	events   EventSource
+	stateSrc StateSource
+}
+
+// NewReconciler creates a Reconciler. client is the VPP client, events is
+// the checker (or anything that implements Subscribe), and stateSrc provides
+// the live config for SyncLBStateVIP calls. In the daemon, events and
+// stateSrc are both the checker, wired up with the VPP client at startup.
+func NewReconciler(client *Client, events EventSource, stateSrc StateSource) *Reconciler {
+	return &Reconciler{client: client, events: events, stateSrc: stateSrc}
+}
+
+// Run subscribes to the checker and loops until ctx is cancelled. Each
+// received event fires a single-VIP sync for the frontend the transitioned
+// backend belongs to.
+func (r *Reconciler) Run(ctx context.Context) {
+	ch, unsub := r.events.Subscribe()
+	defer unsub()
+
+	slog.Info("vpp-reconciler-start")
+	defer slog.Info("vpp-reconciler-stop")
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case ev, ok := <-ch:
+			if !ok {
+				return
+			}
+			r.handle(ev)
+		}
+	}
+}
+
+// handle reconciles one event. Operates only on events that carry a
+// frontend name (the checker emits one event per frontend that references
+// the backend, so a backend shared across multiple frontends produces
+// multiple events and all relevant VIPs are reconciled).
+func (r *Reconciler) handle(ev checker.Event) {
+	if ev.FrontendName == "" {
+		return
+	}
+	cfg := r.stateSrc.Config()
+	if cfg == nil {
+		return
+	}
+	slog.Debug("vpp-reconciler-event",
+		"frontend", ev.FrontendName,
+		"backend", ev.BackendName,
+		"from", ev.Transition.From.String(),
+		"to", ev.Transition.To.String())
+
+	if err := r.client.SyncLBStateVIP(cfg, ev.FrontendName); err != nil {
+		if errors.Is(err, ErrFrontendNotFound) {
+			// Frontend was removed between the event being emitted and
+			// us handling it; a periodic SyncLBStateAll will clean it up.
+			return
+		}
+		slog.Warn("vpp-reconciler-error",
+			"frontend", ev.FrontendName,
+			"backend", ev.BackendName,
+			"from", ev.Transition.From.String(),
+			"to", ev.Transition.To.String(),
+			"err", err)
+	}
+}
diff --git a/proto/maglev.proto b/proto/maglev.proto
index 1cbfc82..d5099b2 100644
--- a/proto/maglev.proto
+++ b/proto/maglev.proto
@@ -150,7 +150,8 @@ message ListFrontendsResponse {
 
 message PoolBackendInfo {
   string name = 1;
-  int32 weight = 2;
+  int32 weight = 2;            // configured weight from YAML (0-100)
+  int32 effective_weight = 3;  // state-aware weight after pool-failover logic
 }
 
 message PoolInfo {
diff --git a/tests/01-maglevd/01-healthcheck.robot b/tests/01-maglevd/01-healthcheck.robot
index f368beb..dee05f1 100644
--- a/tests/01-maglevd/01-healthcheck.robot
+++ b/tests/01-maglevd/01-healthcheck.robot
@@ -114,6 +114,37 @@ Prometheus reports transition counters
    Should Match Regexp    ${output}    maglev_backend_transitions_total\\{backend="nginx1",from="unknown",to="up"\\}\\s+[1-9]
 
+# ---- pool failover tests ----------------------------------------------------
+#
+# These tests use the static failover-vip frontend defined in maglev.yaml:
+# one backend in the primary pool (static-primary) and one in the fallback
+# pool (static-fallback). Neither has a healthcheck, so both sit in state=up
+# unless explicitly disabled. Because the effective weight is computed from
+# the pool-failover logic (and not from probes), these tests are
+# deterministic and don't depend on timing or a running VPP.
+
+Failover: primary up, secondary standby
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-primary
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-fallback
+    Effective Weight Should Be    failover-vip    static-primary    100
+    Effective Weight Should Be    failover-vip    static-fallback    0
+
+Failover: disable primary → fallback takes over
+    Maglevc    set backend static-primary disable
+    Backend Should Have State    static-primary    disabled
+    Effective Weight Should Be    failover-vip    static-primary    0
+    Effective Weight Should Be    failover-vip    static-fallback    100
+
+Failover: enable primary → fallback steps back
+    Maglevc    set backend static-primary enable
+    Wait Until Keyword Succeeds    3s    200ms
+    ...    Static Backend Should Be Up    static-primary
+    Effective Weight Should Be    failover-vip    static-primary    100
+    Effective Weight Should Be    failover-vip    static-fallback    0
+
+
 *** Keywords ***
 Setup Suite
     ${arch} =    Run    go env GOARCH
@@ -166,3 +197,23 @@ Scrape Metrics
     ...    curl -sf ${METRICS_URL}
     Should Be Equal As Integers    ${rc}    0
     RETURN    ${output}
+
+Static Backend Should Be Up
+    [Documentation]    Like Backend Should Be Up but for backends without a
+    ...    healthcheck (hop straight to up via the synthetic-pass path).
+    [Arguments]    ${name}
+    ${output} =    Maglevc    show backends ${name}
+    Should Match Regexp    ${output}    state\\s+up
+
+Effective Weight Should Be
+    [Documentation]    Parse 'show frontends <frontend>' output for the named
+    ...    backend and assert its effective weight matches the expected value.
+    ...    Backend rows have the form:
+    ...    [backends    ] <name> weight <W> effective <E>
+    ...    so we match on <name> followed by 'weight <W> effective <E>'
+    ...    anywhere on a single line.
+    [Arguments]    ${frontend}    ${backend}    ${expected}
+    ${output} =    Maglevc    show frontends ${frontend}
+    Should Match Regexp    ${output}
+    ...    ${backend}\\s+weight\\s+\\d+\\s+effective\\s+${expected}\\b
+    ...    backend ${backend}: expected effective weight ${expected} in:\n${output}
diff --git a/tests/01-maglevd/maglevd-lab/maglev.yaml b/tests/01-maglevd/maglevd-lab/maglev.yaml
index baea876..6093d2e 100644
--- a/tests/01-maglevd/maglevd-lab/maglev.yaml
+++ b/tests/01-maglevd/maglevd-lab/maglev.yaml
@@ -31,6 +31,13 @@ maglev:
     nginx3:
       address: 172.20.30.13
       healthcheck: http-check
+    # Static (no-healthcheck) backends used by the failover test. The
+    # checker's synthetic-pass path marks them up on their first probe
+    # iteration, giving the test deterministic state without any timing.
+    static-primary:
+      address: 192.0.2.10
+    static-fallback:
+      address: 192.0.2.11
 
   frontends:
     http-vip:
@@ -46,3 +53,16 @@ maglev:
       - name: fallback
         backends:
           nginx3: {}
+
+    failover-vip:
+      description: "Static failover test VIP"
+      address: 192.0.2.2
+      protocol: tcp
+      port: 80
+      pools:
+        - name: primary
+          backends:
+            static-primary: {}
+        - name: fallback
+          backends:
+            static-fallback: {}
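
For reviewers who want to poke at the failover mapping by hand: EffectiveWeights is exported precisely so the pool-failover calculation can be driven without a running VPP or the robot suite. The sketch below is illustrative only and not part of the patch; stubSource and the idea of dropping the file under cmd/ in this tree are assumptions for the example (the internal/ packages are not importable from outside the module). It mirrors fakeStateSource from lbsync_test.go.

// Illustrative sketch (not in the patch): drive vpp.EffectiveWeights with a
// stub StateSource to watch the active pool flip. Build as a cmd/ program
// inside this tree.
package main

import (
	"fmt"

	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	"git.ipng.ch/ipng/vpp-maglev/internal/health"
	"git.ipng.ch/ipng/vpp-maglev/internal/vpp"
)

type stubSource struct{ states map[string]health.State }

// Config is not consulted by EffectiveWeights, so nil is safe here.
func (s *stubSource) Config() *config.Config { return nil }

func (s *stubSource) BackendState(name string) (health.State, bool) {
	st, ok := s.states[name]
	return st, ok
}

func main() {
	fe := config.Frontend{
		Pools: []config.Pool{
			{Name: "primary", Backends: map[string]config.PoolBackend{"p1": {Weight: 100}}},
			{Name: "fallback", Backends: map[string]config.PoolBackend{"s1": {Weight: 100}}},
		},
	}
	scenarios := []map[string]health.State{
		{"p1": health.StateUp, "s1": health.StateUp},   // primary healthy: p1 serves
		{"p1": health.StateDown, "s1": health.StateUp}, // primary lost: fallback promoted
	}
	for _, st := range scenarios {
		// result[poolIdx][backendName] = effective weight
		fmt.Println(vpp.EffectiveWeights(fe, &stubSource{states: st}))
	}
}

Printing the maps with Println sorts the keys, so the two scenarios should come out as map[0:map[p1:100] 1:map[s1:0]] followed by map[0:map[p1:0] 1:map[s1:100]]: the same flip the Failover robot tests assert through maglevc.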