180 lines
4.3 KiB
Go
180 lines
4.3 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
// fileState holds the open file handle and buffered reader for one log file.
|
|
type fileState struct {
|
|
f *os.File
|
|
reader *bufio.Reader
|
|
}
|
|
|
|
// reopenMsg is sent by a retry goroutine back to the Run loop when a rotated
|
|
// file has reappeared and is ready to be watched again.
|
|
type reopenMsg struct {
|
|
path string
|
|
f *os.File
|
|
}
|
|
|
|
// MultiTailer tails any number of log files using a single shared
|
|
// fsnotify.Watcher (one inotify instance). This scales to hundreds of files
|
|
// without hitting the kernel limit on inotify instances per user.
|
|
type MultiTailer struct {
|
|
paths []string
|
|
v4bits int
|
|
v6bits int
|
|
ch chan<- LogRecord
|
|
}
|
|
|
|
func NewMultiTailer(paths []string, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer {
|
|
return &MultiTailer{paths: paths, v4bits: v4bits, v6bits: v6bits, ch: ch}
|
|
}
|
|
|
|
// Run tails all configured files until ctx is cancelled.
|
|
// All files share one fsnotify.Watcher. Log rotation is handled per-file:
|
|
// on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens
|
|
// the original path and hands it back via a channel.
|
|
func (mt *MultiTailer) Run(ctx context.Context) {
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
log.Fatalf("tailer: failed to create watcher: %v", err)
|
|
}
|
|
defer watcher.Close()
|
|
|
|
files := make(map[string]*fileState, len(mt.paths))
|
|
reopenCh := make(chan reopenMsg, len(mt.paths))
|
|
|
|
// Open all files and seek to EOF.
|
|
for _, path := range mt.paths {
|
|
fs, err := openAndSeekEOF(path, watcher)
|
|
if err != nil {
|
|
log.Printf("tailer: %s not found, will retry: %v", path, err)
|
|
go retryOpen(ctx, path, watcher, reopenCh)
|
|
continue
|
|
}
|
|
files[path] = fs
|
|
log.Printf("tailer: watching %s", path)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
for _, fs := range files {
|
|
fs.f.Close()
|
|
}
|
|
return
|
|
|
|
case msg, ok := <-reopenCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)}
|
|
if err := watcher.Add(msg.path); err != nil {
|
|
log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err)
|
|
}
|
|
log.Printf("tailer: re-opened %s after rotation", msg.path)
|
|
|
|
case event, ok := <-watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
fs, known := files[event.Name]
|
|
if !known {
|
|
continue
|
|
}
|
|
if event.Has(fsnotify.Write) {
|
|
mt.readLines(fs.reader)
|
|
}
|
|
if event.Has(fsnotify.Rename) || event.Has(fsnotify.Remove) {
|
|
// Drain remaining bytes in the old fd before it disappears.
|
|
mt.readLines(fs.reader)
|
|
fs.f.Close()
|
|
delete(files, event.Name)
|
|
_ = watcher.Remove(event.Name)
|
|
go retryOpen(ctx, event.Name, watcher, reopenCh)
|
|
}
|
|
|
|
case err, ok := <-watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Printf("tailer: watcher error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// openAndSeekEOF opens path, seeks to EOF, and registers it with watcher.
|
|
func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := f.Seek(0, io.SeekEnd); err != nil {
|
|
f.Close()
|
|
return nil, err
|
|
}
|
|
if err := watcher.Add(path); err != nil {
|
|
f.Close()
|
|
return nil, err
|
|
}
|
|
return &fileState{f: f, reader: bufio.NewReader(f)}, nil
|
|
}
|
|
|
|
// retryOpen polls until path exists again (after log rotation), then sends
|
|
// the open file back on ch. Exits if ctx is cancelled.
|
|
func retryOpen(ctx context.Context, path string, watcher *fsnotify.Watcher, ch chan<- reopenMsg) {
|
|
backoff := 100 * time.Millisecond
|
|
const maxBackoff = 5 * time.Second
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(backoff):
|
|
}
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
continue
|
|
}
|
|
ch <- reopenMsg{path: path, f: f}
|
|
return
|
|
}
|
|
}
|
|
|
|
// readLines reads all complete lines currently available and emits records.
|
|
func (mt *MultiTailer) readLines(reader *bufio.Reader) {
|
|
for {
|
|
line, err := reader.ReadString('\n')
|
|
if len(line) > 0 {
|
|
l := line
|
|
if l[len(l)-1] == '\n' {
|
|
l = l[:len(l)-1]
|
|
}
|
|
if len(l) > 0 && l[len(l)-1] == '\r' {
|
|
l = l[:len(l)-1]
|
|
}
|
|
if rec, ok := ParseLine(l, mt.v4bits, mt.v6bits); ok {
|
|
select {
|
|
case mt.ch <- rec:
|
|
default:
|
|
// Channel full — drop rather than block the event loop.
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|