vpp-maglev/cmd/frontend/client.go
Pim van Pelt 4347bb9b05 Bug fixes, config validation, SPA tightening, set-weight UI
This session covers three distinct arcs: correctness bug fixes in the
VPP sync path and frontend reducers, new config validation, and a
large polish pass on the web frontend (tighter layout, backend kebab
dialogs, live grouped-table, live config-reload re-sync).

 - encap for a VIP is now derived from the backend address family,
   not the VIP's. A v6 VIP with v4 backends is programmed as IP6_GRE4
   (not the buggy IP6_GRE6), matching the VPP LB plugin's
   requirement that encap reflects the tunnel inner family. desiredVIP
   gained an Encap field populated in desiredFromFrontend.
 - ActivePoolIndex now requires at least one backend in a pool to be
   BOTH in StateUp AND pb.Weight>0 before the pool counts as active.
   Previously a primary pool with every backend manually zeroed would
   still win over a fallback with weight=100, so fallback traffic
   never materialized. New TestActivePoolIndexWeightedFailover table
   pins the rule in five subcases; a sketch of the rule follows this list.
 - SyncLBStateVIP gained a flushAddress parameter threaded through
   reconcileVIP; it forces flush=true on the setASWeight call for a
   specific backend regardless of the usual 0→N heuristic. This wires
   up the explicit [flush] knob the CLI exposes (also sketched below).
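
Sketches of the two rules above (hypothetical simplified types; the
real code operates on the health package's pool/backend structs):

    // A pool counts as active only when at least one backend is both
    // up and carries a non-zero configured weight.
    type poolBackend struct {
        State  string // "up", "down", "paused", ...
        Weight int32
    }
    type pool struct{ Backends []poolBackend }

    // activePoolIndex returns the first active pool, or -1 when every
    // pool is fully drained or down.
    func activePoolIndex(pools []pool) int {
        for i, p := range pools {
            for _, b := range p.Backends {
                if b.State == "up" && b.Weight > 0 {
                    return i
                }
            }
        }
        return -1
    }

    // shouldFlush sketches the per-backend flush decision, assuming the
    // usual heuristic is "flush when a weight goes from 0 to non-zero";
    // an explicit flushAddress overrides it for that one backend.
    func shouldFlush(backendAddr, flushAddress string, oldWeight, newWeight int32) bool {
        if flushAddress != "" && backendAddr == flushAddress {
            return true
        }
        return oldWeight == 0 && newWeight > 0
    }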

 - convertFrontend already enforced that backends within one frontend
   share a family. New cross-frontend pass validateVIPFamilyConsistency
   rejects configs where two frontends share a VIP address but carry
   backends in different families — VPP's LB plugin requires every
   VIP on a prefix to have the same encap type, so such a config
   would fail at lb_add_del_vip_v2 time with
   VNET_API_ERROR_INVALID_ARGUMENT (-73). Catching it at config load
   turns a silent runtime failure into a clear startup error; a sketch
   of the check follows this list.
 - Two new TestValidationErrors cases pin the behavior: mismatched
   families reject, same-family frontends on one VIP address allowed.
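
A minimal sketch of the cross-frontend check (simplified input type;
the real validateVIPFamilyConsistency works on the parsed config
structs, and this sketch needs "fmt" and "net/netip"):

    type vipFrontend struct {
        Name       string
        VIP        netip.Addr
        BackendIs4 bool // convertFrontend guarantees one family per frontend
    }

    // validateVIPFamilyConsistency rejects configs where two frontends
    // share a VIP address but carry backends of different families.
    func validateVIPFamilyConsistency(fes []vipFrontend) error {
        seen := map[netip.Addr]vipFrontend{}
        for _, fe := range fes {
            if prev, ok := seen[fe.VIP]; ok && prev.BackendIs4 != fe.BackendIs4 {
                return fmt.Errorf("frontends %q and %q share VIP %s but their backends use different address families",
                    prev.Name, fe.Name, fe.VIP)
            }
            seen[fe.VIP] = fe
        }
        return nil
    }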

 - Proto adds `bool flush = 5` to SetWeightRequest. The RPC now
   drives a VIP sync immediately after mutating config (fixing the
   latent "weight change only takes effect at the next 30s periodic
   reconcile" gap), passing flushAddress = backend IP when req.Flush
   is true; see the sketch after this list.
 - maglevc grows an optional [flush] token: `set frontend F pool P
   backend B weight N [flush]`. Implementation uses two Run closures
   (runSetFrontendPoolBackendWeight and -Flush) because the tree
   walker only puts slot tokens in args — literal keywords like
   `flush` advance the node but don't appear in the arg list.
 - docs/user-guide.md updated with the optional [flush] token and a
   three-paragraph explainer of the graceful-drain vs. flush
   semantics at the VPP level.
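
A hypothetical sketch of the RPC-side ordering; only SetWeightRequest
and its fields come from the real proto, the config/VPP-sync
interfaces are illustrative stand-ins:

    type weightConfig interface {
        SetBackendWeight(frontend, pool, backend string, weight int32) error
        BackendAddress(backend string) string
    }
    type vppSyncer interface {
        // illustrative signature; the real SyncLBStateVIP lives in the VPP sync path
        SyncLBStateVIP(ctx context.Context, frontend, flushAddress string) error
    }
    type weightServer struct {
        cfg weightConfig
        vpp vppSyncer
    }

    func (s *weightServer) setWeight(ctx context.Context, req *grpcapi.SetWeightRequest) error {
        // Mutate the running config first...
        if err := s.cfg.SetBackendWeight(req.GetFrontend(), req.GetPool(), req.GetBackend(), req.GetWeight()); err != nil {
            return err
        }
        // ...then sync the VIP immediately so the change doesn't wait
        // for the next 30s periodic reconcile. flushAddress is only set
        // when the operator asked for an explicit flush of this backend.
        flushAddress := ""
        if req.GetFlush() {
            flushAddress = s.cfg.BackendAddress(req.GetBackend())
        }
        return s.vpp.SyncLBStateVIP(ctx, req.GetFrontend(), flushAddress)
    }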

 - checker.ListFrontends now sorts alphabetically to match the
   existing sort in ListBackends / ListHealthChecks — RPC responses
   no longer shuffle VIPs per call. cmd/frontend/client.go also
   sorts defensively in refreshAll so an old maglevd build renders
   alphabetically too.
 - backendFromProto was returning out.Transitions[n-1] as the
   LastTransition, but maglevd stores (and the proto carries)
   transitions newest-first, so [n-1] was actually the oldest.
   Reverse on read: the client's Transitions slice is now oldest-first,
   so [n-1] is genuinely the newest and LastTransition points at the
   actual latest record.
 - applyBackendTransition (Go and TS) derives Enabled = state!="disabled"
   so the two fields stay in lockstep — closed a drift window where
   a recently re-enabled backend still rendered with a stuck
   [disabled] tag. The tag was later removed entirely since state
   and enabled carry the same information.

 - Layout tightened substantially: "FRONTENDS" panel header removed,
   zippy-summary and zippy-body paddings cut, backend-table row
   padding dropped to 2px, per-pool <h3> removed. Pools now live in
   a single consolidated table per frontend with a dedicated "pool"
   column that shows the pool name only on the first row of each
   group — classic grouped-table layout, maximally dense.
 - Description moved inline into the Zippy summary as muted italic
   text, freeing a vertical line per frontend card.
 - formatVIPAddress() helper renders IPv6 VIPs as [addr]:port and
   IPv4 as addr:port, matching RFC 3986 authority syntax (a Go
   rendering follows this list).
 - Pools with effective_weight=0 on every backend (standby
   fallbacks, fully-drained primaries) render at opacity 0.35 on
   their non-actions cells; the kebab column stays at full contrast
   because its menu is still fully functional on standby backends.
 - Config-reload propagation: a maglevd config-reload-done log
   event triggers triggerConfigResync() on the frontend side —
   refreshAll() runs off the event-dispatch goroutine, then a
   BrowserEvent{Type:"resync"} is published through the broker.
   writeEvent emits type="resync" as a named SSE frame so the
   SPA's existing addEventListener("resync") handler picks it up
   and calls fetchAllState → replaceAll (the named-frame write is
   sketched after this list).
 - recomputeEffectiveWeights in stores/state.ts mirrors the
   server-side health.EffectiveWeights logic so the SPA keeps
   pool.effective_weight correct the moment a backend transitions,
   without waiting for the 30s refresh. Fixed a nasty bug where
   applyBackendEffectiveWeight wrote VIP-scoped vpp-lb-sync-as-*
   event weights into every frontend sharing the backend,
   corrupting frontends with different per-pool configured weights.
   The old log-event reducer was removed; applyConfiguredWeight is
   the narrower replacement used by the kebab set-weight flow.
 - applyBackendTransition calls recomputeEffectiveWeights after
   state updates so pool-failover transitions (primary ⇌ fallback)
   reflect instantly in the UI.
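
Two small sketches for the items above. First, a Go rendering of the
formatVIPAddress helper (the real one is TypeScript in the SPA; this
needs "fmt" and "net/netip"):

    func formatVIPAddress(addr netip.Addr, port uint16) string {
        if addr.Is4() {
            return fmt.Sprintf("%s:%d", addr, port) // IPv4: addr:port
        }
        return fmt.Sprintf("[%s]:%d", addr, port) // IPv6: [addr]:port
    }

Second, the assumed shape of the named-frame SSE write (hypothetical
writeEvent body; the real handler lives elsewhere in cmd/frontend).
Naming the frame after BrowserEvent.Type is what routes "resync"
frames to the SPA's addEventListener("resync") handler instead of the
default "message" listener:

    func writeEvent(w http.ResponseWriter, ev BrowserEvent) error {
        if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.Type, ev.Payload); err != nil {
            return err
        }
        if f, ok := w.(http.Flusher); ok {
            f.Flush() // push the frame out right away
        }
        return nil
    }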

 - Confirmation dialogs via a new Modal primitive
   (Portal-mounted to document.body, escape/click-outside close,
   click-outside debounced on mousedown so mid-row-text-selection
   drags don't dismiss).
 - pause/resume/enable/disable each show a Modal with a consequence
   paragraph explaining what hits live traffic ("will keep existing
   flows", "will flush VPP's flow table", etc.). The disable commit
   button is styled btn-danger red.
 - set-weight action shows a Modal with a range slider (0-100,
   seeded from the current configured weight, accent-colored live
   numeric readout via <output>) plus a flush checkbox and a live-
   swapping note/warn paragraph describing what will happen. On
   commit, the SPA also updates its local store via
   applyConfiguredWeight so the operator sees the new weight
   immediately without waiting for the next refresh.

 - ProbeHeartbeat is now state-aware: ▶ (play) at rest for up/
   down/unknown backends, ⏸ (pause) for paused, ⏹ (stop) for
   disabled/removed, ❤️ (heart) during an in-flight probe.
 - Drop the probe-done event listener — fast probes (<10ms)
   could fire probe-done in the same render tick as probe-start
   and the heart would never visibly paint. Each probe-start now
   runs a fixed 400ms scale-pop animation on a timer; subsequent
   probe-start events reset the timer, so fast cadences produce a
   continuous heart pulse.
 - Fixed wrapper box (16x14 px, overflow hidden) so the row
   doesn't jiggle when the glyph swaps between the narrow ▶/⏸/⏹
   text glyphs and the wider ❤️ emoji.

 - Brand wordmark changed from "maglev" to "vpp-maglev" and wrapped
   in an <a> linking to https://git.ipng.ch/ipng/vpp-maglev. Logo
   link changed to https://ipng.ch/. Both open in a new tab with
   rel="noopener".
 - .gitignore fix: `frontend`, `maglevc`, `maglevd` were matching
   ANY file or directory with those names anywhere in the tree,
   silently ignoring cmd/frontend and friends. Anchored with
   leading slashes so only repo-root build artifacts match.
2026-04-12 23:06:42 +02:00


// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"sort"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"
)
// maglevClient is a per-maglevd gRPC client plus cache and background loops.
type maglevClient struct {
name string
address string
conn *grpc.ClientConn
api grpcapi.MaglevClient
broker *Broker
mu sync.RWMutex
connected bool
lastErr string
cache cachedState
}
// cachedState is the per-maglevd snapshot served via the REST handlers.
// Frontends / Backends / HealthChecks are maps for O(1) lookup from the
// event path, and the *Order slices preserve the order returned by the
// corresponding List* RPC so the UI renders in a stable order across
// reloads instead of Go map iteration's randomised order.
type cachedState struct {
Frontends map[string]*FrontendSnapshot
FrontendsOrder []string
Backends map[string]*BackendSnapshot
BackendsOrder []string
HealthChecks map[string]*HealthCheckSnapshot
HealthCheckOrder []string
VPPInfo *VPPInfoSnapshot
VPPState string // "", "connected", "disconnected"
LastRefresh time.Time
}
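// newMaglevClient constructs the client for one maglevd address with empty
// cache maps; the gRPC connection is established lazily on first use, and
// callers invoke Start to launch the background loops.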
func newMaglevClient(address string, broker *Broker) (*maglevClient, error) {
conn, err := grpc.NewClient(address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return &maglevClient{
name: hostnameOf(address),
address: address,
conn: conn,
api: grpcapi.NewMaglevClient(conn),
broker: broker,
cache: cachedState{
Frontends: map[string]*FrontendSnapshot{},
Backends: map[string]*BackendSnapshot{},
HealthChecks: map[string]*HealthCheckSnapshot{},
},
}, nil
}
// hostnameOf strips the port from an address and returns a short display
// name. For DNS names we take the first label ("lb-ams.internal:9090" →
// "lb-ams"). For IP literals we return the full address so we don't
// accidentally truncate "127.0.0.1" to "127".
func hostnameOf(address string) string {
host := address
if h, _, err := net.SplitHostPort(address); err == nil {
host = h
}
host = strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
if net.ParseIP(host) != nil {
return host
}
if i := strings.Index(host, "."); i >= 0 {
return host[:i]
}
return host
}
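// Close tears down the underlying gRPC connection.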
func (c *maglevClient) Close() {
_ = c.conn.Close()
}
// BackendAction runs one of the four lifecycle operations on a backend.
// Valid actions are "pause", "resume", "enable", and "disable". The
// fresh backend snapshot returned by maglevd is converted and sent
// back to the caller so the admin API handler can reply with the
// post-mutation state in a single round-trip. The broadcast
// WatchEvents stream will also deliver a transition event which the
// local cache and every connected browser apply through the normal
// reducer path — so the UI converges even if the HTTP response is
// slow or dropped in flight.
func (c *maglevClient) BackendAction(ctx context.Context, name, action string) (*BackendSnapshot, error) {
req := &grpcapi.BackendRequest{Name: name}
var bi *grpcapi.BackendInfo
var err error
switch action {
case "pause":
bi, err = c.api.PauseBackend(ctx, req)
case "resume":
bi, err = c.api.ResumeBackend(ctx, req)
case "enable":
bi, err = c.api.EnableBackend(ctx, req)
case "disable":
bi, err = c.api.DisableBackend(ctx, req)
default:
return nil, fmt.Errorf("unknown action %q", action)
}
if err != nil {
return nil, err
}
return backendFromProto(bi), nil
}
// SetBackendWeight runs the SetFrontendPoolBackendWeight gRPC call. A
// fresh FrontendSnapshot is returned so admin callers get the
// post-mutation effective weights in one round-trip.
func (c *maglevClient) SetBackendWeight(ctx context.Context, frontend, pool, backend string, weight int32, flush bool) (*FrontendSnapshot, error) {
fi, err := c.api.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{
Frontend: frontend,
Pool: pool,
Backend: backend,
Weight: weight,
Flush: flush,
})
if err != nil {
return nil, err
}
return frontendFromProto(fi), nil
}
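// Start launches the watch, periodic-refresh and health-probe loops; each
// runs until ctx is cancelled.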
func (c *maglevClient) Start(ctx context.Context) {
go c.watchLoop(ctx)
go c.refreshLoop(ctx)
go c.healthLoop(ctx)
}
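// setConnected records the connection status and last error; when the status
// actually changes it publishes a maglevd-status event to the broker.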
func (c *maglevClient) setConnected(ok bool, errMsg string) {
c.mu.Lock()
prev := c.connected
c.connected = ok
c.lastErr = errMsg
c.mu.Unlock()
if prev != ok {
payload, _ := json.Marshal(MaglevdStatusPayload{Connected: ok, LastError: errMsg})
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "maglevd-status",
AtUnixNs: time.Now().UnixNano(),
Payload: payload,
})
}
}
// Info returns the current connection status for this maglevd.
func (c *maglevClient) Info() MaglevdInfo {
c.mu.RLock()
defer c.mu.RUnlock()
return MaglevdInfo{
Name: c.name,
Address: c.address,
Connected: c.connected,
LastError: c.lastErr,
}
}
// Snapshot returns a deep-ish copy of the cached state for REST handlers.
// Iteration order follows the corresponding *Order slice so the UI sees a
// stable, RPC-defined order across reloads.
func (c *maglevClient) Snapshot() *StateSnapshot {
c.mu.RLock()
defer c.mu.RUnlock()
snap := &StateSnapshot{
Maglevd: MaglevdInfo{
Name: c.name,
Address: c.address,
Connected: c.connected,
LastError: c.lastErr,
},
Frontends: make([]*FrontendSnapshot, 0, len(c.cache.FrontendsOrder)),
Backends: make([]*BackendSnapshot, 0, len(c.cache.BackendsOrder)),
HealthChecks: make([]*HealthCheckSnapshot, 0, len(c.cache.HealthCheckOrder)),
VPPInfo: c.cache.VPPInfo,
VPPState: c.cache.VPPState,
}
for _, name := range c.cache.FrontendsOrder {
if f, ok := c.cache.Frontends[name]; ok {
snap.Frontends = append(snap.Frontends, f)
}
}
for _, name := range c.cache.BackendsOrder {
if b, ok := c.cache.Backends[name]; ok {
snap.Backends = append(snap.Backends, b)
}
}
for _, name := range c.cache.HealthCheckOrder {
if h, ok := c.cache.HealthChecks[name]; ok {
snap.HealthChecks = append(snap.HealthChecks, h)
}
}
return snap
}
// refreshAll pulls a full fresh view of the maglevd's state into the cache.
// Called from the refreshLoop every 30s and immediately after a successful
// reconnect.
func (c *maglevClient) refreshAll(ctx context.Context) error {
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
frontends := map[string]*FrontendSnapshot{}
fl, err := c.api.ListFrontends(rctx, &grpcapi.ListFrontendsRequest{})
if err != nil {
return fmt.Errorf("list frontends: %w", err)
}
// Sort alphabetically so the UI layout is stable across
// reloads/restarts. maglevd's checker.ListFrontends already sorts
// in current versions, but older builds don't — sort here too as
// a belt-and-braces guarantee.
frontendsOrder := append([]string(nil), fl.GetFrontendNames()...)
sort.Strings(frontendsOrder)
for _, name := range frontendsOrder {
fi, err := c.api.GetFrontend(rctx, &grpcapi.GetFrontendRequest{Name: name})
if err != nil {
return fmt.Errorf("get frontend %s: %w", name, err)
}
frontends[name] = frontendFromProto(fi)
}
backends := map[string]*BackendSnapshot{}
bl, err := c.api.ListBackends(rctx, &grpcapi.ListBackendsRequest{})
if err != nil {
return fmt.Errorf("list backends: %w", err)
}
backendsOrder := append([]string(nil), bl.GetBackendNames()...)
for _, name := range backendsOrder {
bi, err := c.api.GetBackend(rctx, &grpcapi.GetBackendRequest{Name: name})
if err != nil {
return fmt.Errorf("get backend %s: %w", name, err)
}
backends[name] = backendFromProto(bi)
}
healthchecks := map[string]*HealthCheckSnapshot{}
hl, err := c.api.ListHealthChecks(rctx, &grpcapi.ListHealthChecksRequest{})
if err != nil {
return fmt.Errorf("list healthchecks: %w", err)
}
healthCheckOrder := append([]string(nil), hl.GetNames()...)
for _, name := range healthCheckOrder {
hi, err := c.api.GetHealthCheck(rctx, &grpcapi.GetHealthCheckRequest{Name: name})
if err != nil {
return fmt.Errorf("get healthcheck %s: %w", name, err)
}
healthchecks[name] = healthCheckFromProto(hi)
}
var vppInfo *VPPInfoSnapshot
vppState := "disconnected"
if vi, err := c.api.GetVPPInfo(rctx, &grpcapi.GetVPPInfoRequest{}); err == nil {
vppInfo = &VPPInfoSnapshot{
Version: vi.GetVersion(),
BuildDate: vi.GetBuildDate(),
PID: vi.GetPid(),
BoottimeNs: vi.GetBoottimeNs(),
ConnecttimeNs: vi.GetConnecttimeNs(),
}
vppState = "connected"
}
c.mu.Lock()
// Frontend state comes from the FrontendEvent stream, not the
// FrontendInfo proto — carry any known state from the old cache over
// to the freshly-listed entries so a periodic refresh doesn't blank
// the state badges until the next live transition arrives.
for name, f := range frontends {
if old, ok := c.cache.Frontends[name]; ok && old.State != "" {
f.State = old.State
}
}
c.cache.Frontends = frontends
c.cache.FrontendsOrder = frontendsOrder
c.cache.Backends = backends
c.cache.BackendsOrder = backendsOrder
c.cache.HealthChecks = healthchecks
c.cache.HealthCheckOrder = healthCheckOrder
c.cache.VPPInfo = vppInfo
c.cache.VPPState = vppState
c.cache.LastRefresh = time.Now()
c.mu.Unlock()
return nil
}
// watchLoop subscribes to WatchEvents and feeds the broker until the context
// is cancelled. Reconnects with exponential backoff on stream errors.
func (c *maglevClient) watchLoop(ctx context.Context) {
backoff := time.Second
maxBackoff := 30 * time.Second
for {
if ctx.Err() != nil {
return
}
if err := c.watchOnce(ctx); err != nil {
if ctx.Err() != nil {
return
}
slog.Warn("watch-disconnected", "maglevd", c.name, "err", err)
c.setConnected(false, err.Error())
select {
case <-ctx.Done():
return
case <-time.After(backoff):
}
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
continue
}
backoff = time.Second
}
}
func (c *maglevClient) watchOnce(ctx context.Context) error {
logFlag := true
backendFlag := true
frontendFlag := true
req := &grpcapi.WatchRequest{
Log: &logFlag,
LogLevel: "debug",
Backend: &backendFlag,
Frontend: &frontendFlag,
}
stream, err := c.api.WatchEvents(ctx, req)
if err != nil {
return fmt.Errorf("open stream: %w", err)
}
// Successful subscribe: mark connected and pull a fresh snapshot so
// the REST cache is immediately ground-truth accurate. WatchEvents
// itself replays current state as synthetic from==to events, which
// will also update the cache as they arrive.
c.setConnected(true, "")
if err := c.refreshAll(ctx); err != nil {
slog.Warn("refresh-after-watch", "maglevd", c.name, "err", err)
}
for {
ev, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) || ctx.Err() != nil {
return nil
}
return err
}
c.handleEvent(ev)
}
}
// handleEvent applies an incoming gRPC event to the local cache and
// publishes a corresponding BrowserEvent on the broker.
func (c *maglevClient) handleEvent(ev *grpcapi.Event) {
switch body := ev.GetEvent().(type) {
case *grpcapi.Event_Log:
le := body.Log
if le == nil {
return
}
attrs := make(map[string]string, len(le.GetAttrs()))
for _, a := range le.GetAttrs() {
attrs[a.GetKey()] = a.GetValue()
}
c.applyVPPLogHeartbeat(le.GetMsg())
// A config reload on maglevd can shuffle anything: add or
// remove frontends, change pool membership, flip configured
// weights, move backends between pools. Rather than try to
// incrementally update the cache for every possible change,
// refresh the whole maglevd state and tell every connected
// browser to re-hydrate from the fresh snapshot. Only the
// "-done" event triggers this, not "-start": a failed reload
// (which never emits "-done") leaves the running config
// unchanged, so no refresh is needed.
if le.GetMsg() == "config-reload-done" {
c.triggerConfigResync()
}
payload, _ := json.Marshal(LogEventPayload{
Level: le.GetLevel(),
Msg: le.GetMsg(),
Attrs: attrs,
})
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "log",
AtUnixNs: le.GetAtUnixNs(),
Payload: payload,
})
case *grpcapi.Event_Backend:
be := body.Backend
if be == nil || be.GetTransition() == nil {
return
}
tr := transitionFromProto(be.GetTransition())
// maglevd replays current state on WatchEvents subscribe as a
// synthetic event with from==to and at_unix_ns=0 (see
// internal/grpcapi/server.go). It is not a real transition — the
// in-process cache is already correct from refreshAll, so don't
// touch LastTransition (which would clobber it with at=0 and
// render as "55 years ago" in the browser) and don't forward to
// the broker.
if tr.From == tr.To {
return
}
c.applyBackendTransition(be.GetBackendName(), tr)
payload, _ := json.Marshal(BackendEventPayload{
Backend: be.GetBackendName(),
Transition: *tr,
})
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "backend",
AtUnixNs: tr.AtUnixNs,
Payload: payload,
})
case *grpcapi.Event_Frontend:
fe := body.Frontend
if fe == nil || fe.GetTransition() == nil {
return
}
tr := transitionFromProto(fe.GetTransition())
// Always update the cached state — synthetic from==to events on
// subscribe are how we learn the initial frontend state (there's
// no equivalent field in the FrontendInfo proto). Only publish
// genuine transitions to the browser so the debug panel doesn't
// show 'up → up' spam on every gRPC reconnect.
c.applyFrontendState(fe.GetFrontendName(), tr.To)
if tr.From == tr.To {
return
}
payload, _ := json.Marshal(FrontendEventPayload{
Frontend: fe.GetFrontendName(),
Transition: *tr,
})
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "frontend",
AtUnixNs: tr.AtUnixNs,
Payload: payload,
})
}
}
// triggerConfigResync runs refreshAll off the event-dispatch goroutine
// (so the stream.Recv loop isn't blocked while the full config refetch
// hits several gRPC calls) and then publishes a BrowserEvent of type
// "resync" so every connected browser re-fetches /view/api/state from
// the now-fresh cache. Fired in response to a maglevd "config-reload-
// done" log event.
//
// The refresh-then-publish order matters: if we published first, the
// SPA would fetchState from a stale cache and display old data until
// the next 30s refresh tick. Running refreshAll synchronously inside
// this goroutine closes that window.
//
// The resync event goes through the normal broker → ring buffer path,
// so a browser that reconnects shortly after the reload (within the
// 30s / 2000-event replay window) still sees the resync on its first
// live event and re-hydrates without needing a separate out-of-band
// signal.
func (c *maglevClient) triggerConfigResync() {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if err := c.refreshAll(ctx); err != nil {
slog.Warn("config-resync-refresh", "maglevd", c.name, "err", err)
// Publish anyway — the SPA's refetch will see the
// cache in whatever state refreshAll left it, and
// the periodic refreshLoop will retry. Better than
// silently dropping the signal.
}
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "resync",
AtUnixNs: time.Now().UnixNano(),
Payload: json.RawMessage("{}"),
})
}()
}
// applyFrontendState writes the given state into the cached frontend
// snapshot. Called both by synthetic replay events on subscribe and by
// live transitions afterwards.
func (c *maglevClient) applyFrontendState(name, state string) {
c.mu.Lock()
defer c.mu.Unlock()
f, ok := c.cache.Frontends[name]
if !ok {
return
}
f.State = state
}
// applyVPPLogHeartbeat flips the cache.VPPState field based on the
// event's msg. vpp-connect and vpp-api-{send,recv}* are treated as
// "VPP is up" signals; vpp-disconnect flips to "down". Unrelated log
// events are a no-op. Called from handleEvent under the client's
// event-dispatch goroutine, so contention on mu is single-writer.
func (c *maglevClient) applyVPPLogHeartbeat(msg string) {
var newState string
switch {
case msg == "vpp-connect":
newState = "connected"
case msg == "vpp-disconnect":
newState = "disconnected"
case strings.HasPrefix(msg, "vpp-api-send") || strings.HasPrefix(msg, "vpp-api-recv"):
newState = "connected"
default:
return
}
c.mu.Lock()
if c.cache.VPPState == newState {
c.mu.Unlock()
return
}
c.cache.VPPState = newState
c.mu.Unlock()
payload, _ := json.Marshal(VPPStatusPayload{State: newState})
c.broker.Publish(BrowserEvent{
Maglevd: c.name,
Type: "vpp-status",
AtUnixNs: time.Now().UnixNano(),
Payload: payload,
})
}
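// applyBackendTransition applies a live backend transition to the cached
// snapshot, creating a stub entry when the transition arrives before the
// first refreshAll has listed this backend.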
func (c *maglevClient) applyBackendTransition(name string, tr *TransitionRecord) {
c.mu.Lock()
defer c.mu.Unlock()
b, ok := c.cache.Backends[name]
if !ok {
// Partial-create fallback for a transition that arrives before
// the first refreshAll has seen this backend. The real fields
// (address, healthcheck, pool memberships) are filled in on
// the next refresh tick; here we just stamp Name so the entry
// exists.
b = &BackendSnapshot{Name: name}
c.cache.Backends[name] = b
c.cache.BackendsOrder = append(c.cache.BackendsOrder, name)
}
b.State = tr.To
// Derive Enabled from State. In maglevd, state="disabled" and
// config.enabled=false are two ways of expressing the same
// condition — DisableBackend / EnableBackend flip both together,
// and no other state corresponds to enabled=false. Keeping them
// in sync in the reducer closes a race where the cache's cached
// Enabled could lag behind state by up to a refreshLoop tick,
// causing the SPA to render a bogus [disabled] tag next to an
// "up" badge on a freshly-re-enabled backend.
b.Enabled = tr.To != "disabled"
b.LastTransition = tr
b.Transitions = append(b.Transitions, tr)
// Cap history to the most recent 20 entries to mirror what maglevd
// returns from GetBackend.
if len(b.Transitions) > 20 {
b.Transitions = b.Transitions[len(b.Transitions)-20:]
}
}
// refreshLoop pulls a fresh snapshot every 30s to catch anything the live
// event stream may have missed (e.g. during a brief gRPC reconnect).
func (c *maglevClient) refreshLoop(ctx context.Context) {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
if err := c.refreshAll(ctx); err != nil {
slog.Debug("refresh-all", "maglevd", c.name, "err", err)
}
}
}
}
// healthLoop issues a cheap GetVPPInfo every 5s to surface connection drops
// quickly. Errors flip the connection indicator; recoveries trigger a
// refreshAll so the cache catches up.
func (c *maglevClient) healthLoop(ctx context.Context) {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
hctx, cancel := context.WithTimeout(ctx, 2*time.Second)
_, err := c.api.GetVPPInfo(hctx, &grpcapi.GetVPPInfoRequest{})
cancel()
if err != nil {
c.setConnected(false, err.Error())
} else {
c.setConnected(true, "")
}
}
}
}
// ---- proto → JSON helpers --------------------------------------------------
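// frontendFromProto converts a FrontendInfo proto into the snapshot form
// served to browsers, including per-pool backend weights.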
func frontendFromProto(fi *grpcapi.FrontendInfo) *FrontendSnapshot {
out := &FrontendSnapshot{
Name: fi.GetName(),
Address: fi.GetAddress(),
Protocol: fi.GetProtocol(),
Port: fi.GetPort(),
Description: fi.GetDescription(),
SrcIPSticky: fi.GetSrcIpSticky(),
}
for _, p := range fi.GetPools() {
ps := &PoolSnapshot{Name: p.GetName()}
for _, pb := range p.GetBackends() {
ps.Backends = append(ps.Backends, &PoolBackendSnapshot{
Name: pb.GetName(),
Weight: pb.GetWeight(),
EffectiveWeight: pb.GetEffectiveWeight(),
})
}
out.Pools = append(out.Pools, ps)
}
return out
}
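// backendFromProto converts a BackendInfo proto, normalising the transition
// history to oldest-first so LastTransition points at the newest record.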
func backendFromProto(bi *grpcapi.BackendInfo) *BackendSnapshot {
out := &BackendSnapshot{
Name: bi.GetName(),
Address: bi.GetAddress(),
State: bi.GetState(),
Enabled: bi.GetEnabled(),
HealthCheck: bi.GetHealthcheck(),
}
// maglevd stores and returns transitions newest-first (it prepends
// in health.Backend.transition()). The client stores them
// oldest-first so applyBackendTransition can simply append new
// events to the end. Reverse on read to reconcile the two
// conventions — then out.Transitions[n-1] is the newest, which is
// the correct LastTransition.
trs := bi.GetTransitions()
out.Transitions = make([]*TransitionRecord, len(trs))
for i, t := range trs {
out.Transitions[len(trs)-1-i] = transitionFromProto(t)
}
if n := len(out.Transitions); n > 0 {
out.LastTransition = out.Transitions[n-1]
}
return out
}
func transitionFromProto(t *grpcapi.TransitionRecord) *TransitionRecord {
return &TransitionRecord{
From: t.GetFrom(),
To: t.GetTo(),
AtUnixNs: t.GetAtUnixNs(),
}
}
func healthCheckFromProto(h *grpcapi.HealthCheckInfo) *HealthCheckSnapshot {
return &HealthCheckSnapshot{
Name: h.GetName(),
Type: h.GetType(),
Port: h.GetPort(),
IntervalNs: h.GetIntervalNs(),
FastIntervalNs: h.GetFastIntervalNs(),
DownIntervalNs: h.GetDownIntervalNs(),
TimeoutNs: h.GetTimeoutNs(),
Rise: h.GetRise(),
Fall: h.GetFall(),
}
}