Files
vpp-maglev/internal/grpcapi/loghandler.go

149 lines
4.2 KiB
Go

// SPDX-License-Identifier: Apache-2.0
package grpcapi
import (
"context"
"log/slog"
"sync"
)
// logSub is a single WatchEvents subscriber interested in log events.
type logSub struct {
// minLevel is the lowest level this subscriber receives; hasSubscriberAt
// and fanOut only match records whose level is >= minLevel.
minLevel slog.Level
// ch carries events to the subscriber. subscribe creates it with a buffer
// of 256, fanOut sends on it without blocking, and the cancel func
// returned by subscribe closes it.
ch chan *LogEvent
}
// broadcasterState holds the shared subscription registry. It is referenced
// by pointer so that copies returned from WithAttrs/WithGroup share the same set.
type broadcasterState struct {
// mu is held around every read and write of nextID and subs, and around
// sends to and the close of subscriber channels.
mu sync.Mutex
// nextID is the key the next call to subscribe will use; it only ever
// increases.
nextID int
// subs maps a subscription id to its subscriber. Entries are added by
// subscribe and removed by the cancel func subscribe returns.
subs map[int]*logSub
}
// subscribe registers a new subscriber that wants records at minLevel or
// above. It returns the receive side of the subscriber's buffered channel
// (capacity 256) and a cancel func that removes the subscriber from the
// registry and closes the channel.
//
// The cancel func is idempotent and may be called from several goroutines:
// only the first call closes the channel, so pairing a deferred cancel with
// an explicit one cannot panic with "close of closed channel".
func (s *broadcasterState) subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
	sub := &logSub{minLevel: minLevel, ch: make(chan *LogEvent, 256)}

	s.mu.Lock()
	id := s.nextID
	s.nextID++
	s.subs[id] = sub
	s.mu.Unlock()

	cancel := func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		// Presence of this exact subscriber in the registry doubles as the
		// "not yet cancelled" flag.
		if s.subs[id] != sub {
			return
		}
		delete(s.subs, id)
		// Close while holding the lock: fanOut sends under the same lock, so
		// it can never observe (and send on) a closed channel.
		close(sub.ch)
	}
	return sub.ch, cancel
}
// hasSubscriberAt reports whether at least one registered subscriber has a
// minLevel at or below level, i.e. would accept a record logged at level.
// The registry is inspected while holding the mutex.
func (s *broadcasterState) hasSubscriberAt(level slog.Level) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	interested := false
	for _, candidate := range s.subs {
		if candidate.minLevel <= level {
			interested = true
			break
		}
	}
	return interested
}
// fanOut offers ev to every subscriber whose minLevel is at or below level.
// Delivery is a non-blocking send performed while holding the mutex: when a
// subscriber's buffer is full the event is dropped for that subscriber so a
// slow consumer can never stall the logging goroutine.
func (s *broadcasterState) fanOut(level slog.Level, ev *LogEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, target := range s.subs {
		if target.minLevel > level {
			continue
		}
		select {
		case target.ch <- ev:
			// delivered
		default:
			// buffer full: drop instead of blocking
		}
	}
}
// LogBroadcaster implements slog.Handler. It forwards records to an inner
// handler (e.g. the JSON stdout handler) whenever that handler is enabled for
// the record's level, and fans out structured LogEvent messages to all
// interested gRPC WatchEvents subscribers.
//
// Handlers derived through WithAttrs/WithGroup are separate LogBroadcaster
// values that point at the same broadcasterState, so a subscription made on
// any of them receives records logged through all of them.
type LogBroadcaster struct {
// inner is the wrapped handler; WithAttrs/WithGroup derive it in step with
// this handler.
inner slog.Handler
preAttrs []*LogAttr // pre-resolved attrs from WithAttrs calls
groupPfx string // key prefix accumulated by WithGroup calls
// shared is the subscriber registry common to this handler and every
// handler derived from it.
shared *broadcasterState
}
// NewLogBroadcaster returns a LogBroadcaster that wraps inner and starts
// with an empty subscriber registry, no pre-bound attrs and no group prefix.
// The result is suitable for installation as the process-wide slog handler.
func NewLogBroadcaster(inner slog.Handler) *LogBroadcaster {
	registry := &broadcasterState{subs: map[int]*logSub{}}

	b := new(LogBroadcaster)
	b.inner = inner
	b.shared = registry
	return b
}
// Subscribe adds a subscriber to the shared registry and returns the channel
// on which it will receive LogEvents at or above minLevel, together with a
// cancel func. Calling the cancel func unregisters the subscriber and closes
// the channel.
func (b *LogBroadcaster) Subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
	events, cancel := b.shared.subscribe(minLevel)
	return events, cancel
}
// Enabled implements slog.Handler. A level is enabled when the inner handler
// accepts it or, failing that, when some gRPC subscriber has a minLevel at or
// below it. The second condition lets a WatchEvents client that asked for
// debug events receive them even when maglevd's own -log-level is stricter
// (e.g. error).
func (b *LogBroadcaster) Enabled(ctx context.Context, level slog.Level) bool {
	if b.inner.Enabled(ctx, level) {
		return true
	}
	return b.shared.hasSubscriberAt(level)
}
// Handle implements slog.Handler. It forwards the record to the inner handler
// only when the inner handler is enabled for that level (avoiding duplicate or
// unwanted stdout output), then fans it out to all interested gRPC subscribers.
//
// The returned error is whatever the inner handler reported; fan-out itself
// cannot fail.
//
// Following the slog.Handler rules for Handle, attr values are resolved
// before being stringified (so LogValuer implementations such as redacting
// wrappers take effect), Attrs whose key and value are both zero are skipped,
// and a zero record time leaves AtUnixNs at 0 instead of passing through the
// undefined result of UnixNano on the zero time.
func (b *LogBroadcaster) Handle(ctx context.Context, r slog.Record) error {
	var err error
	if b.inner.Enabled(ctx, r.Level) {
		err = b.inner.Handle(ctx, r)
	}

	// Common case: nobody is watching at this level, so do not pay for
	// building an event that fanOut would discard anyway.
	if !b.shared.hasSubscriberAt(r.Level) {
		return err
	}

	attrs := make([]*LogAttr, 0, len(b.preAttrs)+r.NumAttrs())
	attrs = append(attrs, b.preAttrs...)
	r.Attrs(func(a slog.Attr) bool {
		a.Value = a.Value.Resolve()
		if a.Equal(slog.Attr{}) {
			return true // empty Attr: ignore, keep iterating
		}
		attrs = append(attrs, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()})
		return true
	})

	ev := &LogEvent{
		Level: r.Level.String(),
		Msg:   r.Message,
		Attrs: attrs,
	}
	if !r.Time.IsZero() {
		ev.AtUnixNs = r.Time.UnixNano()
	}
	b.shared.fanOut(r.Level, ev)
	return err
}
// WithAttrs implements slog.Handler. It returns a handler whose inner handler
// and pre-bound attr list both include attrs, and which shares the
// receiver's subscriber registry.
//
// Each attr is stored under the current group prefix with its value resolved
// first, so LogValuer implementations take effect before the value is turned
// into a string. Attrs whose key and value are both zero are skipped. When
// attrs is empty the receiver itself is returned, since there is nothing to
// add.
func (b *LogBroadcaster) WithAttrs(attrs []slog.Attr) slog.Handler {
	if len(attrs) == 0 {
		return b
	}

	// Copy rather than append in place: sibling handlers derived from the
	// same parent must not see each other's attrs through a shared backing
	// array.
	pre := make([]*LogAttr, len(b.preAttrs), len(b.preAttrs)+len(attrs))
	copy(pre, b.preAttrs)
	for _, a := range attrs {
		v := a.Value.Resolve()
		if a.Key == "" && v.Equal(slog.Value{}) {
			continue
		}
		pre = append(pre, &LogAttr{Key: b.groupPfx + a.Key, Value: v.String()})
	}

	return &LogBroadcaster{
		inner:    b.inner.WithAttrs(attrs),
		preAttrs: pre,
		groupPfx: b.groupPfx,
		shared:   b.shared,
	}
}
// WithGroup implements slog.Handler. It returns a handler that prefixes the
// keys of all subsequently added attrs with "name." (appended to any prefix
// accumulated so far), wraps the inner handler's own WithGroup result, and
// shares the receiver's subscriber registry and pre-bound attrs.
//
// As the slog.Handler contract requires, an empty name returns the receiver
// unchanged; otherwise every later key would gain a stray leading ".".
func (b *LogBroadcaster) WithGroup(name string) slog.Handler {
	if name == "" {
		return b
	}
	return &LogBroadcaster{
		inner:    b.inner.WithGroup(name),
		preAttrs: b.preAttrs,
		groupPfx: b.groupPfx + name + ".",
		shared:   b.shared,
	}
}