// Copyright (c) 2026, Pim van Pelt

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"
)

// maglevClient is a per-maglevd gRPC client plus cache and background loops.
type maglevClient struct {
	name    string
	address string
	conn    *grpc.ClientConn
	api     grpcapi.MaglevClient
	broker  *Broker

	mu        sync.RWMutex
	connected bool
	lastErr   string
	cache     cachedState

	// lbWakeCh is a buffer-1 trigger channel feeding lbStateLoop. Every
	// backend transition (and a few other events) does a non-blocking send
	// here; the loop coalesces bursts into at most one GetVPPLBState call
	// per second. See lbStateLoop for the leading+trailing-edge debounce.
	lbWakeCh chan struct{}
}

// cachedState is the per-maglevd snapshot served via the REST handlers.
// Frontends / Backends / HealthChecks are maps for O(1) lookup from the
// event path, and the *Order slices preserve the order returned by the
// corresponding List* RPC so the UI renders in a stable order across
// reloads instead of Go map iteration's randomised order.
type cachedState struct {
	Frontends        map[string]*FrontendSnapshot
	FrontendsOrder   []string
	Backends         map[string]*BackendSnapshot
	BackendsOrder    []string
	HealthChecks     map[string]*HealthCheckSnapshot
	HealthCheckOrder []string
	VPPInfo          *VPPInfoSnapshot
	VPPState         string // "", "connected", "disconnected"
	LBState          *LBStateSnapshot
	LastRefresh      time.Time
}

func newMaglevClient(address string, broker *Broker) (*maglevClient, error) {
	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return &maglevClient{
		name:    hostnameOf(address),
		address: address,
		conn:    conn,
		api:     grpcapi.NewMaglevClient(conn),
		broker:  broker,
		cache: cachedState{
			Frontends:    map[string]*FrontendSnapshot{},
			Backends:     map[string]*BackendSnapshot{},
			HealthChecks: map[string]*HealthCheckSnapshot{},
		},
		lbWakeCh: make(chan struct{}, 1),
	}, nil
}

// hostnameOf strips the port from an address and returns a short display
// name. For DNS names we take the first label ("lb-ams.internal:9090" →
// "lb-ams"). For IP literals we return the full address so we don't
// accidentally truncate "127.0.0.1" to "127".
func hostnameOf(address string) string {
	host := address
	if h, _, err := net.SplitHostPort(address); err == nil {
		host = h
	}
	host = strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
	if net.ParseIP(host) != nil {
		return host
	}
	if i := strings.Index(host, "."); i >= 0 {
		return host[:i]
	}
	return host
}

func (c *maglevClient) Close() {
	_ = c.conn.Close()
}

// BackendAction runs one of the four lifecycle operations on a backend.
// Valid actions are "pause", "resume", "enable", and "disable". The
// fresh backend snapshot returned by maglevd is converted and sent
// back to the caller so the admin API handler can reply with the
// post-mutation state in a single round-trip. The broadcast
// WatchEvents stream will also deliver a transition event which the
// local cache and every connected browser apply through the normal
// reducer path — so the UI converges even if the HTTP response is
// slow or dropped in flight.
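//
// Illustrative use from an admin HTTP handler (a sketch only; the handler
// shape and variable names are hypothetical and not taken from this repo):
//
//	snap, err := client.BackendAction(r.Context(), backendName, "pause")
//	if err != nil {
//		http.Error(w, err.Error(), http.StatusBadGateway)
//		return
//	}
//	_ = json.NewEncoder(w).Encode(snap) // reply with the post-mutation state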
func (c *maglevClient) BackendAction(ctx context.Context, name, action string) (*BackendSnapshot, error) {
	req := &grpcapi.BackendRequest{Name: name}
	var bi *grpcapi.BackendInfo
	var err error
	switch action {
	case "pause":
		bi, err = c.api.PauseBackend(ctx, req)
	case "resume":
		bi, err = c.api.ResumeBackend(ctx, req)
	case "enable":
		bi, err = c.api.EnableBackend(ctx, req)
	case "disable":
		bi, err = c.api.DisableBackend(ctx, req)
	default:
		return nil, fmt.Errorf("unknown action %q", action)
	}
	if err != nil {
		return nil, err
	}
	return backendFromProto(bi), nil
}

// SetBackendWeight runs the SetFrontendPoolBackendWeight gRPC call. A
// fresh FrontendSnapshot is returned so admin callers get the
// post-mutation effective weights in one round-trip.
func (c *maglevClient) SetBackendWeight(ctx context.Context, frontend, pool, backend string, weight int32, flush bool) (*FrontendSnapshot, error) {
	fi, err := c.api.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{
		Frontend: frontend,
		Pool:     pool,
		Backend:  backend,
		Weight:   weight,
		Flush:    flush,
	})
	if err != nil {
		return nil, err
	}
	return frontendFromProto(fi), nil
}

func (c *maglevClient) Start(ctx context.Context) {
	go c.watchLoop(ctx)
	go c.refreshLoop(ctx)
	go c.healthLoop(ctx)
	go c.lbStateLoop(ctx)
}

func (c *maglevClient) setConnected(ok bool, errMsg string) {
	c.mu.Lock()
	prev := c.connected
	c.connected = ok
	c.lastErr = errMsg
	c.mu.Unlock()
	if prev != ok {
		payload, _ := json.Marshal(MaglevdStatusPayload{Connected: ok, LastError: errMsg})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "maglevd-status",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  payload,
		})
	}
}

// Info returns the current connection status for this maglevd.
func (c *maglevClient) Info() MaglevdInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return MaglevdInfo{
		Name:      c.name,
		Address:   c.address,
		Connected: c.connected,
		LastError: c.lastErr,
	}
}

// Snapshot returns a deep-ish copy of the cached state for REST handlers.
// Iteration order follows the corresponding *Order slice so the UI sees a
// stable, RPC-defined order across reloads.
func (c *maglevClient) Snapshot() *StateSnapshot {
	c.mu.RLock()
	defer c.mu.RUnlock()
	snap := &StateSnapshot{
		Maglevd: MaglevdInfo{
			Name:      c.name,
			Address:   c.address,
			Connected: c.connected,
			LastError: c.lastErr,
		},
		Frontends:    make([]*FrontendSnapshot, 0, len(c.cache.FrontendsOrder)),
		Backends:     make([]*BackendSnapshot, 0, len(c.cache.BackendsOrder)),
		HealthChecks: make([]*HealthCheckSnapshot, 0, len(c.cache.HealthCheckOrder)),
		VPPInfo:      c.cache.VPPInfo,
		VPPState:     c.cache.VPPState,
		LBState:      c.cache.LBState,
	}
	for _, name := range c.cache.FrontendsOrder {
		if f, ok := c.cache.Frontends[name]; ok {
			snap.Frontends = append(snap.Frontends, f)
		}
	}
	for _, name := range c.cache.BackendsOrder {
		if b, ok := c.cache.Backends[name]; ok {
			snap.Backends = append(snap.Backends, b)
		}
	}
	for _, name := range c.cache.HealthCheckOrder {
		if h, ok := c.cache.HealthChecks[name]; ok {
			snap.HealthChecks = append(snap.HealthChecks, h)
		}
	}
	return snap
}

// refreshAll pulls a full fresh view of the maglevd's state into the cache.
// Called from the refreshLoop every 30s and immediately after a successful
// reconnect.
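//
// Cost sketch (derived from the body below, numbers illustrative): for F
// frontends, B backends and H healthchecks, one refresh issues the three
// List RPCs plus F+B+H Get RPCs and GetVPPInfo under a shared 10-second
// deadline; the trailing best-effort GetVPPLBState runs with its own
// 5-second timeout inside fetchLBStateAndPublish.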
func (c *maglevClient) refreshAll(ctx context.Context) error {
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	frontends := map[string]*FrontendSnapshot{}
	fl, err := c.api.ListFrontends(rctx, &grpcapi.ListFrontendsRequest{})
	if err != nil {
		return fmt.Errorf("list frontends: %w", err)
	}
	// Sort alphabetically so the UI layout is stable across
	// reloads/restarts. maglevd's checker.ListFrontends already sorts
	// in current versions, but older builds don't — sort here too as
	// a belt-and-braces guarantee.
	frontendsOrder := append([]string(nil), fl.GetFrontendNames()...)
	sort.Strings(frontendsOrder)
	for _, name := range frontendsOrder {
		fi, err := c.api.GetFrontend(rctx, &grpcapi.GetFrontendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get frontend %s: %w", name, err)
		}
		frontends[name] = frontendFromProto(fi)
	}

	backends := map[string]*BackendSnapshot{}
	bl, err := c.api.ListBackends(rctx, &grpcapi.ListBackendsRequest{})
	if err != nil {
		return fmt.Errorf("list backends: %w", err)
	}
	backendsOrder := append([]string(nil), bl.GetBackendNames()...)
	for _, name := range backendsOrder {
		bi, err := c.api.GetBackend(rctx, &grpcapi.GetBackendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get backend %s: %w", name, err)
		}
		backends[name] = backendFromProto(bi)
	}

	healthchecks := map[string]*HealthCheckSnapshot{}
	hl, err := c.api.ListHealthChecks(rctx, &grpcapi.ListHealthChecksRequest{})
	if err != nil {
		return fmt.Errorf("list healthchecks: %w", err)
	}
	healthCheckOrder := append([]string(nil), hl.GetNames()...)
	for _, name := range healthCheckOrder {
		hi, err := c.api.GetHealthCheck(rctx, &grpcapi.GetHealthCheckRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get healthcheck %s: %w", name, err)
		}
		healthchecks[name] = healthCheckFromProto(hi)
	}

	var vppInfo *VPPInfoSnapshot
	vppState := "disconnected"
	if vi, err := c.api.GetVPPInfo(rctx, &grpcapi.GetVPPInfoRequest{}); err == nil {
		vppInfo = &VPPInfoSnapshot{
			Version:       vi.GetVersion(),
			BuildDate:     vi.GetBuildDate(),
			PID:           vi.GetPid(),
			BoottimeNs:    vi.GetBoottimeNs(),
			ConnecttimeNs: vi.GetConnecttimeNs(),
		}
		vppState = "connected"
	}

	c.mu.Lock()
	// Frontend state comes from the FrontendEvent stream, not the
	// FrontendInfo proto — carry any known state from the old cache over
	// to the freshly-listed entries so a periodic refresh doesn't blank
	// the state badges until the next live transition arrives.
	for name, f := range frontends {
		if old, ok := c.cache.Frontends[name]; ok && old.State != "" {
			f.State = old.State
		}
	}
	c.cache.Frontends = frontends
	c.cache.FrontendsOrder = frontendsOrder
	c.cache.Backends = backends
	c.cache.BackendsOrder = backendsOrder
	c.cache.HealthChecks = healthchecks
	c.cache.HealthCheckOrder = healthCheckOrder
	c.cache.VPPInfo = vppInfo
	c.cache.VPPState = vppState
	c.cache.LastRefresh = time.Now()
	c.mu.Unlock()

	// Best-effort LB state pull so /view/api/state served on a fresh
	// page load already carries the bucket column. Errors are
	// swallowed by fetchLBStateAndPublish (which clears the cache and
	// emits an empty event so the SPA renders "—").
	c.fetchLBStateAndPublish(ctx)
	return nil
}

// watchLoop subscribes to WatchEvents and feeds the broker until the context
// is cancelled. Reconnects with exponential backoff on stream errors.
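//
// Illustrative backoff timeline for consecutive subscribe failures (example
// only, derived from the loop below, not a real trace):
//
//	attempt 1 fails → wait 1s
//	attempt 2 fails → wait 2s
//	attempt 3 fails → wait 4s
//	attempt 4 fails → wait 8s
//	attempt 5 fails → wait 16s
//	attempt 6+ fails → wait 30s (capped)
//	any successful subscribe resets the next wait to 1s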
func (c *maglevClient) watchLoop(ctx context.Context) {
	backoff := time.Second
	maxBackoff := 30 * time.Second
	for {
		if ctx.Err() != nil {
			return
		}
		if err := c.watchOnce(ctx); err != nil {
			if ctx.Err() != nil {
				return
			}
			slog.Warn("watch-disconnected", "maglevd", c.name, "err", err)
			c.setConnected(false, err.Error())
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		backoff = time.Second
	}
}

func (c *maglevClient) watchOnce(ctx context.Context) error {
	logFlag := true
	backendFlag := true
	frontendFlag := true
	req := &grpcapi.WatchRequest{
		Log:      &logFlag,
		LogLevel: "debug",
		Backend:  &backendFlag,
		Frontend: &frontendFlag,
	}
	stream, err := c.api.WatchEvents(ctx, req)
	if err != nil {
		return fmt.Errorf("open stream: %w", err)
	}
	// Successful subscribe: mark connected and pull a fresh snapshot so
	// the REST cache is immediately ground-truth accurate. WatchEvents
	// itself replays current state as synthetic from==to events, which
	// will also update the cache as they arrive.
	c.setConnected(true, "")
	if err := c.refreshAll(ctx); err != nil {
		slog.Warn("refresh-after-watch", "maglevd", c.name, "err", err)
	}
	for {
		ev, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return nil
			}
			return err
		}
		c.handleEvent(ev)
	}
}

// handleEvent applies an incoming gRPC event to the local cache and
// publishes a corresponding BrowserEvent on the broker.
func (c *maglevClient) handleEvent(ev *grpcapi.Event) {
	switch body := ev.GetEvent().(type) {
	case *grpcapi.Event_Log:
		le := body.Log
		if le == nil {
			return
		}
		attrs := make(map[string]string, len(le.GetAttrs()))
		for _, a := range le.GetAttrs() {
			attrs[a.GetKey()] = a.GetValue()
		}
		c.applyVPPLogHeartbeat(le.GetMsg())
		// A config reload on maglevd can shuffle anything: add or
		// remove frontends, change pool membership, flip configured
		// weights, move backends between pools. Rather than try to
		// incrementally update the cache for every possible change,
		// refresh the whole maglevd state and tell every connected
		// browser to re-hydrate from the fresh snapshot. Only the
		// "-done" event triggers this, not "-start": a failed reload
		// (which never emits "-done") leaves the running config
		// unchanged, so no refresh is needed.
		if le.GetMsg() == "config-reload-done" {
			c.triggerConfigResync()
		}
		payload, _ := json.Marshal(LogEventPayload{
			Level: le.GetLevel(),
			Msg:   le.GetMsg(),
			Attrs: attrs,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "log",
			AtUnixNs: le.GetAtUnixNs(),
			Payload:  payload,
		})
	case *grpcapi.Event_Backend:
		be := body.Backend
		if be == nil || be.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(be.GetTransition())
		// maglevd replays current state on WatchEvents subscribe as a
		// synthetic event with from==to and at_unix_ns=0 (see
		// internal/grpcapi/server.go). It is not a real transition — the
		// in-process cache is already correct from refreshAll, so don't
		// touch LastTransition (which would clobber it with at=0 and
		// render as "55 years ago" in the browser) and don't forward to
		// the broker.
		if tr.From == tr.To {
			return
		}
		c.applyBackendTransition(be.GetBackendName(), tr)
		payload, _ := json.Marshal(BackendEventPayload{
			Backend:    be.GetBackendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "backend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
		// A real transition means VPP is about to (or already did)
		// reshuffle bucket allocations across the affected VIP.
		// Wake the lb-state loop so the SPA's bucket column converges
		// without waiting for the 30s refresh.
		c.triggerLBStateFetch()
	case *grpcapi.Event_Frontend:
		fe := body.Frontend
		if fe == nil || fe.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(fe.GetTransition())
		// Always update the cached state — synthetic from==to events on
		// subscribe are how we learn the initial frontend state (there's
		// no equivalent field in the FrontendInfo proto). Only publish
		// genuine transitions to the browser so the debug panel doesn't
		// show 'up → up' spam on every gRPC reconnect.
		c.applyFrontendState(fe.GetFrontendName(), tr.To)
		if tr.From == tr.To {
			return
		}
		payload, _ := json.Marshal(FrontendEventPayload{
			Frontend:   fe.GetFrontendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "frontend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
	}
}

// triggerConfigResync runs refreshAll off the event-dispatch goroutine
// (so the stream.Recv loop isn't blocked while the full config refetch
// hits several gRPC calls) and then publishes a BrowserEvent of type
// "resync" so every connected browser re-fetches /view/api/state from
// the now-fresh cache. Fired in response to a maglevd
// "config-reload-done" log event.
//
// The refresh-then-publish order matters: if we published first, the
// SPA would fetchState from a stale cache and display old data until
// the next 30s refresh tick. Running refreshAll synchronously inside
// this goroutine closes that window.
//
// The resync event goes through the normal broker → ring buffer path,
// so a browser that reconnects shortly after the reload (within the
// 30s / 2000-event replay window) still sees the resync on its first
// live event and re-hydrates without needing a separate out-of-band
// signal.
func (c *maglevClient) triggerConfigResync() {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		if err := c.refreshAll(ctx); err != nil {
			slog.Warn("config-resync-refresh", "maglevd", c.name, "err", err)
			// Publish anyway — the SPA's refetch will see the
			// cache in whatever state refreshAll left it, and
			// the periodic refreshLoop will retry. Better than
			// silently dropping the signal.
		}
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "resync",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  json.RawMessage("{}"),
		})
	}()
}

// applyFrontendState writes the given state into the cached frontend
// snapshot. Called both by synthetic replay events on subscribe and by
// live transitions afterwards.
func (c *maglevClient) applyFrontendState(name, state string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	f, ok := c.cache.Frontends[name]
	if !ok {
		return
	}
	f.State = state
}

// applyVPPLogHeartbeat flips the cache.VPPState field based on the
// event's msg. vpp-connect and vpp-api-{send,recv}* are treated as
// "VPP is up" signals; vpp-disconnect flips to "down". Unrelated log
// events are a no-op. Called from handleEvent under the client's
// event-dispatch goroutine, so contention on mu is single-writer.
func (c *maglevClient) applyVPPLogHeartbeat(msg string) {
	var newState string
	switch {
	case msg == "vpp-connect":
		newState = "connected"
	case msg == "vpp-disconnect":
		newState = "disconnected"
	case strings.HasPrefix(msg, "vpp-api-send") || strings.HasPrefix(msg, "vpp-api-recv"):
		newState = "connected"
	default:
		return
	}
	c.mu.Lock()
	if c.cache.VPPState == newState {
		c.mu.Unlock()
		return
	}
	c.cache.VPPState = newState
	c.mu.Unlock()
	payload, _ := json.Marshal(VPPStatusPayload{State: newState})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "vpp-status",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
	// VPP just came back: pull fresh LB state so the bucket column
	// repopulates immediately instead of waiting up to 30s for the
	// next refresh tick. On vpp-disconnect the next fetch will fail
	// and clear the cache, which is also the right behaviour.
	c.triggerLBStateFetch()
}

func (c *maglevClient) applyBackendTransition(name string, tr *TransitionRecord) {
	c.mu.Lock()
	defer c.mu.Unlock()
	b, ok := c.cache.Backends[name]
	if !ok {
		// Partial-create fallback for a transition that arrives before
		// the first refreshAll has seen this backend. The real fields
		// (address, healthcheck, pool memberships) are filled in on
		// the next refresh tick; here we just stamp Name so the entry
		// exists.
		b = &BackendSnapshot{Name: name}
		c.cache.Backends[name] = b
		c.cache.BackendsOrder = append(c.cache.BackendsOrder, name)
	}
	b.State = tr.To
	// Derive Enabled from State. In maglevd, state="disabled" and
	// config.enabled=false are two ways of expressing the same
	// condition — DisableBackend / EnableBackend flip both together,
	// and no other state corresponds to enabled=false. Keeping them
	// in sync in the reducer closes a race where the cached Enabled
	// could lag behind State by up to a refreshLoop tick, causing
	// the SPA to render a bogus [disabled] tag next to an "up" badge
	// on a freshly-re-enabled backend.
	b.Enabled = tr.To != "disabled"
	b.LastTransition = tr
	b.Transitions = append(b.Transitions, tr)
	// Cap history to the most recent 20 entries to mirror what maglevd
	// returns from GetBackend.
	if len(b.Transitions) > 20 {
		b.Transitions = b.Transitions[len(b.Transitions)-20:]
	}
}

// refreshLoop pulls a fresh snapshot every 30s to catch anything the live
// event stream may have missed (e.g. during a brief gRPC reconnect).
func (c *maglevClient) refreshLoop(ctx context.Context) {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := c.refreshAll(ctx); err != nil {
				slog.Debug("refresh-all", "maglevd", c.name, "err", err)
			}
		}
	}
}

// healthLoop issues a cheap GetVPPInfo every 5s to surface connection drops
// quickly. Errors flip the connection indicator; recoveries trigger a
// refreshAll so the cache catches up.
func (c *maglevClient) healthLoop(ctx context.Context) {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			hctx, cancel := context.WithTimeout(ctx, 2*time.Second)
			_, err := c.api.GetVPPInfo(hctx, &grpcapi.GetVPPInfoRequest{})
			cancel()
			if err != nil {
				c.setConnected(false, err.Error())
			} else {
				c.setConnected(true, "")
			}
		}
	}
}

// ---- proto → JSON helpers --------------------------------------------------

func frontendFromProto(fi *grpcapi.FrontendInfo) *FrontendSnapshot {
	out := &FrontendSnapshot{
		Name:        fi.GetName(),
		Address:     fi.GetAddress(),
		Protocol:    fi.GetProtocol(),
		Port:        fi.GetPort(),
		Description: fi.GetDescription(),
		SrcIPSticky: fi.GetSrcIpSticky(),
	}
	for _, p := range fi.GetPools() {
		ps := &PoolSnapshot{Name: p.GetName()}
		for _, pb := range p.GetBackends() {
			ps.Backends = append(ps.Backends, &PoolBackendSnapshot{
				Name:            pb.GetName(),
				Weight:          pb.GetWeight(),
				EffectiveWeight: pb.GetEffectiveWeight(),
			})
		}
		out.Pools = append(out.Pools, ps)
	}
	return out
}

func backendFromProto(bi *grpcapi.BackendInfo) *BackendSnapshot {
	out := &BackendSnapshot{
		Name:        bi.GetName(),
		Address:     bi.GetAddress(),
		State:       bi.GetState(),
		Enabled:     bi.GetEnabled(),
		HealthCheck: bi.GetHealthcheck(),
	}
	// maglevd stores and returns transitions newest-first (it prepends
	// in health.Backend.transition()). The client stores them
	// oldest-first so applyBackendTransition can simply append new
	// events to the end. Reverse on read to reconcile the two
	// conventions — then out.Transitions[n-1] is the newest, which is
	// the correct LastTransition.
	trs := bi.GetTransitions()
	out.Transitions = make([]*TransitionRecord, len(trs))
	for i, t := range trs {
		out.Transitions[len(trs)-1-i] = transitionFromProto(t)
	}
	if n := len(out.Transitions); n > 0 {
		out.LastTransition = out.Transitions[n-1]
	}
	return out
}

func transitionFromProto(t *grpcapi.TransitionRecord) *TransitionRecord {
	return &TransitionRecord{
		From:     t.GetFrom(),
		To:       t.GetTo(),
		AtUnixNs: t.GetAtUnixNs(),
	}
}

// triggerLBStateFetch sends a non-blocking wake to lbStateLoop. The
// channel has buffer 1 so coalesced bursts never block the publisher.
func (c *maglevClient) triggerLBStateFetch() {
	select {
	case c.lbWakeCh <- struct{}{}:
	default:
	}
}

// lbStateLoop consumes wake signals and calls GetVPPLBState, with a
// leading+trailing-edge debounce so we never exceed one fetch per
// minLBInterval (1s). The leading edge means the very first wake after
// an idle period fires immediately — important so a single isolated
// transition isn't artificially delayed by a second. The trailing edge
// means a burst of wakes during the cool-down still gets one final
// fetch right after the gate opens, so the SPA always converges to a
// post-burst snapshot rather than missing the last update.
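//
// Illustrative timeline with minLBInterval = 1s (example only, derived from
// the loop below):
//
//	t=0.00s  wake  → loop idle, fetch immediately (leading edge)
//	t=0.20s  wake  → inside cool-down, arm timer for t=1.00s
//	t=0.45s  wake  → timer already armed, coalesced
//	t=1.00s  timer → one trailing fetch covers the whole burst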
func (c *maglevClient) lbStateLoop(ctx context.Context) {
	const minLBInterval = time.Second
	var (
		timer     *time.Timer
		lastFetch time.Time
	)
	timerCh := func() <-chan time.Time {
		if timer == nil {
			return nil
		}
		return timer.C
	}
	fire := func() {
		if timer != nil {
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer = nil
		}
		c.fetchLBStateAndPublish(ctx)
		lastFetch = time.Now()
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.lbWakeCh:
			wait := minLBInterval - time.Since(lastFetch)
			if wait <= 0 {
				fire()
			} else if timer == nil {
				timer = time.NewTimer(wait)
			}
		case <-timerCh():
			timer = nil
			fire()
		}
	}
}

// fetchLBStateAndPublish runs one GetVPPLBState round-trip, rebuilds
// the per-frontend bucket map, swaps it into the cache, and broadcasts
// a "lb-state" BrowserEvent. On error the cache is cleared and an
// empty event is published so the SPA can switch the bucket column to
// em-dashes — clear-on-error is simpler than stale-but-visible and
// doesn't risk showing a confusing snapshot from before VPP died.
func (c *maglevClient) fetchLBStateAndPublish(ctx context.Context) {
	fctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	lbs, err := c.api.GetVPPLBState(fctx, &grpcapi.GetVPPLBStateRequest{})
	if err != nil {
		c.mu.Lock()
		had := c.cache.LBState != nil
		c.cache.LBState = nil
		c.mu.Unlock()
		slog.Debug("lb-state-fetch", "maglevd", c.name, "err", err)
		if had {
			c.publishLBState(nil)
		}
		return
	}
	snap := c.buildLBStateSnapshot(lbs)
	c.mu.Lock()
	c.cache.LBState = snap
	c.mu.Unlock()
	c.publishLBState(snap.PerFrontend)
}

func (c *maglevClient) publishLBState(perFrontend map[string]map[string]int32) {
	payload, _ := json.Marshal(LBStatePayload{PerFrontend: perFrontend})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "lb-state",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
}

// buildLBStateSnapshot translates a VPP-side state record (keyed by
// CIDR/protocol/port and AS address) into a maglev-side record (keyed
// by frontend name and backend name). Unmatched VIPs and unmatched AS
// addresses are silently skipped — they're benign side effects of a
// transient sync gap or a backend address that's only present in one
// of the two universes.
func (c *maglevClient) buildLBStateSnapshot(lbs *grpcapi.VPPLBState) *LBStateSnapshot {
	c.mu.RLock()
	feByVIP := make(map[string]string, len(c.cache.Frontends))
	for _, f := range c.cache.Frontends {
		feByVIP[lbVIPKey(f.Address, f.Protocol, f.Port)] = f.Name
	}
	backendByAddr := make(map[string]string, len(c.cache.Backends))
	for _, b := range c.cache.Backends {
		backendByAddr[b.Address] = b.Name
	}
	c.mu.RUnlock()

	out := &LBStateSnapshot{PerFrontend: map[string]map[string]int32{}}
	for _, v := range lbs.GetVips() {
		feName, ok := feByVIP[lbVIPKey(stripLBHostMask(v.GetPrefix()), lbProtoString(v.GetProtocol()), v.GetPort())]
		if !ok {
			continue
		}
		row := out.PerFrontend[feName]
		if row == nil {
			row = map[string]int32{}
			out.PerFrontend[feName] = row
		}
		for _, as := range v.GetApplicationServers() {
			bname, ok := backendByAddr[as.GetAddress()]
			if !ok {
				continue
			}
			row[bname] = int32(as.GetNumBuckets())
		}
	}
	return out
}

// lbVIPKey is the join key between a maglev FrontendSnapshot and a
// VPP-side VPPLBVIP record. Stripping the mask and lower-casing the
// protocol gives a canonical form that both sides can produce.
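//
// For example (illustrative addresses), a frontend at 192.0.2.10 tcp/443 and
// a VPP VIP with prefix "192.0.2.10/32", protocol 6, port 443 both reduce to
// the key "192.0.2.10/tcp/443":
//
//	lbVIPKey("192.0.2.10", "TCP", 443)
//	lbVIPKey(stripLBHostMask("192.0.2.10/32"), lbProtoString(6), 443)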
func lbVIPKey(addr, proto string, port uint32) string {
	return fmt.Sprintf("%s/%s/%d", addr, strings.ToLower(proto), port)
}

// lbProtoString mirrors maglevc's protoString — kept local to avoid a
// cross-package import for two trivial helpers.
func lbProtoString(p uint32) string {
	switch p {
	case 6:
		return "tcp"
	case 17:
		return "udp"
	case 255:
		return "any"
	}
	return fmt.Sprintf("%d", p)
}

// stripLBHostMask trims "/32" or "/128" from a VPP host-prefix VIP so
// it can be compared against a maglev FrontendSnapshot.Address (which
// is bare). Other shapes are returned unchanged.
func stripLBHostMask(prefix string) string {
	if strings.HasSuffix(prefix, "/32") || strings.HasSuffix(prefix, "/128") {
		return prefix[:strings.LastIndexByte(prefix, '/')]
	}
	return prefix
}

func healthCheckFromProto(h *grpcapi.HealthCheckInfo) *HealthCheckSnapshot {
	return &HealthCheckSnapshot{
		Name:           h.GetName(),
		Type:           h.GetType(),
		Port:           h.GetPort(),
		IntervalNs:     h.GetIntervalNs(),
		FastIntervalNs: h.GetFastIntervalNs(),
		DownIntervalNs: h.GetDownIntervalNs(),
		TimeoutNs:      h.GetTimeoutNs(),
		Rise:           h.GetRise(),
		Fall:           h.GetFall(),
	}
}