From b89caa594c8d103a7450a5ed11fc12fc040c28ba Mon Sep 17 00:00:00 2001 From: Pim van Pelt Date: Mon, 23 Mar 2026 20:39:12 +0100 Subject: [PATCH] Auto-rediscover new glob patterns --- cmd/collector/main.go | 23 ++++------ cmd/collector/tailer.go | 88 +++++++++++++++++++++++++++++++----- cmd/collector/tailer_test.go | 6 +-- 3 files changed, 89 insertions(+), 28 deletions(-) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 4a050cb..891e4ec 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -11,30 +11,27 @@ import ( "path/filepath" "strings" "syscall" + "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" ) func main() { - listen := flag.String("listen", ":9090", "gRPC listen address") - logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail") - logsFile := flag.String("logs-file", "", "file containing one log path/glob per line") - source := flag.String("source", hostname(), "name for this collector (default: hostname)") - v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing") - v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing") + listen := flag.String("listen", ":9090", "gRPC listen address") + logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail") + logsFile := flag.String("logs-file", "", "file containing one log path/glob per line") + source := flag.String("source", hostname(), "name for this collector (default: hostname)") + v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing") + v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing") + scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files") flag.Parse() patterns := collectPatterns(*logPaths, *logsFile) if len(patterns) == 0 { log.Fatal("collector: no log paths specified; use --logs or --logs-file") } - - paths := expandGlobs(patterns) - if len(paths) == 0 { - log.Fatal("collector: no log files matched the specified patterns") - } - log.Printf("collector: tailing %d file(s)", len(paths)) + log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() @@ -45,7 +42,7 @@ func main() { store := NewStore(*source) go store.Run(ch) - tailer := NewMultiTailer(paths, *v4prefix, *v6prefix, ch) + tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch) go tailer.Run(ctx) lis, err := net.Listen("tcp", *listen) diff --git a/cmd/collector/tailer.go b/cmd/collector/tailer.go index 3c6b4e1..1277916 100644 --- a/cmd/collector/tailer.go +++ b/cmd/collector/tailer.go @@ -28,20 +28,25 @@ type reopenMsg struct { // fsnotify.Watcher (one inotify instance). This scales to hundreds of files // without hitting the kernel limit on inotify instances per user. type MultiTailer struct { - paths []string - v4bits int - v6bits int - ch chan<- LogRecord + patterns []string + scanInterval time.Duration + v4bits int + v6bits int + ch chan<- LogRecord } -func NewMultiTailer(paths []string, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer { - return &MultiTailer{paths: paths, v4bits: v4bits, v6bits: v6bits, ch: ch} +func NewMultiTailer(patterns []string, scanInterval time.Duration, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer { + return &MultiTailer{patterns: patterns, scanInterval: scanInterval, v4bits: v4bits, v6bits: v6bits, ch: ch} } // Run tails all configured files until ctx is cancelled. // All files share one fsnotify.Watcher. Log rotation is handled per-file: // on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens // the original path and hands it back via a channel. +// +// A periodic rescan re-expands the glob patterns so that files created after +// startup are picked up automatically and files that have disappeared (and +// are no longer matched by any pattern) are retired. func (mt *MultiTailer) Run(ctx context.Context) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -49,21 +54,33 @@ func (mt *MultiTailer) Run(ctx context.Context) { } defer watcher.Close() - files := make(map[string]*fileState, len(mt.paths)) - reopenCh := make(chan reopenMsg, len(mt.paths)) + files := make(map[string]*fileState) + retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine + reopenCh := make(chan reopenMsg, 32) - // Open all files and seek to EOF. - for _, path := range mt.paths { + startRetry := func(path string) { + if _, already := retrying[path]; already { + return + } + retrying[path] = struct{}{} + go retryOpen(ctx, path, watcher, reopenCh) + } + + // Initial scan. + for _, path := range expandGlobs(mt.patterns) { fs, err := openAndSeekEOF(path, watcher) if err != nil { log.Printf("tailer: %s not found, will retry: %v", path, err) - go retryOpen(ctx, path, watcher, reopenCh) + startRetry(path) continue } files[path] = fs log.Printf("tailer: watching %s", path) } + ticker := time.NewTicker(mt.scanInterval) + defer ticker.Stop() + for { select { case <-ctx.Done(): @@ -76,12 +93,16 @@ func (mt *MultiTailer) Run(ctx context.Context) { if !ok { return } + delete(retrying, msg.path) files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)} if err := watcher.Add(msg.path); err != nil { log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err) } log.Printf("tailer: re-opened %s after rotation", msg.path) + case <-ticker.C: + mt.rescan(ctx, watcher, files, retrying, reopenCh, startRetry) + case event, ok := <-watcher.Events: if !ok { return @@ -99,7 +120,7 @@ func (mt *MultiTailer) Run(ctx context.Context) { fs.f.Close() delete(files, event.Name) _ = watcher.Remove(event.Name) - go retryOpen(ctx, event.Name, watcher, reopenCh) + startRetry(event.Name) } case err, ok := <-watcher.Errors: @@ -111,6 +132,49 @@ func (mt *MultiTailer) Run(ctx context.Context) { } } +// rescan re-expands the glob patterns and reconciles against the current file +// set: new matches are opened (or queued for retry), and files no longer +// matched by any pattern are drained, closed, and retired. +func (mt *MultiTailer) rescan( + ctx context.Context, + watcher *fsnotify.Watcher, + files map[string]*fileState, + retrying map[string]struct{}, + _ chan reopenMsg, + startRetry func(string), +) { + current := make(map[string]struct{}) + for _, path := range expandGlobs(mt.patterns) { + current[path] = struct{}{} + if _, inFiles := files[path]; inFiles { + continue + } + if _, isRetrying := retrying[path]; isRetrying { + continue + } + // Newly matched file — try to open it right away. + fs, err := openAndSeekEOF(path, watcher) + if err != nil { + startRetry(path) + continue + } + files[path] = fs + log.Printf("tailer: discovered %s", path) + } + + // Retire files that no longer match any pattern and are not being rotated + // (rotation is handled by the RENAME/REMOVE event path, not here). + for path, fs := range files { + if _, matched := current[path]; !matched { + mt.readLines(fs.reader) + fs.f.Close() + _ = watcher.Remove(path) + delete(files, path) + log.Printf("tailer: retired %s (no longer matched by any pattern)", path) + } + } +} + // openAndSeekEOF opens path, seeks to EOF, and registers it with watcher. func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) { f, err := os.Open(path) diff --git a/cmd/collector/tailer_test.go b/cmd/collector/tailer_test.go index bfee9cc..e5a617c 100644 --- a/cmd/collector/tailer_test.go +++ b/cmd/collector/tailer_test.go @@ -28,7 +28,7 @@ func TestMultiTailerReadsLines(t *testing.T) { defer f.Close() ch := make(chan LogRecord, 100) - mt := NewMultiTailer([]string{path}, 24, 48, ch) + mt := NewMultiTailer([]string{path}, time.Hour, 24, 48, ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -67,7 +67,7 @@ func TestMultiTailerMultipleFiles(t *testing.T) { } ch := make(chan LogRecord, 200) - mt := NewMultiTailer(paths, 24, 48, ch) + mt := NewMultiTailer(paths, time.Hour, 24, 48, ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mt.Run(ctx) @@ -95,7 +95,7 @@ func TestMultiTailerLogRotation(t *testing.T) { } ch := make(chan LogRecord, 100) - mt := NewMultiTailer([]string{path}, 24, 48, ch) + mt := NewMultiTailer([]string{path}, time.Hour, 24, 48, ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go mt.Run(ctx)