when maglevd rehashes its config:
- when a backend gets newly added or restarted, an event should fire; perhaps transition to 'unknown' - when a backend gets removed, an event should fire; perhaps transition to 'removed' - when a backend is in 'unknown' state, fast-interval is appropriate
This commit is contained in:
@@ -67,10 +67,11 @@ func (c *Checker) Run(ctx context.Context) error {
|
|||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
names := activeBackendNames(c.cfg)
|
names := activeBackendNames(c.cfg)
|
||||||
|
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||||
for i, name := range names {
|
for i, name := range names {
|
||||||
b := c.cfg.Backends[name]
|
b := c.cfg.Backends[name]
|
||||||
hc := c.cfg.HealthChecks[b.HealthCheck]
|
hc := c.cfg.HealthChecks[b.HealthCheck]
|
||||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|
||||||
@@ -86,21 +87,25 @@ func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error {
|
|||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
maxHistory := cfg.HealthChecker.TransitionHistory
|
||||||
|
|
||||||
desired := map[string]struct{}{}
|
desired := map[string]struct{}{}
|
||||||
for _, name := range activeBackendNames(cfg) {
|
for _, name := range activeBackendNames(cfg) {
|
||||||
desired[name] = struct{}{}
|
desired[name] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop workers no longer needed.
|
// Stop workers no longer needed; emit a removed event using the old frontends.
|
||||||
for name, w := range c.workers {
|
for name, w := range c.workers {
|
||||||
if _, ok := desired[name]; !ok {
|
if _, ok := desired[name]; !ok {
|
||||||
slog.Info("backend-stop", "backend", name)
|
slog.Info("backend-stop", "backend", name)
|
||||||
|
t := w.backend.Remove(maxHistory)
|
||||||
|
c.emitForBackend(name, w.backend.Address, t, c.cfg.Frontends)
|
||||||
w.cancel()
|
w.cancel()
|
||||||
delete(c.workers, name)
|
delete(c.workers, name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add new or restart changed workers.
|
// Add new or restart changed workers; emit an unknown event using the new frontends.
|
||||||
names := activeBackendNames(cfg)
|
names := activeBackendNames(cfg)
|
||||||
for i, name := range names {
|
for i, name := range names {
|
||||||
b := cfg.Backends[name]
|
b := cfg.Backends[name]
|
||||||
@@ -113,11 +118,12 @@ func (c *Checker) Reload(ctx context.Context, cfg *config.Config) error {
|
|||||||
}
|
}
|
||||||
slog.Info("backend-restart", "backend", name)
|
slog.Info("backend-restart", "backend", name)
|
||||||
w.cancel()
|
w.cancel()
|
||||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
|
||||||
} else {
|
} else {
|
||||||
slog.Info("backend-start", "backend", name)
|
slog.Info("backend-start", "backend", name)
|
||||||
c.startWorker(ctx, name, b, hc, i, len(names))
|
c.startWorker(ctx, name, b, hc, i, len(names), maxHistory)
|
||||||
}
|
}
|
||||||
|
c.emitForBackend(name, c.workers[name].backend.Address, c.workers[name].backend.Transitions[0], cfg.Frontends)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cfg = cfg
|
c.cfg = cfg
|
||||||
@@ -231,7 +237,7 @@ func (c *Checker) PauseBackend(name string) (BackendSnapshot, bool) {
|
|||||||
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||||
if w.backend.Pause(maxHistory) {
|
if w.backend.Pause(maxHistory) {
|
||||||
slog.Info("backend-pause", "backend", name)
|
slog.Info("backend-pause", "backend", name)
|
||||||
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0])
|
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0], c.cfg.Frontends)
|
||||||
}
|
}
|
||||||
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
||||||
}
|
}
|
||||||
@@ -247,7 +253,7 @@ func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) {
|
|||||||
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
maxHistory := c.cfg.HealthChecker.TransitionHistory
|
||||||
if w.backend.Resume(maxHistory) {
|
if w.backend.Resume(maxHistory) {
|
||||||
slog.Info("backend-resume", "backend", name)
|
slog.Info("backend-resume", "backend", name)
|
||||||
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0])
|
c.emitForBackend(name, w.backend.Address, w.backend.Transitions[0], c.cfg.Frontends)
|
||||||
}
|
}
|
||||||
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
return BackendSnapshot{Health: w.backend, Config: w.entry}, true
|
||||||
}
|
}
|
||||||
@@ -256,7 +262,7 @@ func (c *Checker) ResumeBackend(name string) (BackendSnapshot, bool) {
|
|||||||
|
|
||||||
// startWorker creates a Backend and launches a probe goroutine.
|
// startWorker creates a Backend and launches a probe goroutine.
|
||||||
// Must be called with c.mu held.
|
// Must be called with c.mu held.
|
||||||
func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total int) {
|
func (c *Checker) startWorker(ctx context.Context, name string, entry config.Backend, hc config.HealthCheck, pos, total, maxHistory int) {
|
||||||
rise, fall := hc.Rise, hc.Fall
|
rise, fall := hc.Rise, hc.Fall
|
||||||
if entry.HealthCheck == "" {
|
if entry.HealthCheck == "" {
|
||||||
// No healthcheck: one synthetic pass drives the backend to Up immediately.
|
// No healthcheck: one synthetic pass drives the backend to Up immediately.
|
||||||
@@ -269,6 +275,7 @@ func (c *Checker) startWorker(ctx context.Context, name string, entry config.Bac
|
|||||||
entry: entry,
|
entry: entry,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
w.backend.Start(maxHistory)
|
||||||
c.workers[name] = w
|
c.workers[name] = w
|
||||||
go c.runProbe(wCtx, name, pos, total)
|
go c.runProbe(wCtx, name, pos, total)
|
||||||
}
|
}
|
||||||
@@ -369,16 +376,16 @@ func (c *Checker) runProbe(ctx context.Context, name string, pos, total int) {
|
|||||||
"code", result.Code,
|
"code", result.Code,
|
||||||
"detail", result.Detail,
|
"detail", result.Detail,
|
||||||
)
|
)
|
||||||
c.emitForBackend(name, addr, t)
|
c.emitForBackend(name, addr, t, c.cfg.Frontends)
|
||||||
}
|
}
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// emitForBackend emits one Event per frontend that references backendName.
|
// emitForBackend emits one Event per frontend that references backendName,
|
||||||
// Must be called with c.mu held.
|
// using the provided frontends map. Must be called with c.mu held.
|
||||||
func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition) {
|
func (c *Checker) emitForBackend(backendName string, addr net.IP, t health.Transition, frontends map[string]config.Frontend) {
|
||||||
for feName, fe := range c.cfg.Frontends {
|
for feName, fe := range frontends {
|
||||||
for _, name := range fe.Backends {
|
for _, name := range fe.Backends {
|
||||||
if name == backendName {
|
if name == backendName {
|
||||||
c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t})
|
c.emit(Event{FrontendName: feName, BackendName: backendName, Backend: addr, Transition: t})
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ const (
|
|||||||
StateUp
|
StateUp
|
||||||
StateDown
|
StateDown
|
||||||
StatePaused
|
StatePaused
|
||||||
|
StateRemoved // backend was removed from configuration
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s State) String() string {
|
func (s State) String() string {
|
||||||
@@ -45,6 +46,8 @@ func (s State) String() string {
|
|||||||
return "down"
|
return "down"
|
||||||
case StatePaused:
|
case StatePaused:
|
||||||
return "paused"
|
return "paused"
|
||||||
|
case StateRemoved:
|
||||||
|
return "removed"
|
||||||
default:
|
default:
|
||||||
return "unknown"
|
return "unknown"
|
||||||
}
|
}
|
||||||
@@ -118,7 +121,7 @@ func New(name string, addr net.IP, rise, fall int) *Backend {
|
|||||||
// failure means the backend is not yet confirmed reachable), and to StateUp
|
// failure means the backend is not yet confirmed reachable), and to StateUp
|
||||||
// once the counter reaches Rise consecutive passes.
|
// once the counter reaches Rise consecutive passes.
|
||||||
func (b *Backend) Record(r ProbeResult, maxHistory int) bool {
|
func (b *Backend) Record(r ProbeResult, maxHistory int) bool {
|
||||||
if b.State == StatePaused {
|
if b.State == StatePaused || b.State == StateRemoved {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if r.OK {
|
if r.OK {
|
||||||
@@ -157,12 +160,15 @@ func (b *Backend) Resume(maxHistory int) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NextInterval returns the appropriate probe interval based on state and counter:
|
// NextInterval returns the appropriate probe interval based on state and counter:
|
||||||
// - Unknown (no probes yet): interval — probe promptly to establish initial state
|
// - Unknown (initial / post-resume): fastInterval (falls back to interval) — probe quickly to establish state
|
||||||
// - Fully healthy (counter at max): interval
|
// - Fully healthy (counter at max): interval
|
||||||
// - Fully down (counter at 0): downInterval (falls back to interval)
|
// - Fully down (counter at 0): downInterval (falls back to interval)
|
||||||
// - Degraded (anywhere in between): fastInterval (falls back to interval)
|
// - Degraded (anywhere in between): fastInterval (falls back to interval)
|
||||||
func (b *Backend) NextInterval(interval, fastInterval, downInterval time.Duration) time.Duration {
|
func (b *Backend) NextInterval(interval, fastInterval, downInterval time.Duration) time.Duration {
|
||||||
if b.State == StateUnknown {
|
if b.State == StateUnknown {
|
||||||
|
if fastInterval > 0 {
|
||||||
|
return fastInterval
|
||||||
|
}
|
||||||
return interval
|
return interval
|
||||||
}
|
}
|
||||||
if b.Counter.Health == b.Counter.Max() {
|
if b.Counter.Health == b.Counter.Max() {
|
||||||
@@ -180,6 +186,21 @@ func (b *Backend) NextInterval(interval, fastInterval, downInterval time.Duratio
|
|||||||
return interval
|
return interval
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start records the initial StateUnknown transition when a backend is first
|
||||||
|
// created or restarted. It exists solely to populate the transition history
|
||||||
|
// and fire a reload event; the state does not change.
|
||||||
|
func (b *Backend) Start(maxHistory int) Transition {
|
||||||
|
b.transition(StateUnknown, ProbeResult{Code: "start"}, maxHistory)
|
||||||
|
return b.Transitions[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove transitions the backend to StateRemoved. Returns the transition.
|
||||||
|
// After this call no further probe results are accepted.
|
||||||
|
func (b *Backend) Remove(maxHistory int) Transition {
|
||||||
|
b.transition(StateRemoved, ProbeResult{Code: "removed"}, maxHistory)
|
||||||
|
return b.Transitions[0]
|
||||||
|
}
|
||||||
|
|
||||||
// transition appends a new Transition and updates State.
|
// transition appends a new Transition and updates State.
|
||||||
func (b *Backend) transition(to State, r ProbeResult, maxHistory int) {
|
func (b *Backend) transition(to State, r ProbeResult, maxHistory int) {
|
||||||
t := Transition{From: b.State, To: to, At: time.Now(), Result: r}
|
t := Transition{From: b.State, To: to, At: time.Now(), Result: r}
|
||||||
|
|||||||
@@ -169,9 +169,13 @@ func TestNextInterval(t *testing.T) {
|
|||||||
|
|
||||||
b := New("test", net.ParseIP("10.0.0.1"), 2, 3) // max=4
|
b := New("test", net.ParseIP("10.0.0.1"), 2, 3) // max=4
|
||||||
|
|
||||||
// Unknown (no probes yet): always use interval, never downInterval.
|
// Unknown: use fast-interval to establish state quickly.
|
||||||
if got := b.NextInterval(interval, fast, down); got != interval {
|
if got := b.NextInterval(interval, fast, down); got != fast {
|
||||||
t.Errorf("StateUnknown: got %v, want %v (interval)", got, interval)
|
t.Errorf("StateUnknown with fast: got %v, want %v (fast)", got, fast)
|
||||||
|
}
|
||||||
|
// Unknown, no fast-interval configured: fall back to interval.
|
||||||
|
if got := b.NextInterval(interval, 0, down); got != interval {
|
||||||
|
t.Errorf("StateUnknown without fast: got %v, want %v (interval)", got, interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
// After first fail: counter=0, state=Down → downInterval.
|
// After first fail: counter=0, state=Down → downInterval.
|
||||||
@@ -291,6 +295,40 @@ func TestTransitionTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStartRemove(t *testing.T) {
|
||||||
|
b := newBackend()
|
||||||
|
|
||||||
|
// Start records an unknown→unknown transition.
|
||||||
|
tr := b.Start(5)
|
||||||
|
if tr.From != StateUnknown || tr.To != StateUnknown {
|
||||||
|
t.Errorf("Start transition: got %s→%s, want unknown→unknown", tr.From, tr.To)
|
||||||
|
}
|
||||||
|
if len(b.Transitions) != 1 {
|
||||||
|
t.Errorf("transitions after Start: got %d, want 1", len(b.Transitions))
|
||||||
|
}
|
||||||
|
if b.State != StateUnknown {
|
||||||
|
t.Errorf("state after Start: got %s, want unknown", b.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove transitions to StateRemoved.
|
||||||
|
b.State = StateUp
|
||||||
|
tr = b.Remove(5)
|
||||||
|
if tr.From != StateUp || tr.To != StateRemoved {
|
||||||
|
t.Errorf("Remove transition: got %s→%s, want up→removed", tr.From, tr.To)
|
||||||
|
}
|
||||||
|
if b.State != StateRemoved {
|
||||||
|
t.Errorf("state after Remove: got %s, want removed", b.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record is a no-op once removed.
|
||||||
|
if b.Record(pass(), 5) {
|
||||||
|
t.Error("Record should not transition a removed backend")
|
||||||
|
}
|
||||||
|
if b.State != StateRemoved {
|
||||||
|
t.Errorf("state changed after Record on removed backend: %s", b.State)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateString(t *testing.T) {
|
func TestStateString(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
s State
|
s State
|
||||||
@@ -300,6 +338,7 @@ func TestStateString(t *testing.T) {
|
|||||||
{StateUp, "up"},
|
{StateUp, "up"},
|
||||||
{StateDown, "down"},
|
{StateDown, "down"},
|
||||||
{StatePaused, "paused"},
|
{StatePaused, "paused"},
|
||||||
|
{StateRemoved, "removed"},
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
if c.s.String() != c.want {
|
if c.s.String() != c.want {
|
||||||
|
|||||||
Reference in New Issue
Block a user