package main import ( "bufio" "context" "flag" "log" "net" "net/http" "os" "os/signal" "path/filepath" "strings" "syscall" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" ) func main() { listen := flag.String("listen", ":9090", "gRPC listen address") promListen := flag.String("prom-listen", ":9100", "Prometheus metrics listen address (empty to disable)") logPaths := flag.String("logs", "", "comma-separated log file paths/globs to tail") logsFile := flag.String("logs-file", "", "file containing one log path/glob per line") source := flag.String("source", hostname(), "name for this collector (default: hostname)") v4prefix := flag.Int("v4prefix", 24, "IPv4 prefix length for client bucketing") v6prefix := flag.Int("v6prefix", 48, "IPv6 prefix length for client bucketing") scanInterval := flag.Duration("scan-interval", 10*time.Second, "how often to rescan glob patterns for new/removed files") flag.Parse() patterns := collectPatterns(*logPaths, *logsFile) if len(patterns) == 0 { log.Fatal("collector: no log paths specified; use --logs or --logs-file") } log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() // Shared channel: tailer → store. Buffer absorbs ~20s of peak load. 
ch := make(chan LogRecord, 200_000) store := NewStore(*source) if *promListen != "" { ps := NewPromStore() store.prom = ps mux := http.NewServeMux() mux.Handle("/metrics", ps) go func() { log.Printf("collector: Prometheus metrics on %s/metrics", *promListen) if err := http.ListenAndServe(*promListen, mux); err != nil { log.Fatalf("collector: Prometheus server: %v", err) } }() } go store.Run(ch) tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch) go tailer.Run(ctx) lis, err := net.Listen("tcp", *listen) if err != nil { log.Fatalf("collector: failed to listen on %s: %v", *listen, err) } grpcServer := grpc.NewServer() pb.RegisterLogtailServiceServer(grpcServer, NewServer(store, *source)) go func() { log.Printf("collector: gRPC server listening on %s (source=%s)", *listen, *source) if err := grpcServer.Serve(lis); err != nil { log.Printf("collector: gRPC server stopped: %v", err) } }() <-ctx.Done() log.Printf("collector: shutting down") // GracefulStop waits for all RPCs to finish. StreamSnapshots subscribers // (e.g. the aggregator) hold a stream open indefinitely, so we give it a // short window and then force-stop to avoid hanging systemctl stop/restart. stopped := make(chan struct{}) go func() { grpcServer.GracefulStop() close(stopped) }() select { case <-stopped: case <-time.After(5 * time.Second): log.Printf("collector: graceful stop timed out, forcing stop") grpcServer.Stop() } close(ch) } // collectPatterns merges patterns from --logs (comma-separated) and --logs-file. 
// Every entry is whitespace-trimmed and empty entries are dropped; in the logs
// file, lines starting with "#" are skipped as well. Patterns from logPaths
// come first, followed by those from logsFile in file order. The process exits
// via log.Fatalf if logsFile is set but cannot be opened or fully read.
func collectPatterns(logPaths, logsFile string) []string {
	var patterns []string
	for _, p := range strings.Split(logPaths, ",") {
		if p = strings.TrimSpace(p); p != "" {
			patterns = append(patterns, p)
		}
	}
	if logsFile == "" {
		return patterns
	}

	f, err := os.Open(logsFile)
	if err != nil {
		log.Fatalf("collector: cannot open --logs-file %s: %v", logsFile, err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		p := strings.TrimSpace(sc.Text())
		if p == "" || strings.HasPrefix(p, "#") {
			continue
		}
		patterns = append(patterns, p)
	}
	// Scan also returns false on a read error or an over-long line, not only at
	// EOF; without this check a partially read file would be accepted silently.
	if err := sc.Err(); err != nil {
		log.Fatalf("collector: cannot read --logs-file %s: %v", logsFile, err)
	}
	return patterns
}

// expandGlobs expands any glob patterns and returns deduplicated concrete paths.
// Results keep the order in which paths are first seen. Invalid patterns are
// logged and skipped; a pattern that matches nothing is returned verbatim.
func expandGlobs(patterns []string) []string {
	seen := make(map[string]struct{})
	var paths []string
	// add appends p unless it has already been emitted.
	add := func(p string) {
		if _, dup := seen[p]; dup {
			return
		}
		seen[p] = struct{}{}
		paths = append(paths, p)
	}

	for _, pat := range patterns {
		matches, err := filepath.Glob(pat)
		if err != nil {
			log.Printf("collector: invalid glob %q: %v", pat, err)
			continue
		}
		if len(matches) == 0 {
			// Keep the path even if it doesn't exist yet; the tailer will retry.
			log.Printf("collector: pattern %q matched no files, will watch for creation", pat)
			add(pat)
			continue
		}
		for _, m := range matches {
			add(m)
		}
	}
	return paths
}

// hostname returns the name reported by os.Hostname, or "unknown" if that
// call fails.
func hostname() string {
	h, err := os.Hostname()
	if err != nil {
		return "unknown"
	}
	return h
}