Files
nginx-logtail/cmd/collector/tailer.go

244 lines
6.2 KiB
Go

package main
import (
"bufio"
"context"
"io"
"log"
"os"
"time"
"github.com/fsnotify/fsnotify"
)
// fileState holds the open file handle and buffered reader for one log file.
// The reader wraps f so that bytes buffered between fsnotify events are not
// lost across successive readLines calls.
type fileState struct {
	f      *os.File      // open handle to the tailed log file
	reader *bufio.Reader // buffered reader positioned at the current tail offset
}
// reopenMsg is sent by a retry goroutine back to the Run loop when a rotated
// file has reappeared and is ready to be watched again.
type reopenMsg struct {
	path string   // original path that was rotated away
	f    *os.File // freshly opened handle at offset 0 (start of the new file)
}
// MultiTailer tails any number of log files using a single shared
// fsnotify.Watcher (one inotify instance). This scales to hundreds of files
// without hitting the kernel limit on inotify instances per user.
type MultiTailer struct {
	patterns     []string         // glob patterns re-expanded on every rescan tick
	scanInterval time.Duration    // period between glob re-expansions in Run
	v4bits       int              // forwarded to ParseLine; presumably IPv4 prefix bits — TODO confirm
	v6bits       int              // forwarded to ParseLine; presumably IPv6 prefix bits — TODO confirm
	ch           chan<- LogRecord // output channel; sends are non-blocking (records dropped when full)
}
// NewMultiTailer returns a MultiTailer configured with the given glob
// patterns, rescan interval, address-bit settings (forwarded to ParseLine),
// and output channel for parsed records.
func NewMultiTailer(patterns []string, scanInterval time.Duration, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer {
	mt := &MultiTailer{
		patterns:     patterns,
		scanInterval: scanInterval,
		v4bits:       v4bits,
		v6bits:       v6bits,
		ch:           ch,
	}
	return mt
}
// Run tails all configured files until ctx is cancelled.
// All files share one fsnotify.Watcher. Log rotation is handled per-file:
// on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens
// the original path and hands it back via a channel.
//
// A periodic rescan re-expands the glob patterns so that files created after
// startup are picked up automatically and files that have disappeared (and
// are no longer matched by any pattern) are retired.
func (mt *MultiTailer) Run(ctx context.Context) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		// Without a watcher nothing can be tailed. NOTE: log.Fatalf exits
		// the process immediately, so the deferred Close below never runs
		// on this path (harmless here, but worth knowing).
		log.Fatalf("tailer: failed to create watcher: %v", err)
	}
	defer watcher.Close()
	// files maps path -> open state for every file currently being tailed.
	files := make(map[string]*fileState)
	retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine
	// Buffered so a retryOpen goroutine can usually deliver without waiting
	// for the select loop to come around.
	reopenCh := make(chan reopenMsg, 32)
	// startRetry launches at most one retryOpen goroutine per path; the
	// retrying set deduplicates concurrent requests for the same path.
	startRetry := func(path string) {
		if _, already := retrying[path]; already {
			return
		}
		retrying[path] = struct{}{}
		go retryOpen(ctx, path, watcher, reopenCh)
	}
	// Initial scan.
	for _, path := range expandGlobs(mt.patterns) {
		fs, err := openAndSeekEOF(path, watcher)
		if err != nil {
			log.Printf("tailer: %s not found, will retry: %v", path, err)
			startRetry(path)
			continue
		}
		files[path] = fs
		log.Printf("tailer: watching %s", path)
	}
	ticker := time.NewTicker(mt.scanInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Close all tailed fds; the watcher itself is closed by the
			// deferred Close above.
			for _, fs := range files {
				fs.f.Close()
			}
			return
		case msg, ok := <-reopenCh:
			if !ok {
				return
			}
			// A rotated file reappeared: resume tailing it from offset 0
			// of the new file and put it back under the watcher.
			delete(retrying, msg.path)
			files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)}
			if err := watcher.Add(msg.path); err != nil {
				log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err)
			}
			log.Printf("tailer: re-opened %s after rotation", msg.path)
		case <-ticker.C:
			// Periodic reconciliation against the glob patterns.
			mt.rescan(ctx, watcher, files, retrying, reopenCh, startRetry)
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			fs, known := files[event.Name]
			if !known {
				// Event for a path we are not (or no longer) tracking.
				continue
			}
			if event.Has(fsnotify.Write) {
				mt.readLines(fs.reader)
			}
			if event.Has(fsnotify.Rename) || event.Has(fsnotify.Remove) {
				// Drain remaining bytes in the old fd before it disappears.
				mt.readLines(fs.reader)
				fs.f.Close()
				delete(files, event.Name)
				_ = watcher.Remove(event.Name)
				// Re-open the original path once the rotation completes.
				startRetry(event.Name)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Printf("tailer: watcher error: %v", err)
		}
	}
}
// rescan re-expands the glob patterns and reconciles them with the set of
// files currently being tailed: newly matched paths are opened immediately
// (or queued for retry when the open fails), and tailed files that no longer
// match any pattern are drained, closed, and forgotten.
func (mt *MultiTailer) rescan(
	ctx context.Context,
	watcher *fsnotify.Watcher,
	files map[string]*fileState,
	retrying map[string]struct{},
	_ chan reopenMsg,
	startRetry func(string),
) {
	matched := make(map[string]struct{})
	for _, path := range expandGlobs(mt.patterns) {
		matched[path] = struct{}{}
		_, tailing := files[path]
		_, pending := retrying[path]
		if tailing || pending {
			continue
		}
		// First time we've seen this path — try to open it right away.
		fs, err := openAndSeekEOF(path, watcher)
		if err != nil {
			startRetry(path)
			continue
		}
		files[path] = fs
		log.Printf("tailer: discovered %s", path)
	}
	// Forget files that fell out of every pattern. Rotation is not handled
	// here; the RENAME/REMOVE event path in Run takes care of that.
	for path, fs := range files {
		if _, still := matched[path]; still {
			continue
		}
		mt.readLines(fs.reader) // flush whatever is left before letting go
		fs.f.Close()
		_ = watcher.Remove(path)
		delete(files, path)
		log.Printf("tailer: retired %s (no longer matched by any pattern)", path)
	}
}
// openAndSeekEOF opens path, seeks to EOF, and registers it with watcher.
// On any failure after the open the file is closed before returning, so a
// non-nil *fileState is always fully initialized and being watched.
func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	// Funnel both follow-up steps through a single error check so the
	// cleanup (closing f) is written exactly once.
	_, err = f.Seek(0, io.SeekEnd)
	if err == nil {
		err = watcher.Add(path)
	}
	if err != nil {
		f.Close()
		return nil, err
	}
	return &fileState{f: f, reader: bufio.NewReader(f)}, nil
}
// retryOpen polls with exponential backoff (100ms doubling up to 5s) until
// path exists again (after log rotation), then sends the open file back on
// ch. Exits if ctx is cancelled — including while waiting to deliver the
// reopened file, in which case the file is closed rather than leaked.
//
// NOTE(review): the watcher parameter is currently unused; re-adding the
// path to the watcher is done by the Run loop when it receives the message.
func retryOpen(ctx context.Context, path string, watcher *fsnotify.Watcher, ch chan<- reopenMsg) {
	backoff := 100 * time.Millisecond
	const maxBackoff = 5 * time.Second
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		f, err := os.Open(path)
		if err != nil {
			// Still missing — back off, capped at maxBackoff, and retry.
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		// Deliver the reopened file, but never block past cancellation:
		// if Run has already returned nobody drains ch, and an unguarded
		// send would leak this goroutine and the fd forever once the
		// channel buffer is full. (Bug fix: the original sent
		// unconditionally.)
		select {
		case ch <- reopenMsg{path: path, f: f}:
		case <-ctx.Done():
			f.Close()
		}
		return
	}
}
// readLines drains every line currently available from reader, strips the
// trailing newline (and an optional carriage return for CRLF logs), and
// emits each record ParseLine accepts on mt.ch without ever blocking.
//
// NOTE(review): if a write ends mid-line, ReadString returns the fragment
// alongside an error and it is emitted here as-is; the continuation arrives
// as a separate "line" later. Confirm ParseLine tolerates such fragments.
func (mt *MultiTailer) readLines(reader *bufio.Reader) {
	for {
		raw, err := reader.ReadString('\n')
		if raw != "" {
			line := raw
			if line[len(line)-1] == '\n' {
				line = line[:len(line)-1]
			}
			if n := len(line); n > 0 && line[n-1] == '\r' {
				line = line[:n-1]
			}
			if rec, ok := ParseLine(line, mt.v4bits, mt.v6bits); ok {
				select {
				case mt.ch <- rec:
				default:
					// Channel full — drop rather than block the event loop.
				}
			}
		}
		if err != nil {
			return
		}
	}
}