diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 891e4ec..e6c3229 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -61,7 +61,22 @@ func main() { <-ctx.Done() log.Printf("collector: shutting down") - grpcServer.GracefulStop() + + // GracefulStop waits for all RPCs to finish. StreamSnapshots subscribers + // (e.g. the aggregator) hold a stream open indefinitely, so we give it a + // short window and then force-stop to avoid hanging systemctl stop/restart. + stopped := make(chan struct{}) + go func() { + grpcServer.GracefulStop() + close(stopped) + }() + select { + case <-stopped: + case <-time.After(5 * time.Second): + log.Printf("collector: graceful stop timed out, forcing stop") + grpcServer.Stop() + } + close(ch) }