Files
vpp-maglev/cmd/frontend/broker.go

198 lines
5.3 KiB
Go

// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)
// Tuning knobs for the broker's replay ring and subscriber channels.
const (
// bufferMaxAge is the age limit for ring entries: Publish evicts leading
// entries whose timestamp is older than now minus this duration.
bufferMaxAge = 30 * time.Second
// bufferMaxCount caps how many entries Publish keeps in the ring.
bufferMaxCount = 2000
// subscriberBuf is the capacity of each channel created by Subscribe.
subscriberBuf = 256
)
// Broker is a pub/sub fan-out with a bounded replay ring. The watchLoop
// goroutines publish; SSE handlers subscribe. The ring lets reconnecting
// browsers replay events they missed during short disconnects (the common
// nginx-idle-reconnect / wifi-flap / laptop-wake case) without triggering a
// full state refetch.
//
// Publish, Subscribe and Unsubscribe all take mu before touching nextSeq,
// buffer or subs. epoch is set by NewBroker and read by Epoch without the
// lock.
type Broker struct {
// mu serializes publishing, subscribing and unsubscribing.
mu sync.Mutex
epoch int64 // ns at broker start; folded into every event ID
nextSeq uint64 // monotonic within this epoch
// buffer is the replay ring, oldest entry first. Publish appends to the
// tail and trims the head by age (bufferMaxAge) and count (bufferMaxCount).
buffer []bufferedEvent
// subs is the set of live subscriber channels: Subscribe adds a channel,
// Unsubscribe removes and closes it.
subs map[chan deliveredEvent]struct{}
}
// bufferedEvent is one entry of the Broker's replay ring.
type bufferedEvent struct {
// ID is the event ID assigned by Publish, formatted "<epoch>-<seq>".
ID string
// AtUnixNs is the timestamp used for age-based eviction: the event's own
// AtUnixNs, or the publish time when the event carried zero.
AtUnixNs int64
// Event is the payload exactly as it was handed to Publish.
Event BrowserEvent
}
// deliveredEvent is what subscribers receive: the event plus its ring ID
// so the SSE handler can emit `id:` lines without re-deriving it. The same
// shape is used for replayed events and for events sent on the live channel.
type deliveredEvent struct {
// ID is the "<epoch>-<seq>" identifier assigned by Publish.
ID string
// Event is the published payload.
Event BrowserEvent
}
// NewBroker returns an empty Broker ready for use. Its epoch is the current
// wall-clock time in nanoseconds, and its subscriber set is allocated so
// Subscribe can register channels immediately. The replay ring starts empty
// and the sequence counter starts at zero.
func NewBroker() *Broker {
	b := new(Broker)
	b.epoch = time.Now().UnixNano()
	b.subs = make(map[chan deliveredEvent]struct{})
	return b
}
// Epoch reports the nanosecond timestamp captured when the broker was
// created; it is the prefix of every event ID the broker issues. Exposed
// for testing.
func (b *Broker) Epoch() int64 {
	return b.epoch
}
// Publish stamps ev with the next "<epoch>-<seq>" ID, trims the replay ring,
// stores the event in it, and offers the event to every registered
// subscriber. The whole operation runs under b.mu.
//
// Ring trimming happens before the append: leading entries older than
// bufferMaxAge go first, then enough of the oldest entries to keep the ring
// at no more than bufferMaxCount once the new entry is added.
//
// Delivery never blocks. When a subscriber's channel is full, its oldest
// queued event is discarded to make room; if the channel still cannot take
// the event, this publish is skipped for that subscriber.
func (b *Broker) Publish(ev BrowserEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()

	id := fmt.Sprintf("%d-%d", b.epoch, b.nextSeq)
	b.nextSeq++

	// Events without their own timestamp are dated at publish time.
	stamp := ev.AtUnixNs
	if stamp == 0 {
		stamp = time.Now().UnixNano()
	}

	// Age-based eviction: count the run of expired entries at the head.
	oldestAllowed := time.Now().UnixNano() - int64(bufferMaxAge)
	expired := 0
	for ; expired < len(b.buffer); expired++ {
		if b.buffer[expired].AtUnixNs >= oldestAllowed {
			break
		}
	}
	b.buffer = b.buffer[expired:]

	// Count-based eviction: leave room for exactly one more entry.
	if surplus := len(b.buffer) - bufferMaxCount + 1; surplus > 0 {
		b.buffer = b.buffer[surplus:]
	}

	b.buffer = append(b.buffer, bufferedEvent{ID: id, AtUnixNs: stamp, Event: ev})

	msg := deliveredEvent{ID: id, Event: ev}
	for sub := range b.subs {
		// Fast path: the subscriber has room.
		select {
		case sub <- msg:
			continue
		default:
		}

		// Slow subscriber: sacrifice its oldest queued event...
		select {
		case <-sub:
		default:
		}

		// ...and try once more. If it is still wedged, skip it for this
		// publish; a later reconnect can recover the gap through replay.
		select {
		case sub <- msg:
		default:
		}
	}
}
// subscribeResult is what Subscribe returns to the SSE handler.
type subscribeResult struct {
// ReplayEvents contains any buffered events newer than the client's
// Last-Event-ID, ordered oldest-first. The handler must emit these
// before streaming from Channel to preserve ordering. It is nil whenever
// NeedResync is true.
ReplayEvents []deliveredEvent
// NeedResync is true when the handler should emit a "resync" event
// telling the client to re-fetch state (no header, malformed ID, epoch
// mismatch, or seq fell off the ring).
NeedResync bool
// Channel delivers subsequent live events. It is buffered with capacity
// subscriberBuf and is closed by Unsubscribe.
Channel chan deliveredEvent
}
// Subscribe attaches a new subscriber to the broker. lastEventID is the
// client's Last-Event-ID (empty on a first connect). The result carries the
// buffered events the client still needs, a flag telling the handler whether
// the client must resync instead, and the channel on which later events
// arrive. The caller must hand that channel to Unsubscribe when it is done.
//
// Replay collection and channel registration both happen inside a single
// b.mu critical section. That is what guarantees each event reaches the
// subscriber exactly once, through the replay slice or through the channel;
// releasing the lock between the two steps would let a concurrent Publish
// slip through unseen.
func (b *Broker) Subscribe(lastEventID string) subscribeResult {
	b.mu.Lock()
	defer b.mu.Unlock()

	var res subscribeResult
	res.ReplayEvents, res.NeedResync = b.collectReplay(lastEventID)
	res.Channel = make(chan deliveredEvent, subscriberBuf)
	b.subs[res.Channel] = struct{}{}
	return res
}
// collectReplay works out how a client that last saw lastEventID catches up.
// It returns the buffered events with a sequence number greater than the
// client's (oldest first) and false, or nil and true when the client must
// resync. Must be called with b.mu held.
//
// A resync is required when:
//   - lastEventID is empty or not of the form "<epoch>-<seq>";
//   - the epoch is not this broker's epoch;
//   - the seq has not been issued by this broker (seq >= nextSeq);
//   - events after seq have already been evicted from the ring;
//   - a buffered ID cannot be parsed, so the replay cannot be trusted.
func (b *Broker) collectReplay(lastEventID string) ([]deliveredEvent, bool) {
	// strings.Cut reports !ok for the empty (first-connect) header as well
	// as for IDs without a separator.
	epochStr, lastSeqStr, ok := strings.Cut(lastEventID, "-")
	if !ok {
		return nil, true
	}
	epoch, err := strconv.ParseInt(epochStr, 10, 64)
	if err != nil || epoch != b.epoch {
		return nil, true
	}
	lastSeq, err := strconv.ParseUint(lastSeqStr, 10, 64)
	if err != nil {
		return nil, true
	}

	// Sequence numbers below nextSeq are the only ones Publish has handed
	// out. Anything else is a forged or corrupt ID. Rejecting it here also
	// keeps lastSeq+1 below from wrapping around at the top of uint64.
	if lastSeq >= b.nextSeq {
		return nil, true
	}

	if len(b.buffer) == 0 {
		// Everything published so far has been evicted. The client is
		// caught up only if it saw the most recently issued seq; otherwise
		// the events it missed are gone.
		return nil, lastSeq+1 != b.nextSeq
	}

	// seqOf extracts the sequence number from a ring ID.
	seqOf := func(id string) (uint64, bool) {
		_, s, found := strings.Cut(id, "-")
		if !found {
			return 0, false
		}
		n, perr := strconv.ParseUint(s, 10, 64)
		return n, perr == nil
	}

	oldestSeq, ok := seqOf(b.buffer[0].ID)
	if !ok {
		return nil, true
	}
	if lastSeq+1 < oldestSeq {
		// Gap: client missed events that have since been evicted.
		return nil, true
	}

	out := make([]deliveredEvent, 0, len(b.buffer))
	for i := range b.buffer {
		e := &b.buffer[i]
		seq, ok := seqOf(e.ID)
		if !ok {
			// Skipping the entry would silently drop an event from the
			// replay; make the client refetch instead.
			return nil, true
		}
		if seq > lastSeq {
			out = append(out, deliveredEvent{ID: e.ID, Event: e.Event})
		}
	}
	return out, false
}
// Unsubscribe detaches ch from the broker and closes it. Channels that are
// not (or no longer) registered are ignored, so calling it twice for the
// same channel is harmless. The close happens under b.mu, the same lock
// Publish holds while sending.
func (b *Broker) Unsubscribe(ch chan deliveredEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()

	_, registered := b.subs[ch]
	if !registered {
		return
	}
	close(ch)
	delete(b.subs, ch)
}