Revision: Rename to 'maglevd'; Refactor config structure
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -12,30 +13,35 @@ import (
|
||||
"git.ipng.ch/ipng/vpp-maglev/internal/prober"
|
||||
)
|
||||
|
||||
// Event is emitted on every backend state transition.
|
||||
type Event struct {
|
||||
VIPName string
|
||||
Backend net.IP
|
||||
Transition health.Transition
|
||||
// BackendSnapshot combines the live health state with the config entry for a backend.
|
||||
type BackendSnapshot struct {
|
||||
Health *health.Backend
|
||||
Config config.Backend
|
||||
}
|
||||
|
||||
type backendKey struct {
|
||||
VIPName string
|
||||
Backend string // net.IP.String()
|
||||
// Event is emitted on every backend state transition, once per frontend that
|
||||
// references the backend.
|
||||
type Event struct {
|
||||
FrontendName string
|
||||
BackendName string
|
||||
Backend net.IP
|
||||
Transition health.Transition
|
||||
}
|
||||
|
||||
type worker struct {
|
||||
backend *health.Backend
|
||||
hc config.HealthCheck
|
||||
vip config.VIP
|
||||
entry config.Backend
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// Checker orchestrates health probing for all VIP:backend tuples.
|
||||
// Checker orchestrates health probing for all backends.
|
||||
// Each backend is probed exactly once, regardless of how many frontends
|
||||
// reference it.
|
||||
type Checker struct {
|
||||
cfg *config.Frontend
|
||||
cfg *config.Config
|
||||
mu sync.RWMutex
|
||||
workers map[backendKey]*worker
|
||||
workers map[string]*worker // keyed by backend name
|
||||
|
||||
subsMu sync.Mutex
|
||||
nextID int
|
||||
@@ -44,10 +50,10 @@ type Checker struct {
|
||||
}
|
||||
|
||||
// New creates a Checker. Call Run to start probing.
|
||||
func New(cfg *config.Frontend) *Checker {
|
||||
func New(cfg *config.Config) *Checker {
|
||||
return &Checker{
|
||||
cfg: cfg,
|
||||
workers: make(map[backendKey]*worker),
|
||||
workers: make(map[string]*worker),
|
||||
subs: make(map[int]chan Event),
|
||||
eventCh: make(chan Event, 256),
|
||||
}
|
||||
@@ -58,13 +64,11 @@ func (c *Checker) Run(ctx context.Context) error {
|
||||
go c.fanOut(ctx)
|
||||
|
||||
c.mu.Lock()
|
||||
total := totalBackends(c.cfg)
|
||||
pos := 0
|
||||
for vipName, vip := range c.cfg.VIPs {
|
||||
for _, backend := range vip.Backends {
|
||||
c.startWorker(ctx, vipName, vip, backend, pos, total)
|
||||
pos++
|
||||
}
|
||||
names := activeBackendNames(c.cfg)
|
||||
for i, name := range names {
|
||||
b := c.cfg.Backends[name]
|
||||
hc := c.cfg.HealthChecks[b.HealthCheck]
|
||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -73,55 +77,45 @@ func (c *Checker) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Reload applies a new config without restarting the process.
|
||||
// New tuples are added, removed tuples are stopped, changed tuples are restarted.
|
||||
// Existing tuples with unchanged healthcheck config continue uninterrupted.
|
||||
func (c *Checker) Reload(ctx context.Context, cfg *config.Frontend) error {
|
||||
// New backends are added, removed backends are stopped, changed backends are
|
||||
// restarted. Backends whose healthcheck config is unchanged continue
|
||||
// uninterrupted, even if the set of frontends referencing them changes.
|
||||
func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
type desired struct {
|
||||
vipName string
|
||||
vip config.VIP
|
||||
backend net.IP
|
||||
}
|
||||
desiredMap := map[backendKey]desired{}
|
||||
for vipName, vip := range cfg.VIPs {
|
||||
for _, backend := range vip.Backends {
|
||||
key := backendKey{VIPName: vipName, Backend: backend.String()}
|
||||
desiredMap[key] = desired{vipName: vipName, vip: vip, backend: backend}
|
||||
}
|
||||
desired := map[string]struct{}{}
|
||||
for _, name := range activeBackendNames(cfg) {
|
||||
desired[name] = struct{}{}
|
||||
}
|
||||
|
||||
// Stop workers no longer in config.
|
||||
for key, w := range c.workers {
|
||||
if _, ok := desiredMap[key]; !ok {
|
||||
slog.Info("backend-stop", "vip", key.VIPName, "backend", key.Backend)
|
||||
// Stop workers no longer needed.
|
||||
for name, w := range c.workers {
|
||||
if _, ok := desired[name]; !ok {
|
||||
slog.Info("backend-stop", "backend", name)
|
||||
w.cancel()
|
||||
delete(c.workers, key)
|
||||
delete(c.workers, name)
|
||||
}
|
||||
}
|
||||
|
||||
// Add new or restart changed workers.
|
||||
total := len(desiredMap)
|
||||
pos := 0
|
||||
for key, d := range desiredMap {
|
||||
if w, ok := c.workers[key]; ok {
|
||||
if healthCheckEqual(w.hc, d.vip.HealthCheck) {
|
||||
pos++
|
||||
names := activeBackendNames(cfg)
|
||||
for i, name := range names {
|
||||
b := cfg.Backends[name]
|
||||
hc := cfg.HealthChecks[b.HealthCheck]
|
||||
if w, ok := c.workers[name]; ok {
|
||||
if healthCheckEqual(w.hc, hc) {
|
||||
// Update entry metadata (weight, etc.) in place without restart.
|
||||
w.entry = b
|
||||
continue
|
||||
}
|
||||
slog.Info("backend-restart", "vip", key.VIPName, "backend", key.Backend)
|
||||
slog.Info("backend-restart", "backend", name)
|
||||
w.cancel()
|
||||
w.hc = d.vip.HealthCheck
|
||||
w.vip = d.vip
|
||||
wCtx, cancel := context.WithCancel(ctx)
|
||||
w.cancel = cancel
|
||||
go c.runProbe(wCtx, key, 0, 1) // no stagger on reload
|
||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
||||
} else {
|
||||
slog.Info("backend-start", "vip", d.vipName, "backend", d.backend)
|
||||
c.startWorker(ctx, d.vipName, d.vip, d.backend, pos, total)
|
||||
slog.Info("backend-start", "backend", name)
|
||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
||||
}
|
||||
pos++
|
||||
}
|
||||
|
||||
c.cfg = cfg
|
||||
@@ -145,109 +139,142 @@ func (c *Checker) Subscribe() (<-chan Event, func()) {
|
||||
}
|
||||
}
|
||||
|
||||
// ListVIPs returns the names of all configured VIPs.
|
||||
func (c *Checker) ListVIPs() []string {
|
||||
// ListFrontends returns the names of all configured frontends.
|
||||
func (c *Checker) ListFrontends() []string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
names := make([]string, 0, len(c.cfg.VIPs))
|
||||
for name := range c.cfg.VIPs {
|
||||
names := make([]string, 0, len(c.cfg.Frontends))
|
||||
for name := range c.cfg.Frontends {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// GetVIP returns the VIP config for the given name.
|
||||
func (c *Checker) GetVIP(name string) (config.VIP, bool) {
|
||||
// GetFrontend returns the frontend config for the given name.
|
||||
func (c *Checker) GetFrontend(name string) (config.Frontend, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
v, ok := c.cfg.VIPs[name]
|
||||
v, ok := c.cfg.Frontends[name]
|
||||
return v, ok
|
||||
}
|
||||
|
||||
// ListBackends returns the backend states for all backends of a VIP.
|
||||
func (c *Checker) ListBackends(vipName string) []*health.Backend {
|
||||
// ListHealthChecks returns the names of all configured health checks, sorted.
|
||||
func (c *Checker) ListHealthChecks() []string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
names := make([]string, 0, len(c.cfg.HealthChecks))
|
||||
for name := range c.cfg.HealthChecks {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names
|
||||
}
|
||||
|
||||
// GetHealthCheck returns the config for a health check by name.
|
||||
func (c *Checker) GetHealthCheck(name string) (config.HealthCheck, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
hc, ok := c.cfg.HealthChecks[name]
|
||||
return hc, ok
|
||||
}
|
||||
|
||||
// ListBackends returns the names of all active backends.
|
||||
func (c *Checker) ListBackends() []string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
names := make([]string, 0, len(c.workers))
|
||||
for name := range c.workers {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names
|
||||
}
|
||||
|
||||
// ListFrontendBackends returns the backend health states for all backends of a frontend.
|
||||
func (c *Checker) ListFrontendBackends(frontendName string) []*health.Backend {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
fe, ok := c.cfg.Frontends[frontendName]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
var out []*health.Backend
|
||||
for key, w := range c.workers {
|
||||
if key.VIPName == vipName {
|
||||
for _, name := range fe.Backends {
|
||||
if w, ok := c.workers[name]; ok {
|
||||
out = append(out, w.backend)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// GetBackend returns the backend state for a specific VIP:backend tuple.
|
||||
func (c *Checker) GetBackend(vipName, backendAddr string) (*health.Backend, bool) {
|
||||
// GetBackend returns a snapshot of the health state and config for a backend by name.
|
||||
func (c *Checker) GetBackend(name string) (BackendSnapshot, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
key := backendKey{VIPName: vipName, Backend: backendAddr}
|
||||
w, ok := c.workers[key]
|
||||
w, ok := c.workers[name]
|
||||
if !ok {
|
||||
return nil, false
|
||||
return BackendSnapshot{}, false
|
||||
}
|
||||
return w.backend, true
|
||||
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
||||
}
|
||||
|
||||
// PauseBackend pauses health checking for a specific backend.
|
||||
func (c *Checker) PauseBackend(vipName, backendAddr string) (*health.Backend, bool) {
|
||||
// PauseBackend pauses health checking for a backend by name.
|
||||
func (c *Checker) PauseBackend(name string) (BackendSnapshot, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
key := backendKey{VIPName: vipName, Backend: backendAddr}
|
||||
w, ok := c.workers[key]
|
||||
w, ok := c.workers[name]
|
||||
if !ok {
|
||||
return nil, false
|
||||
return BackendSnapshot{}, false
|
||||
}
|
||||
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||
if w.backend.Pause(maxHistory) {
|
||||
slog.Info("backend-pause", "vip", vipName, "backend", backendAddr)
|
||||
c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]})
|
||||
slog.Info("backend-pause", "backend", name)
|
||||
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0])
|
||||
}
|
||||
return w.backend, true
|
||||
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
||||
}
|
||||
|
||||
// ResumeBackend resumes health checking for a specific backend.
|
||||
func (c *Checker) ResumeBackend(vipName, backendAddr string) (*health.Backend, bool) {
|
||||
// ResumeBackend resumes health checking for a backend by name.
|
||||
func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
key := backendKey{VIPName: vipName, Backend: backendAddr}
|
||||
w, ok := c.workers[key]
|
||||
w, ok := c.workers[name]
|
||||
if !ok {
|
||||
return nil, false
|
||||
return BackendSnapshot{}, false
|
||||
}
|
||||
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||
if w.backend.Resume(maxHistory) {
|
||||
slog.Info("backend-resume", "vip", vipName, "backend", backendAddr)
|
||||
c.emit(Event{VIPName: vipName, Backend: w.backend.Address, Transition: w.backend.Transitions[0]})
|
||||
slog.Info("backend-resume", "backend", name)
|
||||
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0])
|
||||
}
|
||||
return w.backend, true
|
||||
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
||||
}
|
||||
|
||||
// ---- internal --------------------------------------------------------------
|
||||
|
||||
// startWorker creates a Backend and launches a probe goroutine.
|
||||
// pos and total are used to compute the startup stagger delay.
|
||||
// Must be called with c.mu held.
|
||||
func (c *Checker) startWorker(ctx context.Context, vipName string, vip config.VIP, backend net.IP, pos, total int) {
|
||||
key := backendKey{VIPName: vipName, Backend: backend.String()}
|
||||
func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total int) {
|
||||
rise, fall := hc.Rise, hc.Fall
|
||||
if entry.HealthCheck == "" {
|
||||
// No healthcheck: one synthetic pass drives the backend to Up immediately.
|
||||
rise, fall = 1, 1
|
||||
}
|
||||
wCtx, cancel := context.WithCancel(ctx)
|
||||
hc := vip.HealthCheck
|
||||
w := &worker{
|
||||
backend: health.New(vipName, backend, hc.Rise, hc.Fall),
|
||||
backend: health.New(name, entry.Address, rise, fall),
|
||||
hc: hc,
|
||||
vip: vip,
|
||||
entry: entry,
|
||||
cancel: cancel,
|
||||
}
|
||||
c.workers[key] = w
|
||||
go c.runProbe(wCtx, key, pos, total)
|
||||
c.workers[name] = w
|
||||
go c.runProbe(wCtx, name, pos, total)
|
||||
}
|
||||
|
||||
// runProbe is the per-backend probe loop.
|
||||
// pos and total drive the initial stagger: delay = interval * pos / total.
|
||||
func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int) {
|
||||
// Stagger initial probe to spread startup load.
|
||||
func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
|
||||
c.mu.RLock()
|
||||
w, ok := c.workers[key]
|
||||
w, ok := c.workers[name]
|
||||
if !ok {
|
||||
c.mu.RUnlock()
|
||||
return
|
||||
@@ -265,15 +292,21 @@ func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int)
|
||||
|
||||
for {
|
||||
c.mu.RLock()
|
||||
w, ok := c.workers[key]
|
||||
w, ok := c.workers[name]
|
||||
if !ok {
|
||||
c.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
hc := w.hc
|
||||
vip := w.vip
|
||||
entry := w.entry
|
||||
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||
sleepFor := w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval)
|
||||
netns := c.cfg.HealthChecker.Netns
|
||||
var sleepFor time.Duration
|
||||
if entry.HealthCheck == "" {
|
||||
sleepFor = 30 * time.Second
|
||||
} else {
|
||||
sleepFor = w.backend.NextInterval(hc.Interval, hc.FastInterval, hc.DownInterval)
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
select {
|
||||
@@ -282,42 +315,44 @@ func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int)
|
||||
case <-time.After(sleepFor):
|
||||
}
|
||||
|
||||
// Determine source IP based on target address family.
|
||||
backendIP := net.ParseIP(key.Backend)
|
||||
var probeSrc net.IP
|
||||
if backendIP.To4() != nil {
|
||||
probeSrc = c.cfg.ProbeIPv4Src
|
||||
var result health.ProbeResult
|
||||
if entry.HealthCheck == "" {
|
||||
// No healthcheck configured: synthesise a passing result so the
|
||||
// backend is assumed healthy without any network activity.
|
||||
result = health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"}
|
||||
} else {
|
||||
probeSrc = c.cfg.ProbeIPv6Src
|
||||
var probeSrc net.IP
|
||||
if entry.Address.To4() != nil {
|
||||
probeSrc = hc.ProbeIPv4Src
|
||||
} else {
|
||||
probeSrc = hc.ProbeIPv6Src
|
||||
}
|
||||
pcfg := prober.ProbeConfig{
|
||||
Target: entry.Address,
|
||||
Port: hc.Port,
|
||||
ProbeSrc: probeSrc,
|
||||
HealthCheckNetns: netns,
|
||||
Timeout: hc.Timeout,
|
||||
HTTP: hc.HTTP,
|
||||
TCP: hc.TCP,
|
||||
}
|
||||
probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout)
|
||||
slog.Debug("probe-start", "backend", name, "type", hc.Type)
|
||||
start := time.Now()
|
||||
result = prober.ForType(hc.Type)(probeCtx, pcfg)
|
||||
cancel()
|
||||
slog.Debug("probe-done",
|
||||
"backend", name,
|
||||
"type", hc.Type,
|
||||
"ok", result.OK,
|
||||
"code", result.Code,
|
||||
"detail", result.Detail,
|
||||
"elapsed", time.Since(start).Round(time.Millisecond).String(),
|
||||
)
|
||||
}
|
||||
|
||||
pcfg := prober.ProbeConfig{
|
||||
Target: backendIP,
|
||||
Port: vip.Port,
|
||||
ProbeSrc: probeSrc,
|
||||
HealthCheckNetns: c.cfg.HealthCheckNetns,
|
||||
Timeout: hc.Timeout,
|
||||
HTTP: hc.HTTP,
|
||||
TCP: hc.TCP,
|
||||
}
|
||||
|
||||
probeCtx, cancel := context.WithTimeout(ctx, hc.Timeout)
|
||||
slog.Debug("probe-start", "vip", key.VIPName, "backend", key.Backend, "type", hc.Type)
|
||||
start := time.Now()
|
||||
result := prober.ForType(hc.Type)(probeCtx, pcfg)
|
||||
cancel()
|
||||
slog.Debug("probe-done",
|
||||
"vip", key.VIPName,
|
||||
"backend", key.Backend,
|
||||
"type", hc.Type,
|
||||
"ok", result.OK,
|
||||
"code", result.Code,
|
||||
"detail", result.Detail,
|
||||
"elapsed", time.Since(start).Round(time.Millisecond).String(),
|
||||
)
|
||||
|
||||
c.mu.Lock()
|
||||
w, exists := c.workers[key]
|
||||
w, exists := c.workers[name]
|
||||
if !exists {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
@@ -326,26 +361,38 @@ func (c *Checker) runProbe(ctx context.Context, key backendKey, pos, total int)
|
||||
t := w.backend.Transitions[0]
|
||||
addr := w.backend.Address
|
||||
slog.Info("backend-transition",
|
||||
"vip", key.VIPName,
|
||||
"backend", key.Backend,
|
||||
"backend", name,
|
||||
"from", t.From.String(),
|
||||
"to", t.To.String(),
|
||||
"code", result.Code,
|
||||
"detail", result.Detail,
|
||||
)
|
||||
c.emit(Event{VIPName: key.VIPName, Backend: addr, Transition: t})
|
||||
c.emitForBackend(name, addr, t)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// emitForBackend emits one Event per frontend that references backendName.
|
||||
// Must be called with c.mu held.
|
||||
func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition) {
|
||||
for feName, fe := range c.cfg.Frontends {
|
||||
for _, name := range fe.Backends {
|
||||
if name == backendName {
|
||||
c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// emit sends an event to the internal fan-out channel (non-blocking).
|
||||
// Must be called with c.mu held.
|
||||
func (c *Checker) emit(e Event) {
|
||||
select {
|
||||
case c.eventCh <- e:
|
||||
default:
|
||||
slog.Warn("event-drop", "vip", e.VIPName, "backend", e.Backend)
|
||||
slog.Warn("event-drop", "frontend", e.FrontendName, "backend", e.BackendName)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,20 +465,29 @@ func tcpParamsEqual(a, b *config.TCPParams) bool {
|
||||
a.InsecureSkipVerify == b.InsecureSkipVerify
|
||||
}
|
||||
|
||||
// staggerDelay computes the initial probe delay for a backend at position pos
|
||||
// out of total backends: delay = interval * pos / total.
|
||||
// activeBackendNames returns a sorted, deduplicated list of backend names that
|
||||
// are referenced by at least one frontend and have Enabled: true.
|
||||
func activeBackendNames(cfg *config.Config) []string {
|
||||
seen := map[string]struct{}{}
|
||||
for _, fe := range cfg.Frontends {
|
||||
for _, name := range fe.Backends {
|
||||
if b, ok := cfg.Backends[name]; ok && b.Enabled {
|
||||
seen[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
names := make([]string, 0, len(seen))
|
||||
for name := range seen {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
return names
|
||||
}
|
||||
|
||||
// staggerDelay computes the initial probe delay for position pos out of total.
|
||||
func staggerDelay(interval time.Duration, pos, total int) time.Duration {
|
||||
if total <= 1 {
|
||||
return 0
|
||||
}
|
||||
return time.Duration(int64(interval) * int64(pos) / int64(total))
|
||||
}
|
||||
|
||||
// totalBackends counts all backends across all VIPs in a config.
|
||||
func totalBackends(cfg *config.Frontend) int {
|
||||
n := 0
|
||||
for _, vip := range cfg.VIPs {
|
||||
n += len(vip.Backends)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@@ -10,23 +10,32 @@ import (
|
||||
"git.ipng.ch/ipng/vpp-maglev/internal/health"
|
||||
)
|
||||
|
||||
func makeTestConfig(interval time.Duration, fall, rise int) *config.Frontend {
|
||||
return &config.Frontend{
|
||||
HealthCheckNetns: "test",
|
||||
HealthChecker: config.HealthCheckerConfig{TransitionHistory: 5},
|
||||
VIPs: map[string]config.VIP{
|
||||
func makeTestConfig(interval time.Duration, fall, rise int) *config.Config {
|
||||
return &config.Config{
|
||||
HealthChecker: config.HealthCheckerConfig{TransitionHistory: 5},
|
||||
HealthChecks: map[string]config.HealthCheck{
|
||||
"icmp": {
|
||||
Type: "icmp",
|
||||
Interval: interval,
|
||||
Timeout: time.Second,
|
||||
Fall: fall,
|
||||
Rise: rise,
|
||||
},
|
||||
},
|
||||
Backends: map[string]config.Backend{
|
||||
"be0": {
|
||||
Address: net.ParseIP("10.0.0.2"),
|
||||
HealthCheck: "icmp",
|
||||
Enabled: true,
|
||||
Weight: 100,
|
||||
},
|
||||
},
|
||||
Frontends: map[string]config.Frontend{
|
||||
"web": {
|
||||
Address: net.ParseIP("192.0.2.1"),
|
||||
Protocol: "tcp",
|
||||
Port: 80,
|
||||
Backends: []net.IP{net.ParseIP("10.0.0.2")},
|
||||
HealthCheck: config.HealthCheck{
|
||||
Type: "icmp",
|
||||
Interval: interval,
|
||||
Timeout: time.Second,
|
||||
Fall: fall,
|
||||
Rise: rise,
|
||||
},
|
||||
Backends: []string{"be0"},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -62,20 +71,16 @@ func TestHealthCheckEqual(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStateMachineViaBackend(t *testing.T) {
|
||||
// Directly test Backend state transitions (rise=2, fall=3) without goroutines.
|
||||
b := health.New("web", net.ParseIP("10.0.0.2"), 2, 3)
|
||||
b := health.New("be0", net.ParseIP("10.0.0.2"), 2, 3)
|
||||
pass := health.ProbeResult{OK: true, Layer: health.LayerL7, Code: "L7OK"}
|
||||
fail := health.ProbeResult{OK: false, Layer: health.LayerL4, Code: "L4CON"}
|
||||
|
||||
// Unknown → Down on first fail.
|
||||
if !b.Record(fail, 5) {
|
||||
t.Error("first fail from Unknown should transition to Down")
|
||||
}
|
||||
if b.State != health.StateDown {
|
||||
t.Errorf("expected down, got %s", b.State)
|
||||
}
|
||||
|
||||
// rise=2 passes → Up.
|
||||
if b.Record(pass, 5) {
|
||||
t.Error("should not transition after 1 pass (rise=2)")
|
||||
}
|
||||
@@ -105,21 +110,19 @@ func TestReloadAddsBackend(t *testing.T) {
|
||||
c := New(cfg)
|
||||
|
||||
newCfg := makeTestConfig(10*time.Millisecond, 3, 2)
|
||||
newCfg.VIPs["web2"] = config.VIP{
|
||||
newCfg.Backends["be1"] = config.Backend{
|
||||
Address: net.ParseIP("10.0.0.3"),
|
||||
HealthCheck: "icmp",
|
||||
Enabled: true,
|
||||
Weight: 100,
|
||||
}
|
||||
newCfg.Frontends["web2"] = config.Frontend{
|
||||
Address: net.ParseIP("192.0.2.2"),
|
||||
Protocol: "tcp",
|
||||
Port: 443,
|
||||
Backends: []net.IP{net.ParseIP("10.0.0.3")},
|
||||
HealthCheck: config.HealthCheck{
|
||||
Type: "icmp",
|
||||
Interval: 10 * time.Millisecond,
|
||||
Timeout: time.Second,
|
||||
Fall: 3,
|
||||
Rise: 2,
|
||||
},
|
||||
Backends: []string{"be1"},
|
||||
}
|
||||
|
||||
// Cancelled context: no probe goroutines actually run.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
@@ -128,7 +131,7 @@ func TestReloadAddsBackend(t *testing.T) {
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
_, ok := c.workers[backendKey{VIPName: "web2", Backend: "10.0.0.3"}]
|
||||
_, ok := c.workers["be1"]
|
||||
c.mu.RUnlock()
|
||||
if !ok {
|
||||
t.Error("new backend not added after Reload")
|
||||
@@ -144,22 +147,22 @@ func TestReloadRemovesBackend(t *testing.T) {
|
||||
|
||||
// Seed a worker manually.
|
||||
c.mu.Lock()
|
||||
key := backendKey{VIPName: "web", Backend: "10.0.0.2"}
|
||||
wCtx, wCancel := context.WithCancel(context.Background())
|
||||
c.workers[key] = &worker{
|
||||
backend: health.New("web", net.ParseIP("10.0.0.2"), 2, 3),
|
||||
hc: cfg.VIPs["web"].HealthCheck,
|
||||
vip: cfg.VIPs["web"],
|
||||
c.workers["be0"] = &worker{
|
||||
backend: health.New("be0", net.ParseIP("10.0.0.2"), 2, 3),
|
||||
hc: cfg.HealthChecks["icmp"],
|
||||
entry: cfg.Backends["be0"],
|
||||
cancel: wCancel,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
_ = wCtx
|
||||
|
||||
// New config with "web" VIP removed.
|
||||
newCfg := &config.Frontend{
|
||||
HealthCheckNetns: cfg.HealthCheckNetns,
|
||||
HealthChecker: cfg.HealthChecker,
|
||||
VIPs: map[string]config.VIP{},
|
||||
// Remove all frontends → be0 is no longer active.
|
||||
newCfg := &config.Config{
|
||||
HealthChecker: cfg.HealthChecker,
|
||||
HealthChecks: cfg.HealthChecks,
|
||||
Backends: cfg.Backends,
|
||||
Frontends: map[string]config.Frontend{},
|
||||
}
|
||||
|
||||
if err := c.Reload(ctx, newCfg); err != nil {
|
||||
@@ -167,13 +170,30 @@ func TestReloadRemovesBackend(t *testing.T) {
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
_, ok := c.workers[key]
|
||||
_, ok := c.workers["be0"]
|
||||
c.mu.RUnlock()
|
||||
if ok {
|
||||
t.Error("removed backend still present after Reload")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSharedBackendProbedOnce(t *testing.T) {
|
||||
// be0 is referenced by two frontends — only one worker should exist.
|
||||
cfg := makeTestConfig(10*time.Millisecond, 3, 2)
|
||||
cfg.Frontends["web-tls"] = config.Frontend{
|
||||
Address: net.ParseIP("192.0.2.3"),
|
||||
Protocol: "tcp",
|
||||
Port: 443,
|
||||
Backends: []string{"be0"},
|
||||
}
|
||||
|
||||
c := New(cfg)
|
||||
names := activeBackendNames(c.cfg)
|
||||
if len(names) != 1 || names[0] != "be0" {
|
||||
t.Errorf("expected exactly one active backend, got %v", names)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
cfg := makeTestConfig(10*time.Millisecond, 1, 1)
|
||||
c := New(cfg)
|
||||
@@ -186,8 +206,9 @@ func TestSubscribe(t *testing.T) {
|
||||
defer unsub()
|
||||
|
||||
e := Event{
|
||||
VIPName: "web",
|
||||
Backend: net.ParseIP("10.0.0.2"),
|
||||
FrontendName: "web",
|
||||
BackendName: "be0",
|
||||
Backend: net.ParseIP("10.0.0.2"),
|
||||
Transition: health.Transition{
|
||||
From: health.StateUnknown,
|
||||
To: health.StateUp,
|
||||
@@ -199,8 +220,11 @@ func TestSubscribe(t *testing.T) {
|
||||
|
||||
select {
|
||||
case got := <-ch:
|
||||
if got.VIPName != "web" {
|
||||
t.Errorf("event VIPName: got %q, want %q", got.VIPName, "web")
|
||||
if got.FrontendName != "web" {
|
||||
t.Errorf("event FrontendName: got %q, want web", got.FrontendName)
|
||||
}
|
||||
if got.BackendName != "be0" {
|
||||
t.Errorf("event BackendName: got %q, want be0", got.BackendName)
|
||||
}
|
||||
if got.Transition.To != health.StateUp {
|
||||
t.Errorf("event To state: got %s, want up", got.Transition.To)
|
||||
@@ -211,38 +235,36 @@ func TestSubscribe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPauseResume(t *testing.T) {
|
||||
cfg := makeTestConfig(time.Hour, 3, 2) // long interval so probes never fire
|
||||
cfg := makeTestConfig(time.Hour, 3, 2)
|
||||
c := New(cfg)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go c.fanOut(ctx)
|
||||
|
||||
// Seed a worker.
|
||||
c.mu.Lock()
|
||||
key := backendKey{VIPName: "web", Backend: "10.0.0.2"}
|
||||
_, wCancel := context.WithCancel(ctx)
|
||||
c.workers[key] = &worker{
|
||||
backend: health.New("web", net.ParseIP("10.0.0.2"), 2, 3),
|
||||
hc: cfg.VIPs["web"].HealthCheck,
|
||||
vip: cfg.VIPs["web"],
|
||||
c.workers["be0"] = &worker{
|
||||
backend: health.New("be0", net.ParseIP("10.0.0.2"), 2, 3),
|
||||
hc: cfg.HealthChecks["icmp"],
|
||||
entry: cfg.Backends["be0"],
|
||||
cancel: wCancel,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
b, ok := c.PauseBackend("web", "10.0.0.2")
|
||||
b, ok := c.PauseBackend("be0")
|
||||
if !ok {
|
||||
t.Fatal("PauseBackend: not found")
|
||||
}
|
||||
if b.State != health.StatePaused {
|
||||
t.Errorf("after pause: %s", b.State)
|
||||
if b.Health.State != health.StatePaused {
|
||||
t.Errorf("after pause: %s", b.Health.State)
|
||||
}
|
||||
|
||||
b, ok = c.ResumeBackend("web", "10.0.0.2")
|
||||
b, ok = c.ResumeBackend("be0")
|
||||
if !ok {
|
||||
t.Fatal("ResumeBackend: not found")
|
||||
}
|
||||
if b.State != health.StateUnknown {
|
||||
t.Errorf("after resume: %s", b.State)
|
||||
if b.Health.State != health.StateUnknown {
|
||||
t.Errorf("after resume: %s", b.Health.State)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user