Auto-rediscover new glob patterns
This commit is contained in:
@@ -11,30 +11,27 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
listen := flag.String("listen", ":9090", "gRPC listen address")
|
listen := flag.String("listen", ":9090", "gRPC listen address")
|
||||||
logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
|
logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail")
|
||||||
logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
|
logsFile := flag.String("logs-file", "", "file containing one log path/glob per line")
|
||||||
source := flag.String("source", hostname(), "name for this collector (default: hostname)")
|
source := flag.String("source", hostname(), "name for this collector (default: hostname)")
|
||||||
v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing")
|
v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing")
|
||||||
v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing")
|
v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing")
|
||||||
|
scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
patterns := collectPatterns(*logPaths, *logsFile)
|
patterns := collectPatterns(*logPaths, *logsFile)
|
||||||
if len(patterns) == 0 {
|
if len(patterns) == 0 {
|
||||||
log.Fatal("collector: no log paths specified; use --logs or --logs-file")
|
log.Fatal("collector: no log paths specified; use --logs or --logs-file")
|
||||||
}
|
}
|
||||||
|
log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval)
|
||||||
paths := expandGlobs(patterns)
|
|
||||||
if len(paths) == 0 {
|
|
||||||
log.Fatal("collector: no log files matched the specified patterns")
|
|
||||||
}
|
|
||||||
log.Printf("collector: tailing %d file(s)", len(paths))
|
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer stop()
|
defer stop()
|
||||||
@@ -45,7 +42,7 @@ func main() {
|
|||||||
store := NewStore(*source)
|
store := NewStore(*source)
|
||||||
go store.Run(ch)
|
go store.Run(ch)
|
||||||
|
|
||||||
tailer := NewMultiTailer(paths, *v4prefix, *v6prefix, ch)
|
tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch)
|
||||||
go tailer.Run(ctx)
|
go tailer.Run(ctx)
|
||||||
|
|
||||||
lis, err := net.Listen("tcp", *listen)
|
lis, err := net.Listen("tcp", *listen)
|
||||||
|
|||||||
@@ -28,20 +28,25 @@ type reopenMsg struct {
|
|||||||
// fsnotify.Watcher (one inotify instance). This scales to hundreds of files
|
// fsnotify.Watcher (one inotify instance). This scales to hundreds of files
|
||||||
// without hitting the kernel limit on inotify instances per user.
|
// without hitting the kernel limit on inotify instances per user.
|
||||||
type MultiTailer struct {
|
type MultiTailer struct {
|
||||||
paths []string
|
patterns []string
|
||||||
v4bits int
|
scanInterval time.Duration
|
||||||
v6bits int
|
v4bits int
|
||||||
ch chan<- LogRecord
|
v6bits int
|
||||||
|
ch chan<- LogRecord
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMultiTailer(paths []string, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer {
|
func NewMultiTailer(patterns []string, scanInterval time.Duration, v4bits, v6bits int, ch chan<- LogRecord) *MultiTailer {
|
||||||
return &MultiTailer{paths: paths, v4bits: v4bits, v6bits: v6bits, ch: ch}
|
return &MultiTailer{patterns: patterns, scanInterval: scanInterval, v4bits: v4bits, v6bits: v6bits, ch: ch}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run tails all configured files until ctx is cancelled.
|
// Run tails all configured files until ctx is cancelled.
|
||||||
// All files share one fsnotify.Watcher. Log rotation is handled per-file:
|
// All files share one fsnotify.Watcher. Log rotation is handled per-file:
|
||||||
// on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens
|
// on RENAME/REMOVE the old fd is drained then a retry goroutine re-opens
|
||||||
// the original path and hands it back via a channel.
|
// the original path and hands it back via a channel.
|
||||||
|
//
|
||||||
|
// A periodic rescan re-expands the glob patterns so that files created after
|
||||||
|
// startup are picked up automatically and files that have disappeared (and
|
||||||
|
// are no longer matched by any pattern) are retired.
|
||||||
func (mt *MultiTailer) Run(ctx context.Context) {
|
func (mt *MultiTailer) Run(ctx context.Context) {
|
||||||
watcher, err := fsnotify.NewWatcher()
|
watcher, err := fsnotify.NewWatcher()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -49,21 +54,33 @@ func (mt *MultiTailer) Run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
defer watcher.Close()
|
defer watcher.Close()
|
||||||
|
|
||||||
files := make(map[string]*fileState, len(mt.paths))
|
files := make(map[string]*fileState)
|
||||||
reopenCh := make(chan reopenMsg, len(mt.paths))
|
retrying := make(map[string]struct{}) // paths currently in a retryOpen goroutine
|
||||||
|
reopenCh := make(chan reopenMsg, 32)
|
||||||
|
|
||||||
// Open all files and seek to EOF.
|
startRetry := func(path string) {
|
||||||
for _, path := range mt.paths {
|
if _, already := retrying[path]; already {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
retrying[path] = struct{}{}
|
||||||
|
go retryOpen(ctx, path, watcher, reopenCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initial scan.
|
||||||
|
for _, path := range expandGlobs(mt.patterns) {
|
||||||
fs, err := openAndSeekEOF(path, watcher)
|
fs, err := openAndSeekEOF(path, watcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("tailer: %s not found, will retry: %v", path, err)
|
log.Printf("tailer: %s not found, will retry: %v", path, err)
|
||||||
go retryOpen(ctx, path, watcher, reopenCh)
|
startRetry(path)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
files[path] = fs
|
files[path] = fs
|
||||||
log.Printf("tailer: watching %s", path)
|
log.Printf("tailer: watching %s", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(mt.scanInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -76,12 +93,16 @@ func (mt *MultiTailer) Run(ctx context.Context) {
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
delete(retrying, msg.path)
|
||||||
files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)}
|
files[msg.path] = &fileState{f: msg.f, reader: bufio.NewReader(msg.f)}
|
||||||
if err := watcher.Add(msg.path); err != nil {
|
if err := watcher.Add(msg.path); err != nil {
|
||||||
log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err)
|
log.Printf("tailer: watcher re-add failed for %s: %v", msg.path, err)
|
||||||
}
|
}
|
||||||
log.Printf("tailer: re-opened %s after rotation", msg.path)
|
log.Printf("tailer: re-opened %s after rotation", msg.path)
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
mt.rescan(ctx, watcher, files, retrying, reopenCh, startRetry)
|
||||||
|
|
||||||
case event, ok := <-watcher.Events:
|
case event, ok := <-watcher.Events:
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
@@ -99,7 +120,7 @@ func (mt *MultiTailer) Run(ctx context.Context) {
|
|||||||
fs.f.Close()
|
fs.f.Close()
|
||||||
delete(files, event.Name)
|
delete(files, event.Name)
|
||||||
_ = watcher.Remove(event.Name)
|
_ = watcher.Remove(event.Name)
|
||||||
go retryOpen(ctx, event.Name, watcher, reopenCh)
|
startRetry(event.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
case err, ok := <-watcher.Errors:
|
case err, ok := <-watcher.Errors:
|
||||||
@@ -111,6 +132,49 @@ func (mt *MultiTailer) Run(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rescan re-expands the glob patterns and reconciles against the current file
|
||||||
|
// set: new matches are opened (or queued for retry), and files no longer
|
||||||
|
// matched by any pattern are drained, closed, and retired.
|
||||||
|
func (mt *MultiTailer) rescan(
|
||||||
|
ctx context.Context,
|
||||||
|
watcher *fsnotify.Watcher,
|
||||||
|
files map[string]*fileState,
|
||||||
|
retrying map[string]struct{},
|
||||||
|
_ chan reopenMsg,
|
||||||
|
startRetry func(string),
|
||||||
|
) {
|
||||||
|
current := make(map[string]struct{})
|
||||||
|
for _, path := range expandGlobs(mt.patterns) {
|
||||||
|
current[path] = struct{}{}
|
||||||
|
if _, inFiles := files[path]; inFiles {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, isRetrying := retrying[path]; isRetrying {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Newly matched file — try to open it right away.
|
||||||
|
fs, err := openAndSeekEOF(path, watcher)
|
||||||
|
if err != nil {
|
||||||
|
startRetry(path)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
files[path] = fs
|
||||||
|
log.Printf("tailer: discovered %s", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retire files that no longer match any pattern and are not being rotated
|
||||||
|
// (rotation is handled by the RENAME/REMOVE event path, not here).
|
||||||
|
for path, fs := range files {
|
||||||
|
if _, matched := current[path]; !matched {
|
||||||
|
mt.readLines(fs.reader)
|
||||||
|
fs.f.Close()
|
||||||
|
_ = watcher.Remove(path)
|
||||||
|
delete(files, path)
|
||||||
|
log.Printf("tailer: retired %s (no longer matched by any pattern)", path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// openAndSeekEOF opens path, seeks to EOF, and registers it with watcher.
|
// openAndSeekEOF opens path, seeks to EOF, and registers it with watcher.
|
||||||
func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) {
|
func openAndSeekEOF(path string, watcher *fsnotify.Watcher) (*fileState, error) {
|
||||||
f, err := os.Open(path)
|
f, err := os.Open(path)
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ func TestMultiTailerReadsLines(t *testing.T) {
|
|||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
ch := make(chan LogRecord, 100)
|
ch := make(chan LogRecord, 100)
|
||||||
mt := NewMultiTailer([]string{path}, 24, 48, ch)
|
mt := NewMultiTailer([]string{path}, time.Hour, 24, 48, ch)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
@@ -67,7 +67,7 @@ func TestMultiTailerMultipleFiles(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan LogRecord, 200)
|
ch := make(chan LogRecord, 200)
|
||||||
mt := NewMultiTailer(paths, 24, 48, ch)
|
mt := NewMultiTailer(paths, time.Hour, 24, 48, ch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
go mt.Run(ctx)
|
go mt.Run(ctx)
|
||||||
@@ -95,7 +95,7 @@ func TestMultiTailerLogRotation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan LogRecord, 100)
|
ch := make(chan LogRecord, 100)
|
||||||
mt := NewMultiTailer([]string{path}, 24, 48, ch)
|
mt := NewMultiTailer([]string{path}, time.Hour, 24, 48, ch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
go mt.Run(ctx)
|
go mt.Run(ctx)
|
||||||
|
|||||||
Reference in New Issue
Block a user