// Copyright (c) 2026, Pim van Pelt package main import ( "fmt" "strconv" "strings" "sync" "time" ) const ( bufferMaxAge = 30 * time.Second bufferMaxCount = 2000 subscriberBuf = 256 ) // Broker is a pub/sub fan-out with a bounded replay ring. The watchLoop // goroutines publish; SSE handlers subscribe. The ring lets reconnecting // browsers replay events they missed during short disconnects (the common // nginx-idle-reconnect / wifi-flap / laptop-wake case) without triggering a // full state refetch. type Broker struct { mu sync.Mutex epoch int64 // ns at broker start; folded into every event ID nextSeq uint64 // monotonic within this epoch buffer []bufferedEvent subs map[chan deliveredEvent]struct{} } type bufferedEvent struct { ID string AtUnixNs int64 Event BrowserEvent } // deliveredEvent is what subscribers receive: the event plus its ring ID // so the SSE handler can emit `id:` lines without re-deriving it. type deliveredEvent struct { ID string Event BrowserEvent } // NewBroker creates a Broker with a fresh epoch. func NewBroker() *Broker { return &Broker{ epoch: time.Now().UnixNano(), subs: map[chan deliveredEvent]struct{}{}, } } // Epoch returns the broker's epoch (ns at startup). Exposed for testing. func (b *Broker) Epoch() int64 { return b.epoch } // Publish assigns the event an ID, evicts aged/overflow entries from the // ring, appends it, and fans out to every live subscriber. func (b *Broker) Publish(ev BrowserEvent) { b.mu.Lock() defer b.mu.Unlock() seq := b.nextSeq b.nextSeq++ id := fmt.Sprintf("%d-%d", b.epoch, seq) at := ev.AtUnixNs if at == 0 { at = time.Now().UnixNano() } entry := bufferedEvent{ID: id, AtUnixNs: at, Event: ev} cutoff := time.Now().UnixNano() - int64(bufferMaxAge) i := 0 for i < len(b.buffer) && b.buffer[i].AtUnixNs < cutoff { i++ } if i > 0 { b.buffer = b.buffer[i:] } if len(b.buffer) >= bufferMaxCount { b.buffer = b.buffer[len(b.buffer)-bufferMaxCount+1:] } b.buffer = append(b.buffer, entry) delivered := deliveredEvent{ID: id, Event: ev} for ch := range b.subs { select { case ch <- delivered: default: // Drop the oldest queued event for this subscriber to make // room. If we're still wedged, give up for this publish — // the next replay on reconnect will heal the gap. select { case <-ch: default: } select { case ch <- delivered: default: } } } } // subscribeResult is what Subscribe returns to the SSE handler. type subscribeResult struct { // ReplayEvents contains any buffered events newer than the client's // Last-Event-ID, ordered oldest-first. The handler must emit these // before streaming from Channel to preserve ordering. ReplayEvents []deliveredEvent // NeedResync is true when the handler should emit a "resync" event // telling the client to re-fetch state (no header, epoch mismatch, or // seq fell off the ring). NeedResync bool // Channel delivers subsequent live events. Channel chan deliveredEvent } // Subscribe registers a new subscriber and returns any replay events plus a // live delivery channel. The caller must call Unsubscribe when done. // // Holding b.mu across "collect replay + register channel" is the critical // invariant: it ensures every event is delivered exactly once, either via // replay or via the live channel. Dropping the lock between those two steps // would open a race where a publish is lost. func (b *Broker) Subscribe(lastEventID string) subscribeResult { b.mu.Lock() defer b.mu.Unlock() ch := make(chan deliveredEvent, subscriberBuf) b.subs[ch] = struct{}{} replay, needResync := b.collectReplay(lastEventID) return subscribeResult{ ReplayEvents: replay, NeedResync: needResync, Channel: ch, } } // collectReplay returns any events newer than lastEventID, or signals that a // resync is required. Must be called with b.mu held. func (b *Broker) collectReplay(lastEventID string) ([]deliveredEvent, bool) { if lastEventID == "" { return nil, true } epochStr, seqStr, ok := strings.Cut(lastEventID, "-") if !ok { return nil, true } epoch, err := strconv.ParseInt(epochStr, 10, 64) if err != nil || epoch != b.epoch { return nil, true } lastSeq, err := strconv.ParseUint(seqStr, 10, 64) if err != nil { return nil, true } if len(b.buffer) == 0 { // No events since last seen. Safe to continue without resync if // lastSeq is at or behind nextSeq; otherwise something is wrong // and we resync. if lastSeq+1 >= b.nextSeq { return nil, false } return nil, true } // Parse the oldest buffered seq to decide whether lastSeq is still // within range. _, oldestSeqStr, _ := strings.Cut(b.buffer[0].ID, "-") oldestSeq, err := strconv.ParseUint(oldestSeqStr, 10, 64) if err != nil { return nil, true } if lastSeq+1 < oldestSeq { // Gap: client missed events that have since been evicted. return nil, true } out := make([]deliveredEvent, 0, len(b.buffer)) for _, e := range b.buffer { _, seqStr, _ := strings.Cut(e.ID, "-") seq, _ := strconv.ParseUint(seqStr, 10, 64) if seq > lastSeq { out = append(out, deliveredEvent{ID: e.ID, Event: e.Event}) } } return out, false } // Unsubscribe removes a subscriber and closes its channel. func (b *Broker) Unsubscribe(ch chan deliveredEvent) { b.mu.Lock() defer b.mu.Unlock() if _, ok := b.subs[ch]; ok { delete(b.subs, ch) close(ch) } }