114 lines
2.9 KiB
Go
114 lines
2.9 KiB
Go
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func mkEvent(i int) BrowserEvent {
|
|
p, _ := json.Marshal(map[string]int{"i": i})
|
|
return BrowserEvent{
|
|
Maglevd: "lb",
|
|
Type: "backend",
|
|
AtUnixNs: time.Now().UnixNano(),
|
|
Payload: p,
|
|
}
|
|
}
|
|
|
|
func TestBrokerSubscribeNoHeaderResync(t *testing.T) {
|
|
b := NewBroker()
|
|
b.Publish(mkEvent(1))
|
|
|
|
res := b.Subscribe("")
|
|
defer b.Unsubscribe(res.Channel)
|
|
if !res.NeedResync {
|
|
t.Errorf("expected NeedResync=true when Last-Event-ID is empty")
|
|
}
|
|
if len(res.ReplayEvents) != 0 {
|
|
t.Errorf("expected no replay events, got %d", len(res.ReplayEvents))
|
|
}
|
|
}
|
|
|
|
func TestBrokerReplayMatchingEpoch(t *testing.T) {
|
|
b := NewBroker()
|
|
b.Publish(mkEvent(1))
|
|
b.Publish(mkEvent(2))
|
|
b.Publish(mkEvent(3))
|
|
|
|
// Publishes used seqs 0,1,2. Client last saw seq 0, so we expect
|
|
// replay of seqs 1 and 2.
|
|
lastID := fmt.Sprintf("%d-0", b.Epoch())
|
|
res := b.Subscribe(lastID)
|
|
defer b.Unsubscribe(res.Channel)
|
|
if res.NeedResync {
|
|
t.Errorf("expected no resync when seqs are in buffer")
|
|
}
|
|
if len(res.ReplayEvents) != 2 {
|
|
t.Fatalf("expected 2 replay events (seqs 1,2), got %d", len(res.ReplayEvents))
|
|
}
|
|
if res.ReplayEvents[0].ID != fmt.Sprintf("%d-1", b.Epoch()) {
|
|
t.Errorf("replay[0] ID = %q, want epoch-1", res.ReplayEvents[0].ID)
|
|
}
|
|
}
|
|
|
|
func TestBrokerEpochMismatchResyncs(t *testing.T) {
|
|
b := NewBroker()
|
|
b.Publish(mkEvent(1))
|
|
|
|
res := b.Subscribe("9999-0")
|
|
defer b.Unsubscribe(res.Channel)
|
|
if !res.NeedResync {
|
|
t.Errorf("expected resync on epoch mismatch")
|
|
}
|
|
}
|
|
|
|
func TestBrokerLiveDelivery(t *testing.T) {
|
|
b := NewBroker()
|
|
res := b.Subscribe("")
|
|
defer b.Unsubscribe(res.Channel)
|
|
b.Publish(mkEvent(42))
|
|
select {
|
|
case ev := <-res.Channel:
|
|
if ev.ID == "" {
|
|
t.Errorf("live event should have an ID")
|
|
}
|
|
case <-time.After(500 * time.Millisecond):
|
|
t.Fatalf("timed out waiting for live event delivery")
|
|
}
|
|
}
|
|
|
|
func TestBrokerExactlyOnceOverSubscribeBoundary(t *testing.T) {
|
|
// Invariant: an event published while a subscriber is mid-subscribe
|
|
// should be delivered exactly once — either via replay or via the
|
|
// live channel. We approximate by publishing, subscribing with the
|
|
// previous event ID, and checking that exactly one delivery happens.
|
|
b := NewBroker()
|
|
b.Publish(mkEvent(1)) // seq 0
|
|
b.Publish(mkEvent(2)) // seq 1
|
|
lastID := fmt.Sprintf("%d-0", b.Epoch())
|
|
res := b.Subscribe(lastID)
|
|
defer b.Unsubscribe(res.Channel)
|
|
|
|
// Expect seq 1 in replay, nothing on the live channel yet.
|
|
if len(res.ReplayEvents) != 1 {
|
|
t.Fatalf("expected 1 replay event, got %d", len(res.ReplayEvents))
|
|
}
|
|
select {
|
|
case <-res.Channel:
|
|
t.Fatalf("unexpected live event without any publish")
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
|
|
// New publish should arrive live.
|
|
b.Publish(mkEvent(3))
|
|
select {
|
|
case <-res.Channel:
|
|
case <-time.After(500 * time.Millisecond):
|
|
t.Fatalf("new publish not delivered live")
|
|
}
|
|
}
|