SPA (cmd/frontend/web):
- New "lb buckets" column backed by a 1s-debounced GetVPPLBState fetch loop with leading+trailing-edge coalescing.
- Per-frontend health icon (✅/⚠️/❗/‼️/❓) in the Zippy header, gated by a settling flag that suppresses ‼️ until the next lb-state reconciliation after a backend transition or weight change.
- In-place leaf merge on lb-state so stable bucket values (e.g. "0") don't retrigger the Flash animation on every refresh.
- Zippy cards remember open state in a cookie and default closed on a fresh load; fixed-width frontend-title-name plus a reserved icon slot so headers line up across all cards.
- Clock-drift watchdog in sse.ts that forces a fresh EventSource on laptop wake, so the broker emits a resync instead of hanging on a dead half-open socket.

Frontend service (cmd/frontend):
- maglevClient.lbStateLoop, triggered on backend transitions and vpp-connect, with a best-effort fetch on refreshAll.
- Admin handlers explicitly wake the lb-state loop after lifecycle ops and set-weight (the latter emits no transition event on the maglevd side, so the WatchEvents path wouldn't have caught it).
- /favicon.ico served from the embedded web/public IPng logo.

VPP integration:
- internal/vpp/lbstate.go: dumpASesForVIP drops Pfx from the dump request (setting it silently wipes IPv4 replies in the LB plugin) and filters results by prefix on the response side instead, which also demuxes multi-VIP-on-same-port cases correctly.

maglevc:
- Walk now returns the unconsumed token tail; dispatch and the question listener reject unknown commands with a targeted error instead of dumping the full command tree prefixed with garbage.
- On '?', echo the current line (including the '?') before the help list so the output reads like birdc.

Checker / prober:
- internal/checker: ±10% jitter on NextInterval so probes across a restart don't all fire on the same tick.
- internal/prober: HTTP User-Agent now carries the build version and project URL.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"
)

// maglevClient is a per-maglevd gRPC client plus cache and background loops.
type maglevClient struct {
	name    string
	address string
	conn    *grpc.ClientConn
	api     grpcapi.MaglevClient
	broker  *Broker

	mu        sync.RWMutex
	connected bool
	lastErr   string
	cache     cachedState

	// lbWakeCh is a buffer-1 trigger channel feeding lbStateLoop. Every
	// backend transition (and a few other events) does a non-blocking send
	// here; the loop coalesces bursts into at most one GetVPPLBState call
	// per second. See lbStateLoop for the leading+trailing-edge debounce.
	lbWakeCh chan struct{}
}

// cachedState is the per-maglevd snapshot served via the REST handlers.
// Frontends / Backends / HealthChecks are maps for O(1) lookup from the
// event path, and the *Order slices preserve the order returned by the
// corresponding List* RPC so the UI renders in a stable order across
// reloads instead of Go map iteration's randomised order.
type cachedState struct {
	Frontends        map[string]*FrontendSnapshot
	FrontendsOrder   []string
	Backends         map[string]*BackendSnapshot
	BackendsOrder    []string
	HealthChecks     map[string]*HealthCheckSnapshot
	HealthCheckOrder []string
	VPPInfo          *VPPInfoSnapshot
	VPPState         string // "", "connected", "disconnected"
	LBState          *LBStateSnapshot
	LastRefresh      time.Time
}

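// newMaglevClient builds a client for the maglevd reachable at address and
// wires it to the event broker. Background loops are started separately via
// Start; Close releases the gRPC connection.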
func newMaglevClient(address string, broker *Broker) (*maglevClient, error) {
	conn, err := grpc.NewClient(address,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return &maglevClient{
		name:    hostnameOf(address),
		address: address,
		conn:    conn,
		api:     grpcapi.NewMaglevClient(conn),
		broker:  broker,
		cache: cachedState{
			Frontends:    map[string]*FrontendSnapshot{},
			Backends:     map[string]*BackendSnapshot{},
			HealthChecks: map[string]*HealthCheckSnapshot{},
		},
		lbWakeCh: make(chan struct{}, 1),
	}, nil
}

// hostnameOf strips the port from an address and returns a short display
// name. For DNS names we take the first label ("lb-ams.internal:9090" →
// "lb-ams"). For IP literals we return the full address so we don't
// accidentally truncate "127.0.0.1" to "127".
func hostnameOf(address string) string {
	host := address
	if h, _, err := net.SplitHostPort(address); err == nil {
		host = h
	}
	host = strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
	if net.ParseIP(host) != nil {
		return host
	}
	if i := strings.Index(host, "."); i >= 0 {
		return host[:i]
	}
	return host
}

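// Close releases the underlying gRPC connection.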
func (c *maglevClient) Close() {
	_ = c.conn.Close()
}

// BackendAction runs one of the four lifecycle operations on a backend.
// Valid actions are "pause", "resume", "enable", and "disable". The
// fresh backend snapshot returned by maglevd is converted and sent
// back to the caller so the admin API handler can reply with the
// post-mutation state in a single round-trip. The broadcast
// WatchEvents stream will also deliver a transition event which the
// local cache and every connected browser apply through the normal
// reducer path — so the UI converges even if the HTTP response is
// slow or dropped in flight.
func (c *maglevClient) BackendAction(ctx context.Context, name, action string) (*BackendSnapshot, error) {
	req := &grpcapi.BackendRequest{Name: name}
	var bi *grpcapi.BackendInfo
	var err error
	switch action {
	case "pause":
		bi, err = c.api.PauseBackend(ctx, req)
	case "resume":
		bi, err = c.api.ResumeBackend(ctx, req)
	case "enable":
		bi, err = c.api.EnableBackend(ctx, req)
	case "disable":
		bi, err = c.api.DisableBackend(ctx, req)
	default:
		return nil, fmt.Errorf("unknown action %q", action)
	}
	if err != nil {
		return nil, err
	}
	return backendFromProto(bi), nil
}

// SetBackendWeight runs the SetFrontendPoolBackendWeight gRPC call. A
// fresh FrontendSnapshot is returned so admin callers get the
// post-mutation effective weights in one round-trip.
func (c *maglevClient) SetBackendWeight(ctx context.Context, frontend, pool, backend string, weight int32, flush bool) (*FrontendSnapshot, error) {
	fi, err := c.api.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{
		Frontend: frontend,
		Pool:     pool,
		Backend:  backend,
		Weight:   weight,
		Flush:    flush,
	})
	if err != nil {
		return nil, err
	}
	return frontendFromProto(fi), nil
}

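// Start launches the four background goroutines: the WatchEvents stream
// consumer, the 30s full refresh, the 5s health probe, and the debounced
// lb-state fetcher. All of them exit when ctx is cancelled.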
func (c *maglevClient) Start(ctx context.Context) {
	go c.watchLoop(ctx)
	go c.refreshLoop(ctx)
	go c.healthLoop(ctx)
	go c.lbStateLoop(ctx)
}

func (c *maglevClient) setConnected(ok bool, errMsg string) {
	c.mu.Lock()
	prev := c.connected
	c.connected = ok
	c.lastErr = errMsg
	c.mu.Unlock()
	if prev != ok {
		payload, _ := json.Marshal(MaglevdStatusPayload{Connected: ok, LastError: errMsg})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "maglevd-status",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  payload,
		})
	}
}

// Info returns the current connection status for this maglevd.
func (c *maglevClient) Info() MaglevdInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return MaglevdInfo{
		Name:      c.name,
		Address:   c.address,
		Connected: c.connected,
		LastError: c.lastErr,
	}
}

// Snapshot returns a deep-ish copy of the cached state for REST handlers.
// Iteration order follows the corresponding *Order slice so the UI sees a
// stable, RPC-defined order across reloads.
func (c *maglevClient) Snapshot() *StateSnapshot {
	c.mu.RLock()
	defer c.mu.RUnlock()
	snap := &StateSnapshot{
		Maglevd: MaglevdInfo{
			Name:      c.name,
			Address:   c.address,
			Connected: c.connected,
			LastError: c.lastErr,
		},
		Frontends:    make([]*FrontendSnapshot, 0, len(c.cache.FrontendsOrder)),
		Backends:     make([]*BackendSnapshot, 0, len(c.cache.BackendsOrder)),
		HealthChecks: make([]*HealthCheckSnapshot, 0, len(c.cache.HealthCheckOrder)),
		VPPInfo:      c.cache.VPPInfo,
		VPPState:     c.cache.VPPState,
		LBState:      c.cache.LBState,
	}
	for _, name := range c.cache.FrontendsOrder {
		if f, ok := c.cache.Frontends[name]; ok {
			snap.Frontends = append(snap.Frontends, f)
		}
	}
	for _, name := range c.cache.BackendsOrder {
		if b, ok := c.cache.Backends[name]; ok {
			snap.Backends = append(snap.Backends, b)
		}
	}
	for _, name := range c.cache.HealthCheckOrder {
		if h, ok := c.cache.HealthChecks[name]; ok {
			snap.HealthChecks = append(snap.HealthChecks, h)
		}
	}
	return snap
}

// refreshAll pulls a full fresh view of the maglevd's state into the cache.
// Called from the refreshLoop every 30s and immediately after a successful
// reconnect.
func (c *maglevClient) refreshAll(ctx context.Context) error {
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	frontends := map[string]*FrontendSnapshot{}
	fl, err := c.api.ListFrontends(rctx, &grpcapi.ListFrontendsRequest{})
	if err != nil {
		return fmt.Errorf("list frontends: %w", err)
	}
	// Sort alphabetically so the UI layout is stable across
	// reloads/restarts. maglevd's checker.ListFrontends already sorts
	// in current versions, but older builds don't — sort here too as
	// a belt-and-braces guarantee.
	frontendsOrder := append([]string(nil), fl.GetFrontendNames()...)
	sort.Strings(frontendsOrder)
	for _, name := range frontendsOrder {
		fi, err := c.api.GetFrontend(rctx, &grpcapi.GetFrontendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get frontend %s: %w", name, err)
		}
		frontends[name] = frontendFromProto(fi)
	}

	backends := map[string]*BackendSnapshot{}
	bl, err := c.api.ListBackends(rctx, &grpcapi.ListBackendsRequest{})
	if err != nil {
		return fmt.Errorf("list backends: %w", err)
	}
	backendsOrder := append([]string(nil), bl.GetBackendNames()...)
	for _, name := range backendsOrder {
		bi, err := c.api.GetBackend(rctx, &grpcapi.GetBackendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get backend %s: %w", name, err)
		}
		backends[name] = backendFromProto(bi)
	}

	healthchecks := map[string]*HealthCheckSnapshot{}
	hl, err := c.api.ListHealthChecks(rctx, &grpcapi.ListHealthChecksRequest{})
	if err != nil {
		return fmt.Errorf("list healthchecks: %w", err)
	}
	healthCheckOrder := append([]string(nil), hl.GetNames()...)
	for _, name := range healthCheckOrder {
		hi, err := c.api.GetHealthCheck(rctx, &grpcapi.GetHealthCheckRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get healthcheck %s: %w", name, err)
		}
		healthchecks[name] = healthCheckFromProto(hi)
	}

	var vppInfo *VPPInfoSnapshot
	vppState := "disconnected"
	if vi, err := c.api.GetVPPInfo(rctx, &grpcapi.GetVPPInfoRequest{}); err == nil {
		vppInfo = &VPPInfoSnapshot{
			Version:       vi.GetVersion(),
			BuildDate:     vi.GetBuildDate(),
			PID:           vi.GetPid(),
			BoottimeNs:    vi.GetBoottimeNs(),
			ConnecttimeNs: vi.GetConnecttimeNs(),
		}
		vppState = "connected"
	}

	c.mu.Lock()
	// Frontend state comes from the FrontendEvent stream, not the
	// FrontendInfo proto — carry any known state from the old cache over
	// to the freshly-listed entries so a periodic refresh doesn't blank
	// the state badges until the next live transition arrives.
	for name, f := range frontends {
		if old, ok := c.cache.Frontends[name]; ok && old.State != "" {
			f.State = old.State
		}
	}
	c.cache.Frontends = frontends
	c.cache.FrontendsOrder = frontendsOrder
	c.cache.Backends = backends
	c.cache.BackendsOrder = backendsOrder
	c.cache.HealthChecks = healthchecks
	c.cache.HealthCheckOrder = healthCheckOrder
	c.cache.VPPInfo = vppInfo
	c.cache.VPPState = vppState
	c.cache.LastRefresh = time.Now()
	c.mu.Unlock()
	// Best-effort LB state pull so /view/api/state served on a fresh
	// page load already carries the bucket column. Errors are
	// swallowed by fetchLBStateAndPublish (which clears the cache and
	// emits an empty event so the SPA renders "—").
	c.fetchLBStateAndPublish(ctx)
	return nil
}

// watchLoop subscribes to WatchEvents and feeds the broker until the context
// is cancelled. Reconnects with exponential backoff on stream errors.
func (c *maglevClient) watchLoop(ctx context.Context) {
	backoff := time.Second
	maxBackoff := 30 * time.Second
	for {
		if ctx.Err() != nil {
			return
		}
		if err := c.watchOnce(ctx); err != nil {
			if ctx.Err() != nil {
				return
			}
			slog.Warn("watch-disconnected", "maglevd", c.name, "err", err)
			c.setConnected(false, err.Error())
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		backoff = time.Second
	}
}

func (c *maglevClient) watchOnce(ctx context.Context) error {
	logFlag := true
	backendFlag := true
	frontendFlag := true
	req := &grpcapi.WatchRequest{
		Log:      &logFlag,
		LogLevel: "debug",
		Backend:  &backendFlag,
		Frontend: &frontendFlag,
	}
	stream, err := c.api.WatchEvents(ctx, req)
	if err != nil {
		return fmt.Errorf("open stream: %w", err)
	}
	// Successful subscribe: mark connected and pull a fresh snapshot so
	// the REST cache is immediately ground-truth accurate. WatchEvents
	// itself replays current state as synthetic from==to events, which
	// will also update the cache as they arrive.
	c.setConnected(true, "")
	if err := c.refreshAll(ctx); err != nil {
		slog.Warn("refresh-after-watch", "maglevd", c.name, "err", err)
	}
	for {
		ev, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return nil
			}
			return err
		}
		c.handleEvent(ev)
	}
}

// handleEvent applies an incoming gRPC event to the local cache and
// publishes a corresponding BrowserEvent on the broker.
func (c *maglevClient) handleEvent(ev *grpcapi.Event) {
	switch body := ev.GetEvent().(type) {
	case *grpcapi.Event_Log:
		le := body.Log
		if le == nil {
			return
		}
		attrs := make(map[string]string, len(le.GetAttrs()))
		for _, a := range le.GetAttrs() {
			attrs[a.GetKey()] = a.GetValue()
		}
		c.applyVPPLogHeartbeat(le.GetMsg())
		// A config reload on maglevd can shuffle anything: add or
		// remove frontends, change pool membership, flip configured
		// weights, move backends between pools. Rather than try to
		// incrementally update the cache for every possible change,
		// refresh the whole maglevd state and tell every connected
		// browser to re-hydrate from the fresh snapshot. Only the
		// "-done" event triggers this, not "-start": a failed reload
		// (which never emits "-done") leaves the running config
		// unchanged, so no refresh is needed.
		if le.GetMsg() == "config-reload-done" {
			c.triggerConfigResync()
		}
		payload, _ := json.Marshal(LogEventPayload{
			Level: le.GetLevel(),
			Msg:   le.GetMsg(),
			Attrs: attrs,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "log",
			AtUnixNs: le.GetAtUnixNs(),
			Payload:  payload,
		})

	case *grpcapi.Event_Backend:
		be := body.Backend
		if be == nil || be.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(be.GetTransition())
		// maglevd replays current state on WatchEvents subscribe as a
		// synthetic event with from==to and at_unix_ns=0 (see
		// internal/grpcapi/server.go). It is not a real transition — the
		// in-process cache is already correct from refreshAll, so don't
		// touch LastTransition (which would clobber it with at=0 and
		// render as "55 years ago" in the browser) and don't forward to
		// the broker.
		if tr.From == tr.To {
			return
		}
		c.applyBackendTransition(be.GetBackendName(), tr)
		payload, _ := json.Marshal(BackendEventPayload{
			Backend:    be.GetBackendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "backend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
		// A real transition means VPP is about to (or already did)
		// reshuffle bucket allocations across the affected VIP. Wake
		// the lb-state loop so the SPA's bucket column converges
		// without waiting for the 30s refresh.
		c.triggerLBStateFetch()

	case *grpcapi.Event_Frontend:
		fe := body.Frontend
		if fe == nil || fe.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(fe.GetTransition())
		// Always update the cached state — synthetic from==to events on
		// subscribe are how we learn the initial frontend state (there's
		// no equivalent field in the FrontendInfo proto). Only publish
		// genuine transitions to the browser so the debug panel doesn't
		// show 'up → up' spam on every gRPC reconnect.
		c.applyFrontendState(fe.GetFrontendName(), tr.To)
		if tr.From == tr.To {
			return
		}
		payload, _ := json.Marshal(FrontendEventPayload{
			Frontend:   fe.GetFrontendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "frontend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
	}
}

// triggerConfigResync runs refreshAll off the event-dispatch goroutine
// (so the stream.Recv loop isn't blocked while the full config refetch
// hits several gRPC calls) and then publishes a BrowserEvent of type
// "resync" so every connected browser re-fetches /view/api/state from
// the now-fresh cache. Fired in response to a maglevd
// "config-reload-done" log event.
//
// The refresh-then-publish order matters: if we published first, the
// SPA would fetchState from a stale cache and display old data until
// the next 30s refresh tick. Running refreshAll synchronously inside
// this goroutine closes that window.
//
// The resync event goes through the normal broker → ring buffer path,
// so a browser that reconnects shortly after the reload (within the
// 30s / 2000-event replay window) still sees the resync on its first
// live event and re-hydrates without needing a separate out-of-band
// signal.
func (c *maglevClient) triggerConfigResync() {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		if err := c.refreshAll(ctx); err != nil {
			slog.Warn("config-resync-refresh", "maglevd", c.name, "err", err)
			// Publish anyway — the SPA's refetch will see the
			// cache in whatever state refreshAll left it, and
			// the periodic refreshLoop will retry. Better than
			// silently dropping the signal.
		}
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "resync",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  json.RawMessage("{}"),
		})
	}()
}

// applyFrontendState writes the given state into the cached frontend
// snapshot. Called both by synthetic replay events on subscribe and by
// live transitions afterwards.
func (c *maglevClient) applyFrontendState(name, state string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	f, ok := c.cache.Frontends[name]
	if !ok {
		return
	}
	f.State = state
}

// applyVPPLogHeartbeat flips the cache.VPPState field based on the
// event's msg. vpp-connect and vpp-api-{send,recv}* are treated as
// "VPP is up" signals; vpp-disconnect flips to "down". Unrelated log
// events are a no-op. Called from handleEvent under the client's
// event-dispatch goroutine, so contention on mu is single-writer.
func (c *maglevClient) applyVPPLogHeartbeat(msg string) {
	var newState string
	switch {
	case msg == "vpp-connect":
		newState = "connected"
	case msg == "vpp-disconnect":
		newState = "disconnected"
	case strings.HasPrefix(msg, "vpp-api-send") || strings.HasPrefix(msg, "vpp-api-recv"):
		newState = "connected"
	default:
		return
	}
	c.mu.Lock()
	if c.cache.VPPState == newState {
		c.mu.Unlock()
		return
	}
	c.cache.VPPState = newState
	c.mu.Unlock()
	payload, _ := json.Marshal(VPPStatusPayload{State: newState})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "vpp-status",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
	// VPP just came back: pull fresh LB state so the bucket column
	// repopulates immediately instead of waiting up to 30s for the
	// next refresh tick. On vpp-disconnect the next fetch will fail
	// and clear the cache, which is also the right behaviour.
	c.triggerLBStateFetch()
}

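// applyBackendTransition folds a live backend transition into the cached
// snapshot, creating a stub entry if the backend hasn't been seen by
// refreshAll yet, and caps the stored history at the 20 most recent
// transitions to mirror what GetBackend returns.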
func (c *maglevClient) applyBackendTransition(name string, tr *TransitionRecord) {
	c.mu.Lock()
	defer c.mu.Unlock()
	b, ok := c.cache.Backends[name]
	if !ok {
		// Partial-create fallback for a transition that arrives before
		// the first refreshAll has seen this backend. The real fields
		// (address, healthcheck, pool memberships) are filled in on
		// the next refresh tick; here we just stamp Name so the entry
		// exists.
		b = &BackendSnapshot{Name: name}
		c.cache.Backends[name] = b
		c.cache.BackendsOrder = append(c.cache.BackendsOrder, name)
	}
	b.State = tr.To
	// Derive Enabled from State. In maglevd, state="disabled" and
	// config.enabled=false are two ways of expressing the same
	// condition — DisableBackend / EnableBackend flip both together,
	// and no other state corresponds to enabled=false. Keeping them
	// in sync in the reducer closes a race where the cached Enabled
	// could lag behind state by up to a refreshLoop tick, causing
	// the SPA to render a bogus [disabled] tag next to an "up" badge
	// on a freshly re-enabled backend.
	b.Enabled = tr.To != "disabled"
	b.LastTransition = tr
	b.Transitions = append(b.Transitions, tr)
	// Cap history to the most recent 20 entries to mirror what maglevd
	// returns from GetBackend.
	if len(b.Transitions) > 20 {
		b.Transitions = b.Transitions[len(b.Transitions)-20:]
	}
}

// refreshLoop pulls a fresh snapshot every 30s to catch anything the live
// event stream may have missed (e.g. during a brief gRPC reconnect).
func (c *maglevClient) refreshLoop(ctx context.Context) {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := c.refreshAll(ctx); err != nil {
				slog.Debug("refresh-all", "maglevd", c.name, "err", err)
			}
		}
	}
}

// healthLoop issues a cheap GetVPPInfo every 5s to surface connection drops
// quickly. Errors flip the connection indicator; recoveries trigger a
// refreshAll so the cache catches up.
func (c *maglevClient) healthLoop(ctx context.Context) {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			hctx, cancel := context.WithTimeout(ctx, 2*time.Second)
			_, err := c.api.GetVPPInfo(hctx, &grpcapi.GetVPPInfoRequest{})
			cancel()
			if err != nil {
				c.setConnected(false, err.Error())
			} else {
				c.setConnected(true, "")
			}
		}
	}
}

// ---- proto → JSON helpers --------------------------------------------------

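// frontendFromProto converts a FrontendInfo proto, including its pools and
// per-backend (effective) weights, into the snapshot shape served to the SPA.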
func frontendFromProto(fi *grpcapi.FrontendInfo) *FrontendSnapshot {
	out := &FrontendSnapshot{
		Name:        fi.GetName(),
		Address:     fi.GetAddress(),
		Protocol:    fi.GetProtocol(),
		Port:        fi.GetPort(),
		Description: fi.GetDescription(),
		SrcIPSticky: fi.GetSrcIpSticky(),
	}
	for _, p := range fi.GetPools() {
		ps := &PoolSnapshot{Name: p.GetName()}
		for _, pb := range p.GetBackends() {
			ps.Backends = append(ps.Backends, &PoolBackendSnapshot{
				Name:            pb.GetName(),
				Weight:          pb.GetWeight(),
				EffectiveWeight: pb.GetEffectiveWeight(),
			})
		}
		out.Pools = append(out.Pools, ps)
	}
	return out
}

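// backendFromProto converts a BackendInfo proto into the snapshot shape
// served to the SPA, normalising the transition history to oldest-first.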
func backendFromProto(bi *grpcapi.BackendInfo) *BackendSnapshot {
	out := &BackendSnapshot{
		Name:        bi.GetName(),
		Address:     bi.GetAddress(),
		State:       bi.GetState(),
		Enabled:     bi.GetEnabled(),
		HealthCheck: bi.GetHealthcheck(),
	}
	// maglevd stores and returns transitions newest-first (it prepends
	// in health.Backend.transition()). The client stores them
	// oldest-first so applyBackendTransition can simply append new
	// events to the end. Reverse on read to reconcile the two
	// conventions — then out.Transitions[n-1] is the newest, which is
	// the correct LastTransition.
	trs := bi.GetTransitions()
	out.Transitions = make([]*TransitionRecord, len(trs))
	for i, t := range trs {
		out.Transitions[len(trs)-1-i] = transitionFromProto(t)
	}
	if n := len(out.Transitions); n > 0 {
		out.LastTransition = out.Transitions[n-1]
	}
	return out
}

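// transitionFromProto converts a single TransitionRecord proto into its
// JSON snapshot counterpart.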
func transitionFromProto(t *grpcapi.TransitionRecord) *TransitionRecord {
	return &TransitionRecord{
		From:     t.GetFrom(),
		To:       t.GetTo(),
		AtUnixNs: t.GetAtUnixNs(),
	}
}

// triggerLBStateFetch sends a non-blocking wake to lbStateLoop. The
// channel has buffer 1 so coalesced bursts never block the publisher.
func (c *maglevClient) triggerLBStateFetch() {
	select {
	case c.lbWakeCh <- struct{}{}:
	default:
	}
}

// lbStateLoop consumes wake signals and calls GetVPPLBState, with a
// leading+trailing-edge debounce so we never exceed one fetch per
// minLBInterval (1s). The leading edge means the very first wake after
// an idle period fires immediately — important so a single isolated
// transition isn't artificially delayed by a second. The trailing edge
// means a burst of wakes during the cool-down still gets one final
// fetch right after the gate opens, so the SPA always converges to a
// post-burst snapshot rather than missing the last update.
func (c *maglevClient) lbStateLoop(ctx context.Context) {
	const minLBInterval = time.Second
	var (
		timer     *time.Timer
		lastFetch time.Time
	)
	timerCh := func() <-chan time.Time {
		if timer == nil {
			return nil
		}
		return timer.C
	}
	fire := func() {
		if timer != nil {
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer = nil
		}
		c.fetchLBStateAndPublish(ctx)
		lastFetch = time.Now()
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.lbWakeCh:
			wait := minLBInterval - time.Since(lastFetch)
			if wait <= 0 {
				fire()
			} else if timer == nil {
				timer = time.NewTimer(wait)
			}
		case <-timerCh():
			timer = nil
			fire()
		}
	}
}

// fetchLBStateAndPublish runs one GetVPPLBState round-trip, rebuilds
// the per-frontend bucket map, swaps it into the cache, and broadcasts
// an "lb-state" BrowserEvent. On error the cache is cleared and an
// empty event is published so the SPA can switch the bucket column to
// em-dashes — clear-on-error is simpler than stale-but-visible and
// doesn't risk showing a confusing snapshot from before VPP died.
func (c *maglevClient) fetchLBStateAndPublish(ctx context.Context) {
	fctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	lbs, err := c.api.GetVPPLBState(fctx, &grpcapi.GetVPPLBStateRequest{})
	if err != nil {
		c.mu.Lock()
		had := c.cache.LBState != nil
		c.cache.LBState = nil
		c.mu.Unlock()
		slog.Debug("lb-state-fetch", "maglevd", c.name, "err", err)
		if had {
			c.publishLBState(nil)
		}
		return
	}
	snap := c.buildLBStateSnapshot(lbs)
	c.mu.Lock()
	c.cache.LBState = snap
	c.mu.Unlock()
	c.publishLBState(snap.PerFrontend)
}

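// publishLBState broadcasts the per-frontend bucket map as an "lb-state"
// BrowserEvent. A nil map is the "no data" signal that makes the SPA fall
// back to em-dashes in the bucket column.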
func (c *maglevClient) publishLBState(perFrontend map[string]map[string]int32) {
	payload, _ := json.Marshal(LBStatePayload{PerFrontend: perFrontend})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "lb-state",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
}

// buildLBStateSnapshot translates a VPP-side state record (keyed by
// CIDR/protocol/port and AS address) into a maglev-side record (keyed
// by frontend name and backend name). Unmatched VIPs and unmatched AS
// addresses are silently skipped — they're benign side effects of a
// transient sync gap or a backend address that's only present in one
// of the two universes.
func (c *maglevClient) buildLBStateSnapshot(lbs *grpcapi.VPPLBState) *LBStateSnapshot {
	c.mu.RLock()
	feByVIP := make(map[string]string, len(c.cache.Frontends))
	for _, f := range c.cache.Frontends {
		feByVIP[lbVIPKey(f.Address, f.Protocol, f.Port)] = f.Name
	}
	backendByAddr := make(map[string]string, len(c.cache.Backends))
	for _, b := range c.cache.Backends {
		backendByAddr[b.Address] = b.Name
	}
	c.mu.RUnlock()

	out := &LBStateSnapshot{PerFrontend: map[string]map[string]int32{}}
	for _, v := range lbs.GetVips() {
		feName, ok := feByVIP[lbVIPKey(stripLBHostMask(v.GetPrefix()), lbProtoString(v.GetProtocol()), v.GetPort())]
		if !ok {
			continue
		}
		row := out.PerFrontend[feName]
		if row == nil {
			row = map[string]int32{}
			out.PerFrontend[feName] = row
		}
		for _, as := range v.GetApplicationServers() {
			bname, ok := backendByAddr[as.GetAddress()]
			if !ok {
				continue
			}
			row[bname] = int32(as.GetNumBuckets())
		}
	}
	return out
}

// lbVIPKey is the join key between a maglev FrontendSnapshot and a
// VPP-side VPPLBVIP record. Stripping the mask and lower-casing the
// protocol gives a canonical form that both sides can produce.
func lbVIPKey(addr, proto string, port uint32) string {
	return fmt.Sprintf("%s/%s/%d", addr, strings.ToLower(proto), port)
}

// lbProtoString mirrors maglevc's protoString — kept local to avoid a
// cross-package import for two trivial helpers.
func lbProtoString(p uint32) string {
	switch p {
	case 6:
		return "tcp"
	case 17:
		return "udp"
	case 255:
		return "any"
	}
	return fmt.Sprintf("%d", p)
}

// stripLBHostMask trims "/32" or "/128" from a VPP host-prefix VIP so
// it can be compared against a maglev FrontendSnapshot.Address (which
// is bare). Other shapes are returned unchanged.
func stripLBHostMask(prefix string) string {
	if strings.HasSuffix(prefix, "/32") || strings.HasSuffix(prefix, "/128") {
		return prefix[:strings.LastIndexByte(prefix, '/')]
	}
	return prefix
}

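// healthCheckFromProto converts a HealthCheckInfo proto into the snapshot
// shape served to the SPA.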
func healthCheckFromProto(h *grpcapi.HealthCheckInfo) *HealthCheckSnapshot {
	return &HealthCheckSnapshot{
		Name:           h.GetName(),
		Type:           h.GetType(),
		Port:           h.GetPort(),
		IntervalNs:     h.GetIntervalNs(),
		FastIntervalNs: h.GetFastIntervalNs(),
		DownIntervalNs: h.GetDownIntervalNs(),
		TimeoutNs:      h.GetTimeoutNs(),
		Rise:           h.GetRise(),
		Fall:           h.GetFall(),
	}
}