package main import ( "bufio" "context" "flag" "log" "net" "net/http" "os" "os/signal" "path/filepath" "strconv" "strings" "syscall" "time" pb "git.ipng.ch/ipng/nginx-logtail/proto/logtailpb" "google.golang.org/grpc" ) func main() { listen := flag.String("listen", envOr("COLLECTOR_LISTEN", ":9090"), "gRPC listen address (env: COLLECTOR_LISTEN)") promListen := flag.String("prom-listen", envOr("COLLECTOR_PROM_LISTEN", ":9100"), "Prometheus metrics listen address, empty to disable (env: COLLECTOR_PROM_LISTEN)") logPaths := flag.String("logs", envOr("COLLECTOR_LOGS", ""), "comma-separated log file paths/globs to tail (env: COLLECTOR_LOGS)") logsFile := flag.String("logs-file", envOr("COLLECTOR_LOGS_FILE", ""), "file containing one log path/glob per line (env: COLLECTOR_LOGS_FILE)") source := flag.String("source", envOr("COLLECTOR_SOURCE", hostname()), "name for this collector (env: COLLECTOR_SOURCE, default: hostname)") v4prefix := flag.Int("v4prefix", envOrInt("COLLECTOR_V4PREFIX", 24), "IPv4 prefix length for client bucketing (env: COLLECTOR_V4PREFIX)") v6prefix := flag.Int("v6prefix", envOrInt("COLLECTOR_V6PREFIX", 48), "IPv6 prefix length for client bucketing (env: COLLECTOR_V6PREFIX)") scanInterval := flag.Duration("scan-interval", envOrDuration("COLLECTOR_SCAN_INTERVAL", 10*time.Second), "how often to rescan glob patterns for new/removed files (env: COLLECTOR_SCAN_INTERVAL)") flag.Parse() patterns := collectPatterns(*logPaths, *logsFile) if len(patterns) == 0 { log.Fatal("collector: no log paths specified; use --logs or --logs-file") } log.Printf("collector: watching %d pattern(s), rescan every %s", len(patterns), *scanInterval) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() // Shared channel: tailer → store. Buffer absorbs ~20s of peak load. 
ch := make(chan LogRecord, 200_000) store := NewStore(*source) if *promListen != "" { ps := NewPromStore() store.prom = ps mux := http.NewServeMux() mux.Handle("/metrics", ps) go func() { log.Printf("collector: Prometheus metrics on %s/metrics", *promListen) if err := http.ListenAndServe(*promListen, mux); err != nil { log.Fatalf("collector: Prometheus server: %v", err) } }() } go store.Run(ch) tailer := NewMultiTailer(patterns, *scanInterval, *v4prefix, *v6prefix, ch) go tailer.Run(ctx) lis, err := net.Listen("tcp", *listen) if err != nil { log.Fatalf("collector: failed to listen on %s: %v", *listen, err) } grpcServer := grpc.NewServer() pb.RegisterLogtailServiceServer(grpcServer, NewServer(store, *source)) go func() { log.Printf("collector: gRPC server listening on %s (source=%s)", *listen, *source) if err := grpcServer.Serve(lis); err != nil { log.Printf("collector: gRPC server stopped: %v", err) } }() <-ctx.Done() log.Printf("collector: shutting down") // GracefulStop waits for all RPCs to finish. StreamSnapshots subscribers // (e.g. the aggregator) hold a stream open indefinitely, so we give it a // short window and then force-stop to avoid hanging systemctl stop/restart. stopped := make(chan struct{}) go func() { grpcServer.GracefulStop() close(stopped) }() select { case <-stopped: case <-time.After(5 * time.Second): log.Printf("collector: graceful stop timed out, forcing stop") grpcServer.Stop() } close(ch) } // collectPatterns merges patterns from --logs (comma-separated) and --logs-file. 
// Entries are whitespace-trimmed and empty ones are dropped; in the logs file,
// lines starting with "#" are also skipped. Patterns from logPaths come first,
// followed by those from logsFile in file order. The process exits via
// log.Fatalf if logsFile is set but cannot be opened or fully read.
func collectPatterns(logPaths, logsFile string) []string {
	var patterns []string
	for _, p := range strings.Split(logPaths, ",") {
		if p = strings.TrimSpace(p); p != "" {
			patterns = append(patterns, p)
		}
	}
	if logsFile == "" {
		return patterns
	}

	f, err := os.Open(logsFile)
	if err != nil {
		log.Fatalf("collector: cannot open --logs-file %s: %v", logsFile, err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		p := strings.TrimSpace(sc.Text())
		if p == "" || strings.HasPrefix(p, "#") {
			continue
		}
		patterns = append(patterns, p)
	}
	// Scan returns false on failure as well as on EOF (for example a read
	// error, or a line longer than the scanner's token limit). Without this
	// check the rest of the file would be dropped silently.
	if err := sc.Err(); err != nil {
		log.Fatalf("collector: cannot read --logs-file %s: %v", logsFile, err)
	}
	return patterns
}

// expandGlobs expands any glob patterns and returns deduplicated concrete paths.
//
// Paths are returned in first-seen order. A pattern that filepath.Glob rejects
// is logged and skipped. A pattern that matches nothing is logged and returned
// verbatim.
func expandGlobs(patterns []string) []string {
	seen := make(map[string]struct{})
	var paths []string
	// add appends p to paths unless it has already been recorded.
	add := func(p string) {
		if _, dup := seen[p]; dup {
			return
		}
		seen[p] = struct{}{}
		paths = append(paths, p)
	}

	for _, pat := range patterns {
		matches, err := filepath.Glob(pat)
		if err != nil {
			log.Printf("collector: invalid glob %q: %v", pat, err)
			continue
		}
		if len(matches) == 0 {
			// Keep the path even if it doesn't exist yet; the tailer will retry.
			log.Printf("collector: pattern %q matched no files, will watch for creation", pat)
			add(pat)
			continue
		}
		for _, m := range matches {
			add(m)
		}
	}
	return paths
}

// hostname returns the host name reported by the OS, or "unknown" when it
// cannot be determined.
func hostname() string {
	h, err := os.Hostname()
	if err != nil {
		return "unknown"
	}
	return h
}

// envOr returns the value of environment variable key, or def when the
// variable is unset or empty.
func envOr(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// envOrInt returns environment variable key parsed as a decimal int. It
// returns def when the variable is unset or empty, and logs a warning before
// returning def when the value does not parse.
func envOrInt(key string, def int) int {
	v := os.Getenv(key)
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		log.Printf("collector: invalid int for %s=%q, using default %d", key, v, def)
		return def
	}
	return n
}

// envOrDuration returns environment variable key parsed with
// time.ParseDuration. It returns def when the variable is unset or empty, and
// logs a warning before returning def when the value does not parse.
func envOrDuration(key string, def time.Duration) time.Duration {
	v := os.Getenv(key)
	if v == "" {
		return def
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		log.Printf("collector: invalid duration for %s=%q, using default %s", key, v, def)
		return def
	}
	return d
}