// File: vpp-maglev/cmd/frontend/broker_test.go (Go, 114 lines, 2.9 KiB)

// SPDX-License-Identifier: Apache-2.0
package main
import (
"encoding/json"
"fmt"
"testing"
"time"
)
// mkEvent builds a BrowserEvent fixture for the broker tests. The payload is
// the JSON object {"i": <i>} so events created with different arguments carry
// different payloads; the timestamp is the wall-clock time of the call.
func mkEvent(i int) BrowserEvent {
	// Marshalling a map[string]int cannot fail, so the error is deliberately
	// discarded.
	payload, _ := json.Marshal(map[string]int{"i": i})

	ev := BrowserEvent{
		Maglevd:  "lb",
		Type:     "backend",
		AtUnixNs: time.Now().UnixNano(),
		Payload:  payload,
	}
	return ev
}
// TestBrokerSubscribeNoHeaderResync subscribes with an empty Last-Event-ID
// after one event has been published and expects the result to request a
// resync and to carry no replay events.
func TestBrokerSubscribeNoHeaderResync(t *testing.T) {
	broker := NewBroker()
	broker.Publish(mkEvent(1))

	sub := broker.Subscribe("")
	defer broker.Unsubscribe(sub.Channel)

	if !sub.NeedResync {
		t.Errorf("expected NeedResync=true when Last-Event-ID is empty")
	}
	if n := len(sub.ReplayEvents); n != 0 {
		t.Errorf("expected no replay events, got %d", n)
	}
}
// TestBrokerReplayMatchingEpoch subscribes with a Last-Event-ID built from the
// broker's current epoch and expects the events published after that ID to be
// replayed, in order, without a resync request.
func TestBrokerReplayMatchingEpoch(t *testing.T) {
	b := NewBroker()
	b.Publish(mkEvent(1))
	b.Publish(mkEvent(2))
	b.Publish(mkEvent(3))

	// Publishes used seqs 0,1,2. Client last saw seq 0, so we expect
	// replay of seqs 1 and 2.
	epoch := b.Epoch()
	lastID := fmt.Sprintf("%d-0", epoch)
	res := b.Subscribe(lastID)
	defer b.Unsubscribe(res.Channel)

	if res.NeedResync {
		t.Errorf("expected no resync when seqs are in buffer")
	}
	if len(res.ReplayEvents) != 2 {
		t.Fatalf("expected 2 replay events (seqs 1,2), got %d", len(res.ReplayEvents))
	}

	// Check every replayed event, not only the first, so a wrong or
	// duplicated second event is caught. Replay index i corresponds to
	// seq i+1 because the client already holds seq 0.
	for i := range res.ReplayEvents {
		want := fmt.Sprintf("%d-%d", epoch, i+1)
		if got := res.ReplayEvents[i].ID; got != want {
			t.Errorf("replay[%d] ID = %q, want %q", i, got, want)
		}
	}
}
// TestBrokerEpochMismatchResyncs subscribes with a Last-Event-ID whose epoch
// differs from the broker's and expects the result to request a resync.
func TestBrokerEpochMismatchResyncs(t *testing.T) {
	b := NewBroker()
	b.Publish(mkEvent(1))

	// Derive the foreign epoch from the broker's own value instead of using a
	// fixed number, so the ID is guaranteed not to match whatever epoch the
	// broker happened to pick.
	staleID := fmt.Sprintf("%d-0", b.Epoch()+1)
	res := b.Subscribe(staleID)
	defer b.Unsubscribe(res.Channel)

	if !res.NeedResync {
		t.Errorf("expected resync on epoch mismatch")
	}
}
// TestBrokerLiveDelivery subscribes first, publishes one event afterwards, and
// expects that event to show up on the subscriber's channel with a non-empty
// ID within 500ms.
func TestBrokerLiveDelivery(t *testing.T) {
	broker := NewBroker()
	sub := broker.Subscribe("")
	defer broker.Unsubscribe(sub.Channel)

	broker.Publish(mkEvent(42))

	deadline := time.After(500 * time.Millisecond)
	select {
	case got := <-sub.Channel:
		if got.ID == "" {
			t.Errorf("live event should have an ID")
		}
	case <-deadline:
		t.Fatalf("timed out waiting for live event delivery")
	}
}
// TestBrokerExactlyOnceOverSubscribeBoundary approximates the invariant that
// an event published around the time of a subscribe is delivered exactly
// once, either via replay or via the live channel. It publishes two events,
// subscribes with the ID of the first, and then checks three things in turn:
// the second event is in the replay set, nothing is pending on the live
// channel, and a subsequent publish does arrive live.
func TestBrokerExactlyOnceOverSubscribeBoundary(t *testing.T) {
	broker := NewBroker()
	broker.Publish(mkEvent(1)) // seq 0
	broker.Publish(mkEvent(2)) // seq 1

	sub := broker.Subscribe(fmt.Sprintf("%d-0", broker.Epoch()))
	defer broker.Unsubscribe(sub.Channel)

	// Seq 1 must come back through replay.
	if replayed := len(sub.ReplayEvents); replayed != 1 {
		t.Fatalf("expected 1 replay event, got %d", replayed)
	}

	// The live channel must stay quiet for a short window: the replayed
	// event must not be delivered a second time.
	quiet := time.After(50 * time.Millisecond)
	select {
	case <-sub.Channel:
		t.Fatalf("unexpected live event without any publish")
	case <-quiet:
	}

	// An event published after the subscribe should arrive live.
	broker.Publish(mkEvent(3))
	deadline := time.After(500 * time.Millisecond)
	select {
	case <-sub.Channel:
	case <-deadline:
		t.Fatalf("new publish not delivered live")
	}
}