149 lines
4.2 KiB
Go
149 lines
4.2 KiB
Go
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package grpcapi
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// logSub is a single WatchEvents subscriber interested in log events.
|
|
type logSub struct {
|
|
minLevel slog.Level
|
|
ch chan *LogEvent
|
|
}
|
|
|
|
// broadcasterState holds the shared subscription registry. It is referenced
|
|
// by pointer so that copies returned from WithAttrs/WithGroup share the same set.
|
|
type broadcasterState struct {
|
|
mu sync.Mutex
|
|
nextID int
|
|
subs map[int]*logSub
|
|
}
|
|
|
|
func (s *broadcasterState) subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
|
|
s.mu.Lock()
|
|
id := s.nextID
|
|
s.nextID++
|
|
sub := &logSub{minLevel: minLevel, ch: make(chan *LogEvent, 256)}
|
|
s.subs[id] = sub
|
|
s.mu.Unlock()
|
|
return sub.ch, func() {
|
|
s.mu.Lock()
|
|
delete(s.subs, id)
|
|
close(sub.ch)
|
|
s.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// hasSubscriberAt reports whether any subscriber wants records at level or above.
|
|
func (s *broadcasterState) hasSubscriberAt(level slog.Level) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for _, sub := range s.subs {
|
|
if level >= sub.minLevel {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *broadcasterState) fanOut(level slog.Level, ev *LogEvent) {
|
|
s.mu.Lock()
|
|
for _, sub := range s.subs {
|
|
if level >= sub.minLevel {
|
|
select {
|
|
case sub.ch <- ev:
|
|
default:
|
|
// slow subscriber — drop rather than block
|
|
}
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// LogBroadcaster implements slog.Handler. It forwards every record to an
|
|
// inner handler (e.g. the JSON stdout handler) and simultaneously fans out
|
|
// structured LogEvent messages to all gRPC WatchEvents subscribers.
|
|
type LogBroadcaster struct {
|
|
inner slog.Handler
|
|
preAttrs []*LogAttr // pre-resolved attrs from WithAttrs calls
|
|
groupPfx string // key prefix accumulated by WithGroup calls
|
|
shared *broadcasterState
|
|
}
|
|
|
|
// NewLogBroadcaster wraps inner and returns a LogBroadcaster ready for use
|
|
// as the process slog default handler.
|
|
func NewLogBroadcaster(inner slog.Handler) *LogBroadcaster {
|
|
return &LogBroadcaster{
|
|
inner: inner,
|
|
shared: &broadcasterState{subs: make(map[int]*logSub)},
|
|
}
|
|
}
|
|
|
|
// Subscribe registers a subscriber that receives LogEvents at or above
|
|
// minLevel. The returned channel is closed when the cancel func is called.
|
|
func (b *LogBroadcaster) Subscribe(minLevel slog.Level) (<-chan *LogEvent, func()) {
|
|
return b.shared.subscribe(minLevel)
|
|
}
|
|
|
|
// Enabled implements slog.Handler. It returns true when either the inner
|
|
// handler wants the record OR at least one gRPC subscriber has a minLevel at
|
|
// or below level. This allows a WatchEvents client requesting debug log events
|
|
// to receive them even when maglevd's own -log-level is set higher (e.g. error).
|
|
func (b *LogBroadcaster) Enabled(ctx context.Context, level slog.Level) bool {
|
|
return b.inner.Enabled(ctx, level) || b.shared.hasSubscriberAt(level)
|
|
}
|
|
|
|
// Handle implements slog.Handler. It forwards the record to the inner handler
|
|
// only when the inner handler is enabled for that level (avoiding duplicate or
|
|
// unwanted stdout output), then fans it out to all interested gRPC subscribers.
|
|
func (b *LogBroadcaster) Handle(ctx context.Context, r slog.Record) error {
|
|
var err error
|
|
if b.inner.Enabled(ctx, r.Level) {
|
|
err = b.inner.Handle(ctx, r)
|
|
}
|
|
|
|
attrs := make([]*LogAttr, 0, len(b.preAttrs)+r.NumAttrs())
|
|
attrs = append(attrs, b.preAttrs...)
|
|
r.Attrs(func(a slog.Attr) bool {
|
|
attrs = append(attrs, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()})
|
|
return true
|
|
})
|
|
|
|
ev := &LogEvent{
|
|
AtUnixNs: r.Time.UnixNano(),
|
|
Level: r.Level.String(),
|
|
Msg: r.Message,
|
|
Attrs: attrs,
|
|
}
|
|
b.shared.fanOut(r.Level, ev)
|
|
return err
|
|
}
|
|
|
|
// WithAttrs implements slog.Handler.
|
|
func (b *LogBroadcaster) WithAttrs(attrs []slog.Attr) slog.Handler {
|
|
pre := make([]*LogAttr, len(b.preAttrs), len(b.preAttrs)+len(attrs))
|
|
copy(pre, b.preAttrs)
|
|
for _, a := range attrs {
|
|
pre = append(pre, &LogAttr{Key: b.groupPfx + a.Key, Value: a.Value.String()})
|
|
}
|
|
return &LogBroadcaster{
|
|
inner: b.inner.WithAttrs(attrs),
|
|
preAttrs: pre,
|
|
groupPfx: b.groupPfx,
|
|
shared: b.shared,
|
|
}
|
|
}
|
|
|
|
// WithGroup implements slog.Handler.
|
|
func (b *LogBroadcaster) WithGroup(name string) slog.Handler {
|
|
return &LogBroadcaster{
|
|
inner: b.inner.WithGroup(name),
|
|
preAttrs: b.preAttrs,
|
|
groupPfx: b.groupPfx + name + ".",
|
|
shared: b.shared,
|
|
}
|
|
}
|