// SPDX-License-Identifier: Apache-2.0

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"sort"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"git.ipng.ch/ipng/vpp-maglev/internal/grpcapi"
)

// maglevClient is a per-maglevd gRPC client plus cache and background loops.
type maglevClient struct {
	name    string
	address string
	conn    *grpc.ClientConn
	api     grpcapi.MaglevClient
	broker  *Broker

	mu        sync.RWMutex
	connected bool
	lastErr   string
	cache     cachedState

	// lbWakeCh is a buffer-1 trigger channel feeding lbStateLoop. Every
	// backend transition (and a few other events) does a non-blocking send
	// here; the loop coalesces bursts into at most one GetVPPLBState call
	// per second. See lbStateLoop for the leading+trailing-edge debounce.
	lbWakeCh chan struct{}
}

// cachedState is the per-maglevd snapshot served via the REST handlers.
// Frontends / Backends / HealthChecks are maps for O(1) lookup from the
// event path, and the *Order slices preserve the order returned by the
// corresponding List* RPC so the UI renders in a stable order across
// reloads instead of Go map iteration's randomised order.
type cachedState struct {
	Frontends        map[string]*FrontendSnapshot
	FrontendsOrder   []string
	Backends         map[string]*BackendSnapshot
	BackendsOrder    []string
	HealthChecks     map[string]*HealthCheckSnapshot
	HealthCheckOrder []string
	VPPInfo          *VPPInfoSnapshot
	VPPState         string // "", "connected", "disconnected"
	LBState          *LBStateSnapshot
	LastRefresh      time.Time
}

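// newMaglevClient dials lazily and wires up the per-maglevd client; the
// background loops only start once Start is called. A hedged usage sketch
// (the address is illustrative and the broker wiring is assumed, not taken
// from this file):
//
//	c, err := newMaglevClient("lb-ams.internal:9090", broker)
//	if err != nil {
//		// handle the error
//	}
//	defer c.Close()
//	c.Start(ctx)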
func newMaglevClient(address string, broker *Broker) (*maglevClient, error) {
	conn, err := grpc.NewClient(address,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}
	return &maglevClient{
		name:    hostnameOf(address),
		address: address,
		conn:    conn,
		api:     grpcapi.NewMaglevClient(conn),
		broker:  broker,
		cache: cachedState{
			Frontends:    map[string]*FrontendSnapshot{},
			Backends:     map[string]*BackendSnapshot{},
			HealthChecks: map[string]*HealthCheckSnapshot{},
		},
		lbWakeCh: make(chan struct{}, 1),
	}, nil
}

// hostnameOf strips the port from an address and returns a short display
// name. For DNS names we take the first label ("lb-ams.internal:9090" →
// "lb-ams"). For IP literals we return the full address so we don't
// accidentally truncate "127.0.0.1" to "127".
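//
// Illustrative inputs (a sketch; only the first two come from the
// original comment, the IPv6 case is inferred from the bracket trim):
//
//	hostnameOf("lb-ams.internal:9090") // "lb-ams"
//	hostnameOf("127.0.0.1:9090")       // "127.0.0.1"
//	hostnameOf("[2001:db8::1]:9090")   // "2001:db8::1"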
func hostnameOf(address string) string {
	host := address
	if h, _, err := net.SplitHostPort(address); err == nil {
		host = h
	}
	host = strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
	if net.ParseIP(host) != nil {
		return host
	}
	if i := strings.Index(host, "."); i >= 0 {
		return host[:i]
	}
	return host
}

func (c *maglevClient) Close() {
	_ = c.conn.Close()
}

// BackendAction runs one of the four lifecycle operations on a backend.
// Valid actions are "pause", "resume", "enable", and "disable". The
// fresh backend snapshot returned by maglevd is converted and sent
// back to the caller so the admin API handler can reply with the
// post-mutation state in a single round-trip. The broadcast
// WatchEvents stream will also deliver a transition event which the
// local cache and every connected browser apply through the normal
// reducer path — so the UI converges even if the HTTP response is
// slow or dropped in flight.
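//
// Call-shape sketch (the backend name "web-1" is hypothetical):
//
//	snap, err := c.BackendAction(ctx, "web-1", "pause")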
func (c *maglevClient) BackendAction(ctx context.Context, name, action string) (*BackendSnapshot, error) {
	req := &grpcapi.BackendRequest{Name: name}
	var bi *grpcapi.BackendInfo
	var err error
	switch action {
	case "pause":
		bi, err = c.api.PauseBackend(ctx, req)
	case "resume":
		bi, err = c.api.ResumeBackend(ctx, req)
	case "enable":
		bi, err = c.api.EnableBackend(ctx, req)
	case "disable":
		bi, err = c.api.DisableBackend(ctx, req)
	default:
		return nil, fmt.Errorf("unknown action %q", action)
	}
	if err != nil {
		return nil, err
	}
	return backendFromProto(bi), nil
}

// SetBackendWeight runs the SetFrontendPoolBackendWeight gRPC call. A
// fresh FrontendSnapshot is returned so admin callers get the
// post-mutation effective weights in one round-trip.
func (c *maglevClient) SetBackendWeight(ctx context.Context, frontend, pool, backend string, weight int32, flush bool) (*FrontendSnapshot, error) {
	fi, err := c.api.SetFrontendPoolBackendWeight(ctx, &grpcapi.SetWeightRequest{
		Frontend: frontend,
		Pool:     pool,
		Backend:  backend,
		Weight:   weight,
		Flush:    flush,
	})
	if err != nil {
		return nil, err
	}
	return frontendFromProto(fi), nil
}

func (c *maglevClient) Start(ctx context.Context) {
	go c.watchLoop(ctx)
	go c.refreshLoop(ctx)
	go c.healthLoop(ctx)
	go c.lbStateLoop(ctx)
}

func (c *maglevClient) setConnected(ok bool, errMsg string) {
	c.mu.Lock()
	prev := c.connected
	c.connected = ok
	c.lastErr = errMsg
	c.mu.Unlock()
	if prev != ok {
		payload, _ := json.Marshal(MaglevdStatusPayload{Connected: ok, LastError: errMsg})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "maglevd-status",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  payload,
		})
	}
}

// Info returns the current connection status for this maglevd.
func (c *maglevClient) Info() MaglevdInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return MaglevdInfo{
		Name:      c.name,
		Address:   c.address,
		Connected: c.connected,
		LastError: c.lastErr,
	}
}

// Snapshot returns a deep-ish copy of the cached state for REST handlers.
// Iteration order follows the corresponding *Order slice so the UI sees a
// stable, RPC-defined order across reloads.
func (c *maglevClient) Snapshot() *StateSnapshot {
	c.mu.RLock()
	defer c.mu.RUnlock()
	snap := &StateSnapshot{
		Maglevd: MaglevdInfo{
			Name:      c.name,
			Address:   c.address,
			Connected: c.connected,
			LastError: c.lastErr,
		},
		Frontends:    make([]*FrontendSnapshot, 0, len(c.cache.FrontendsOrder)),
		Backends:     make([]*BackendSnapshot, 0, len(c.cache.BackendsOrder)),
		HealthChecks: make([]*HealthCheckSnapshot, 0, len(c.cache.HealthCheckOrder)),
		VPPInfo:      c.cache.VPPInfo,
		VPPState:     c.cache.VPPState,
		LBState:      c.cache.LBState,
	}
	for _, name := range c.cache.FrontendsOrder {
		if f, ok := c.cache.Frontends[name]; ok {
			snap.Frontends = append(snap.Frontends, f)
		}
	}
	for _, name := range c.cache.BackendsOrder {
		if b, ok := c.cache.Backends[name]; ok {
			snap.Backends = append(snap.Backends, b)
		}
	}
	for _, name := range c.cache.HealthCheckOrder {
		if h, ok := c.cache.HealthChecks[name]; ok {
			snap.HealthChecks = append(snap.HealthChecks, h)
		}
	}
	return snap
}

// refreshAll pulls a full fresh view of the maglevd's state into the cache.
// Called from the refreshLoop every 30s and immediately after a successful
// reconnect.
func (c *maglevClient) refreshAll(ctx context.Context) error {
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	frontends := map[string]*FrontendSnapshot{}
	fl, err := c.api.ListFrontends(rctx, &grpcapi.ListFrontendsRequest{})
	if err != nil {
		return fmt.Errorf("list frontends: %w", err)
	}
	// Sort alphabetically so the UI layout is stable across
	// reloads/restarts. maglevd's checker.ListFrontends already sorts
	// in current versions, but older builds don't — sort here too as
	// a belt-and-braces guarantee.
	frontendsOrder := append([]string(nil), fl.GetFrontendNames()...)
	sort.Strings(frontendsOrder)
	for _, name := range frontendsOrder {
		fi, err := c.api.GetFrontend(rctx, &grpcapi.GetFrontendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get frontend %s: %w", name, err)
		}
		frontends[name] = frontendFromProto(fi)
	}

	backends := map[string]*BackendSnapshot{}
	bl, err := c.api.ListBackends(rctx, &grpcapi.ListBackendsRequest{})
	if err != nil {
		return fmt.Errorf("list backends: %w", err)
	}
	backendsOrder := append([]string(nil), bl.GetBackendNames()...)
	for _, name := range backendsOrder {
		bi, err := c.api.GetBackend(rctx, &grpcapi.GetBackendRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get backend %s: %w", name, err)
		}
		backends[name] = backendFromProto(bi)
	}

	healthchecks := map[string]*HealthCheckSnapshot{}
	hl, err := c.api.ListHealthChecks(rctx, &grpcapi.ListHealthChecksRequest{})
	if err != nil {
		return fmt.Errorf("list healthchecks: %w", err)
	}
	healthCheckOrder := append([]string(nil), hl.GetNames()...)
	for _, name := range healthCheckOrder {
		hi, err := c.api.GetHealthCheck(rctx, &grpcapi.GetHealthCheckRequest{Name: name})
		if err != nil {
			return fmt.Errorf("get healthcheck %s: %w", name, err)
		}
		healthchecks[name] = healthCheckFromProto(hi)
	}

	var vppInfo *VPPInfoSnapshot
	vppState := "disconnected"
	if vi, err := c.api.GetVPPInfo(rctx, &grpcapi.GetVPPInfoRequest{}); err == nil {
		vppInfo = &VPPInfoSnapshot{
			Version:       vi.GetVersion(),
			BuildDate:     vi.GetBuildDate(),
			PID:           vi.GetPid(),
			BoottimeNs:    vi.GetBoottimeNs(),
			ConnecttimeNs: vi.GetConnecttimeNs(),
		}
		vppState = "connected"
	}

	c.mu.Lock()
	// Frontend state comes from the FrontendEvent stream, not the
	// FrontendInfo proto — carry any known state from the old cache over
	// to the freshly-listed entries so a periodic refresh doesn't blank
	// the state badges until the next live transition arrives.
	for name, f := range frontends {
		if old, ok := c.cache.Frontends[name]; ok && old.State != "" {
			f.State = old.State
		}
	}
	c.cache.Frontends = frontends
	c.cache.FrontendsOrder = frontendsOrder
	c.cache.Backends = backends
	c.cache.BackendsOrder = backendsOrder
	c.cache.HealthChecks = healthchecks
	c.cache.HealthCheckOrder = healthCheckOrder
	c.cache.VPPInfo = vppInfo
	c.cache.VPPState = vppState
	c.cache.LastRefresh = time.Now()
	c.mu.Unlock()
	// Best-effort LB state pull so /view/api/state served on a fresh
	// page load already carries the bucket column. Errors are
	// swallowed by fetchLBStateAndPublish (which clears the cache and
	// emits an empty event so the SPA renders "—").
	c.fetchLBStateAndPublish(ctx)
	return nil
}

// watchLoop subscribes to WatchEvents and feeds the broker until the context
// is cancelled. Reconnects with exponential backoff on stream errors.
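//
// Illustrative delay sequence (a sketch of the loop below): 1s, 2s, 4s,
// 8s, 16s, 30s, 30s, ... The delay doubles per failed attempt, caps at
// maxBackoff (30s), and resets to 1s once a stream ends cleanly.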
func (c *maglevClient) watchLoop(ctx context.Context) {
	backoff := time.Second
	maxBackoff := 30 * time.Second
	for {
		if ctx.Err() != nil {
			return
		}
		if err := c.watchOnce(ctx); err != nil {
			if ctx.Err() != nil {
				return
			}
			slog.Warn("watch-disconnected", "maglevd", c.name, "err", err)
			c.setConnected(false, err.Error())
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		backoff = time.Second
	}
}

func (c *maglevClient) watchOnce(ctx context.Context) error {
	logFlag := true
	backendFlag := true
	frontendFlag := true
	req := &grpcapi.WatchRequest{
		Log:      &logFlag,
		LogLevel: "debug",
		Backend:  &backendFlag,
		Frontend: &frontendFlag,
	}
	stream, err := c.api.WatchEvents(ctx, req)
	if err != nil {
		return fmt.Errorf("open stream: %w", err)
	}
	// Successful subscribe: mark connected and pull a fresh snapshot so
	// the REST cache is immediately ground-truth accurate. WatchEvents
	// itself replays current state as synthetic from==to events, which
	// will also update the cache as they arrive.
	c.setConnected(true, "")
	if err := c.refreshAll(ctx); err != nil {
		slog.Warn("refresh-after-watch", "maglevd", c.name, "err", err)
	}
	for {
		ev, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return nil
			}
			return err
		}
		c.handleEvent(ev)
	}
}

// handleEvent applies an incoming gRPC event to the local cache and
// publishes a corresponding BrowserEvent on the broker.
func (c *maglevClient) handleEvent(ev *grpcapi.Event) {
	switch body := ev.GetEvent().(type) {
	case *grpcapi.Event_Log:
		le := body.Log
		if le == nil {
			return
		}
		attrs := make(map[string]string, len(le.GetAttrs()))
		for _, a := range le.GetAttrs() {
			attrs[a.GetKey()] = a.GetValue()
		}
		c.applyVPPLogHeartbeat(le.GetMsg())
		// A config reload on maglevd can shuffle anything: add or
		// remove frontends, change pool membership, flip configured
		// weights, move backends between pools. Rather than try to
		// incrementally update the cache for every possible change,
		// refresh the whole maglevd state and tell every connected
		// browser to re-hydrate from the fresh snapshot. Only the
		// "-done" event triggers this, not "-start": a failed reload
		// (which never emits "-done") leaves the running config
		// unchanged, so no refresh is needed.
		if le.GetMsg() == "config-reload-done" {
			c.triggerConfigResync()
		}
		payload, _ := json.Marshal(LogEventPayload{
			Level: le.GetLevel(),
			Msg:   le.GetMsg(),
			Attrs: attrs,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "log",
			AtUnixNs: le.GetAtUnixNs(),
			Payload:  payload,
		})

	case *grpcapi.Event_Backend:
		be := body.Backend
		if be == nil || be.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(be.GetTransition())
		// maglevd replays current state on WatchEvents subscribe as a
		// synthetic event with from==to and at_unix_ns=0 (see
		// internal/grpcapi/server.go). It is not a real transition — the
		// in-process cache is already correct from refreshAll, so don't
		// touch LastTransition (which would clobber it with at=0 and
		// render as "55 years ago" in the browser) and don't forward to
		// the broker.
		if tr.From == tr.To {
			return
		}
		c.applyBackendTransition(be.GetBackendName(), tr)
		payload, _ := json.Marshal(BackendEventPayload{
			Backend:    be.GetBackendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "backend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
		// A real transition means VPP is about to (or already did)
		// reshuffle bucket allocations across the affected VIP. Wake
		// the lb-state loop so the SPA's bucket column converges
		// without waiting for the 30s refresh.
		c.triggerLBStateFetch()

	case *grpcapi.Event_Frontend:
		fe := body.Frontend
		if fe == nil || fe.GetTransition() == nil {
			return
		}
		tr := transitionFromProto(fe.GetTransition())
		// Always update the cached state — synthetic from==to events on
		// subscribe are how we learn the initial frontend state (there's
		// no equivalent field in the FrontendInfo proto). Only publish
		// genuine transitions to the browser so the debug panel doesn't
		// show 'up → up' spam on every gRPC reconnect.
		c.applyFrontendState(fe.GetFrontendName(), tr.To)
		if tr.From == tr.To {
			return
		}
		payload, _ := json.Marshal(FrontendEventPayload{
			Frontend:   fe.GetFrontendName(),
			Transition: *tr,
		})
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "frontend",
			AtUnixNs: tr.AtUnixNs,
			Payload:  payload,
		})
	}
}

// triggerConfigResync runs refreshAll off the event-dispatch goroutine
// (so the stream.Recv loop isn't blocked while the full config refetch
// hits several gRPC calls) and then publishes a BrowserEvent of type
// "resync" so every connected browser re-fetches /view/api/state from
// the now-fresh cache. Fired in response to a maglevd
// "config-reload-done" log event.
//
// The refresh-then-publish order matters: if we published first, the
// SPA would fetchState from a stale cache and display old data until
// the next 30s refresh tick. Running refreshAll synchronously inside
// this goroutine closes that window.
//
// The resync event goes through the normal broker → ring buffer path,
// so a browser that reconnects shortly after the reload (within the
// 30s / 2000-event replay window) still sees the resync on its first
// live event and re-hydrates without needing a separate out-of-band
// signal.
func (c *maglevClient) triggerConfigResync() {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		if err := c.refreshAll(ctx); err != nil {
			slog.Warn("config-resync-refresh", "maglevd", c.name, "err", err)
			// Publish anyway — the SPA's refetch will see the
			// cache in whatever state refreshAll left it, and
			// the periodic refreshLoop will retry. Better than
			// silently dropping the signal.
		}
		c.broker.Publish(BrowserEvent{
			Maglevd:  c.name,
			Type:     "resync",
			AtUnixNs: time.Now().UnixNano(),
			Payload:  json.RawMessage("{}"),
		})
	}()
}

// applyFrontendState writes the given state into the cached frontend
// snapshot. Called both by synthetic replay events on subscribe and by
// live transitions afterwards.
func (c *maglevClient) applyFrontendState(name, state string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	f, ok := c.cache.Frontends[name]
	if !ok {
		return
	}
	f.State = state
}

// applyVPPLogHeartbeat flips the cache.VPPState field based on the
// event's msg. vpp-connect and vpp-api-{send,recv}* are treated as
// "VPP is up" signals; vpp-disconnect flips to "down". Unrelated log
// events are a no-op. Called from handleEvent under the client's
// event-dispatch goroutine, so contention on mu is single-writer.
func (c *maglevClient) applyVPPLogHeartbeat(msg string) {
	var newState string
	switch {
	case msg == "vpp-connect":
		newState = "connected"
	case msg == "vpp-disconnect":
		newState = "disconnected"
	case strings.HasPrefix(msg, "vpp-api-send") || strings.HasPrefix(msg, "vpp-api-recv"):
		newState = "connected"
	default:
		return
	}
	c.mu.Lock()
	if c.cache.VPPState == newState {
		c.mu.Unlock()
		return
	}
	c.cache.VPPState = newState
	c.mu.Unlock()
	payload, _ := json.Marshal(VPPStatusPayload{State: newState})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "vpp-status",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
	// VPP just came back: pull fresh LB state so the bucket column
	// repopulates immediately instead of waiting up to 30s for the
	// next refresh tick. On vpp-disconnect the next fetch will fail
	// and clear the cache, which is also the right behaviour.
	c.triggerLBStateFetch()
}

func (c *maglevClient) applyBackendTransition(name string, tr *TransitionRecord) {
	c.mu.Lock()
	defer c.mu.Unlock()
	b, ok := c.cache.Backends[name]
	if !ok {
		// Partial-create fallback for a transition that arrives before
		// the first refreshAll has seen this backend. The real fields
		// (address, healthcheck, pool memberships) are filled in on
		// the next refresh tick; here we just stamp Name so the entry
		// exists.
		b = &BackendSnapshot{Name: name}
		c.cache.Backends[name] = b
		c.cache.BackendsOrder = append(c.cache.BackendsOrder, name)
	}
	b.State = tr.To
	// Derive Enabled from State. In maglevd, state="disabled" and
	// config.enabled=false are two ways of expressing the same
	// condition — DisableBackend / EnableBackend flip both together,
	// and no other state corresponds to enabled=false. Keeping them
	// in sync in the reducer closes a race where the cached Enabled
	// could lag behind State by up to a refreshLoop tick, causing
	// the SPA to render a bogus [disabled] tag next to an "up" badge
	// on a freshly-re-enabled backend.
	b.Enabled = tr.To != "disabled"
	b.LastTransition = tr
	b.Transitions = append(b.Transitions, tr)
	// Cap history to the most recent 20 entries to mirror what maglevd
	// returns from GetBackend.
	if len(b.Transitions) > 20 {
		b.Transitions = b.Transitions[len(b.Transitions)-20:]
	}
}

// refreshLoop pulls a fresh snapshot every 30s to catch anything the live
// event stream may have missed (e.g. during a brief gRPC reconnect).
func (c *maglevClient) refreshLoop(ctx context.Context) {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := c.refreshAll(ctx); err != nil {
				slog.Debug("refresh-all", "maglevd", c.name, "err", err)
			}
		}
	}
}

// healthLoop issues a cheap GetVPPInfo every 5s to surface connection drops
// quickly. Errors flip the connection indicator; a successful probe flips
// it back. (The cache itself catches up via watchOnce's refreshAll on
// resubscribe and the periodic refreshLoop.)
func (c *maglevClient) healthLoop(ctx context.Context) {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			hctx, cancel := context.WithTimeout(ctx, 2*time.Second)
			_, err := c.api.GetVPPInfo(hctx, &grpcapi.GetVPPInfoRequest{})
			cancel()
			if err != nil {
				c.setConnected(false, err.Error())
			} else {
				c.setConnected(true, "")
			}
		}
	}
}

// ---- proto → JSON helpers --------------------------------------------------

func frontendFromProto(fi *grpcapi.FrontendInfo) *FrontendSnapshot {
	out := &FrontendSnapshot{
		Name:        fi.GetName(),
		Address:     fi.GetAddress(),
		Protocol:    fi.GetProtocol(),
		Port:        fi.GetPort(),
		Description: fi.GetDescription(),
		SrcIPSticky: fi.GetSrcIpSticky(),
	}
	for _, p := range fi.GetPools() {
		ps := &PoolSnapshot{Name: p.GetName()}
		for _, pb := range p.GetBackends() {
			ps.Backends = append(ps.Backends, &PoolBackendSnapshot{
				Name:            pb.GetName(),
				Weight:          pb.GetWeight(),
				EffectiveWeight: pb.GetEffectiveWeight(),
			})
		}
		out.Pools = append(out.Pools, ps)
	}
	return out
}

func backendFromProto(bi *grpcapi.BackendInfo) *BackendSnapshot {
	out := &BackendSnapshot{
		Name:        bi.GetName(),
		Address:     bi.GetAddress(),
		State:       bi.GetState(),
		Enabled:     bi.GetEnabled(),
		HealthCheck: bi.GetHealthcheck(),
	}
	// maglevd stores and returns transitions newest-first (it prepends
	// in health.Backend.transition()). The client stores them
	// oldest-first so applyBackendTransition can simply append new
	// events to the end. Reverse on read to reconcile the two
	// conventions — then out.Transitions[n-1] is the newest, which is
	// the correct LastTransition.
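	// A worked sketch (states and timestamps hypothetical): the proto
	// list [down→up @t2, up→down @t1] becomes the cached list
	// [up→down @t1, down→up @t2], so LastTransition is down→up @t2.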
	trs := bi.GetTransitions()
	out.Transitions = make([]*TransitionRecord, len(trs))
	for i, t := range trs {
		out.Transitions[len(trs)-1-i] = transitionFromProto(t)
	}
	if n := len(out.Transitions); n > 0 {
		out.LastTransition = out.Transitions[n-1]
	}
	return out
}

func transitionFromProto(t *grpcapi.TransitionRecord) *TransitionRecord {
	return &TransitionRecord{
		From:     t.GetFrom(),
		To:       t.GetTo(),
		AtUnixNs: t.GetAtUnixNs(),
	}
}

// triggerLBStateFetch sends a non-blocking wake to lbStateLoop. The
// channel has buffer 1 so coalesced bursts never block the publisher.
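// This is the standard Go non-blocking-send idiom: a select whose default
// arm drops the wake when one is already pending, which is exactly the
// coalescing behaviour lbStateLoop wants.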
func (c *maglevClient) triggerLBStateFetch() {
	select {
	case c.lbWakeCh <- struct{}{}:
	default:
	}
}

// lbStateLoop consumes wake signals and calls GetVPPLBState, with a
// leading+trailing-edge debounce so we never exceed one fetch per
// minLBInterval (1s). The leading edge means the very first wake after
// an idle period fires immediately — important so a single isolated
// transition isn't artificially delayed by a second. The trailing edge
// means a burst of wakes during the cool-down still gets one final
// fetch right after the gate opens, so the SPA always converges to a
// post-burst snapshot rather than missing the last update.
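//
// Illustrative timeline (a sketch, with minLBInterval = 1s):
//
//	t=0.0s  wake after idle → fetch now (leading edge)
//	t=0.2s  wake            → gate closed, arm timer for t=1.0s
//	t=0.5s  wake            → timer already armed, coalesced
//	t=1.0s  timer fires     → one trailing fetch covers the burst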
func (c *maglevClient) lbStateLoop(ctx context.Context) {
	const minLBInterval = time.Second
	var (
		timer     *time.Timer
		lastFetch time.Time
	)
	timerCh := func() <-chan time.Time {
		if timer == nil {
			return nil
		}
		return timer.C
	}
	fire := func() {
		if timer != nil {
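			// Stop-and-drain idiom: if the timer already fired,
			// consume the buffered tick so a stale expiry can't
			// trigger a second fetch right after this one.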
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer = nil
		}
		c.fetchLBStateAndPublish(ctx)
		lastFetch = time.Now()
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.lbWakeCh:
			wait := minLBInterval - time.Since(lastFetch)
			if wait <= 0 {
				fire()
			} else if timer == nil {
				timer = time.NewTimer(wait)
			}
		case <-timerCh():
			timer = nil
			fire()
		}
	}
}

// fetchLBStateAndPublish runs one GetVPPLBState round-trip, rebuilds
// the per-frontend bucket map, swaps it into the cache, and broadcasts
// a "lb-state" BrowserEvent. On error the cache is cleared and, if it
// previously held state, an empty event is published so the SPA can
// switch the bucket column to em-dashes — clear-on-error is simpler
// than stale-but-visible and doesn't risk showing a confusing snapshot
// from before VPP died.
func (c *maglevClient) fetchLBStateAndPublish(ctx context.Context) {
	fctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	lbs, err := c.api.GetVPPLBState(fctx, &grpcapi.GetVPPLBStateRequest{})
	if err != nil {
		c.mu.Lock()
		had := c.cache.LBState != nil
		c.cache.LBState = nil
		c.mu.Unlock()
		slog.Debug("lb-state-fetch", "maglevd", c.name, "err", err)
		if had {
			c.publishLBState(nil)
		}
		return
	}
	snap := c.buildLBStateSnapshot(lbs)
	c.mu.Lock()
	c.cache.LBState = snap
	c.mu.Unlock()
	c.publishLBState(snap.PerFrontend)
}

func (c *maglevClient) publishLBState(perFrontend map[string]map[string]int32) {
	payload, _ := json.Marshal(LBStatePayload{PerFrontend: perFrontend})
	c.broker.Publish(BrowserEvent{
		Maglevd:  c.name,
		Type:     "lb-state",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	})
}

// buildLBStateSnapshot translates a VPP-side state record (keyed by
// CIDR/protocol/port and AS address) into a maglev-side record (keyed
// by frontend name and backend name). Unmatched VIPs and unmatched AS
// addresses are silently skipped — they're benign side effects of a
// transient sync gap or a backend address that's only present in one
// of the two universes.
func (c *maglevClient) buildLBStateSnapshot(lbs *grpcapi.VPPLBState) *LBStateSnapshot {
	c.mu.RLock()
	feByVIP := make(map[string]string, len(c.cache.Frontends))
	for _, f := range c.cache.Frontends {
		feByVIP[lbVIPKey(f.Address, f.Protocol, f.Port)] = f.Name
	}
	backendByAddr := make(map[string]string, len(c.cache.Backends))
	for _, b := range c.cache.Backends {
		backendByAddr[b.Address] = b.Name
	}
	c.mu.RUnlock()

	out := &LBStateSnapshot{PerFrontend: map[string]map[string]int32{}}
	for _, v := range lbs.GetVips() {
		feName, ok := feByVIP[lbVIPKey(stripLBHostMask(v.GetPrefix()), lbProtoString(v.GetProtocol()), v.GetPort())]
		if !ok {
			continue
		}
		row := out.PerFrontend[feName]
		if row == nil {
			row = map[string]int32{}
			out.PerFrontend[feName] = row
		}
		for _, as := range v.GetApplicationServers() {
			bname, ok := backendByAddr[as.GetAddress()]
			if !ok {
				continue
			}
			row[bname] = int32(as.GetNumBuckets())
		}
	}
	return out
}

// lbVIPKey is the join key between a maglev FrontendSnapshot and a
// VPP-side VPPLBVIP record. Stripping the mask and lower-casing the
// protocol gives a canonical form that both sides can produce.
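//
// For example (values illustrative):
//
//	lbVIPKey("192.0.2.10", "TCP", 443) // "192.0.2.10/tcp/443"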
func lbVIPKey(addr, proto string, port uint32) string {
	return fmt.Sprintf("%s/%s/%d", addr, strings.ToLower(proto), port)
}

// lbProtoString mirrors maglevc's protoString — kept local to avoid a
// cross-package import for two trivial helpers.
func lbProtoString(p uint32) string {
	switch p {
	case 6:
		return "tcp"
	case 17:
		return "udp"
	case 255:
		return "any"
	}
	return fmt.Sprintf("%d", p)
}

// stripLBHostMask trims "/32" or "/128" from a VPP host-prefix VIP so
// it can be compared against a maglev FrontendSnapshot.Address (which
// is bare). Other shapes are returned unchanged.
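//
// For example (values illustrative):
//
//	stripLBHostMask("192.0.2.10/32")   // "192.0.2.10"
//	stripLBHostMask("2001:db8::1/128") // "2001:db8::1"
//	stripLBHostMask("10.0.0.0/24")     // "10.0.0.0/24" (not a host mask)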
func stripLBHostMask(prefix string) string {
	if strings.HasSuffix(prefix, "/32") || strings.HasSuffix(prefix, "/128") {
		return prefix[:strings.LastIndexByte(prefix, '/')]
	}
	return prefix
}

func healthCheckFromProto(h *grpcapi.HealthCheckInfo) *HealthCheckSnapshot {
	return &HealthCheckSnapshot{
		Name:           h.GetName(),
		Type:           h.GetType(),
		Port:           h.GetPort(),
		IntervalNs:     h.GetIntervalNs(),
		FastIntervalNs: h.GetFastIntervalNs(),
		DownIntervalNs: h.GetDownIntervalNs(),
		TimeoutNs:      h.GetTimeoutNs(),
		Rise:           h.GetRise(),
		Fall:           h.GetFall(),
	}
}