Files
vpp-maglev/internal/grpcapi/server.go
Pim van Pelt d8ad89d115 When the server exits (^C or because docker/systemd exits it), streaming gRPC clients must be
closed. Currently, the server does not exit until the gRPC client disconnects.
2026-04-11 02:18:44 +02:00

218 lines
6.6 KiB
Go

// Copyright (c) 2026, Pim van Pelt <pim@ipng.ch>
package grpcapi
import (
"context"
"net"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"git.ipng.ch/ipng/vpp-maglev/internal/checker"
"git.ipng.ch/ipng/vpp-maglev/internal/config"
"git.ipng.ch/ipng/vpp-maglev/internal/health"
)
// Server implements the MaglevServer gRPC interface. Its handlers answer
// requests by querying a checker.Checker, and its streaming handler watches
// the stored context so that long-lived streams end when it is cancelled.
type Server struct {
// Embedded stub; presumably the generated type that supplies default
// implementations for RPCs not defined on Server (confirm against the
// generated code).
UnimplementedMaglevServer
// ctx is set by NewServer and watched by WatchBackendEvents; when it is
// done, active streams return with codes.Unavailable.
ctx context.Context
// checker is the source of frontend, backend and health check data
// returned by the RPC handlers.
checker *checker.Checker
}
// NewServer returns a Server that serves RPCs from the given Checker.
//
// The context bounds the lifetime of streaming RPCs: once it is cancelled,
// every active WatchBackendEvents stream returns, which allows
// grpc.Server.GracefulStop to complete.
func NewServer(ctx context.Context, c *checker.Checker) *Server {
	srv := new(Server)
	srv.ctx = ctx
	srv.checker = c
	return srv
}
// ListFrontends reports the frontend names known to the checker. It never
// returns an error.
func (s *Server) ListFrontends(_ context.Context, _ *ListFrontendsRequest) (*ListFrontendsResponse, error) {
	names := s.checker.ListFrontends()
	resp := &ListFrontendsResponse{FrontendNames: names}
	return resp, nil
}
// GetFrontend looks up the frontend named in the request and returns its
// configuration. It returns a codes.NotFound status error when the checker
// does not know the name.
func (s *Server) GetFrontend(_ context.Context, req *GetFrontendRequest) (*FrontendInfo, error) {
	name := req.Name
	if fe, found := s.checker.GetFrontend(name); found {
		return frontendToProto(name, fe), nil
	}
	return nil, status.Errorf(codes.NotFound, "frontend %q not found", name)
}
// ListBackends reports the backend names returned by the checker. It never
// returns an error.
func (s *Server) ListBackends(_ context.Context, _ *ListBackendsRequest) (*ListBackendsResponse, error) {
	resp := new(ListBackendsResponse)
	resp.BackendNames = s.checker.ListBackends()
	return resp, nil
}
// GetBackend fetches the checker's snapshot for the backend named in the
// request and converts it to a BackendInfo. It returns a codes.NotFound status
// error when the checker does not know the name.
func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*BackendInfo, error) {
	snap, found := s.checker.GetBackend(req.Name)
	if found {
		return backendToProto(snap), nil
	}
	return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
}
// PauseBackend asks the checker to pause the backend named in the request and
// returns the snapshot the checker hands back. It returns a codes.NotFound
// status error when the checker does not know the name.
func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) {
	name := req.Name
	if snap, found := s.checker.PauseBackend(name); found {
		return backendToProto(snap), nil
	}
	return nil, status.Errorf(codes.NotFound, "backend %q not found", name)
}
// ResumeBackend asks the checker to resume the backend named in the request
// and returns the snapshot the checker hands back. It returns a codes.NotFound
// status error when the checker does not know the name.
func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) {
	name := req.Name
	if snap, found := s.checker.ResumeBackend(name); found {
		return backendToProto(snap), nil
	}
	return nil, status.Errorf(codes.NotFound, "backend %q not found", name)
}
// ListHealthChecks reports the health check names returned by the checker. It
// never returns an error.
func (s *Server) ListHealthChecks(_ context.Context, _ *ListHealthChecksRequest) (*ListHealthChecksResponse, error) {
	resp := new(ListHealthChecksResponse)
	resp.Names = s.checker.ListHealthChecks()
	return resp, nil
}
// GetHealthCheck looks up the health check named in the request and returns
// its configuration. It returns a codes.NotFound status error when the checker
// does not know the name.
func (s *Server) GetHealthCheck(_ context.Context, req *GetHealthCheckRequest) (*HealthCheckInfo, error) {
	name := req.Name
	if hc, found := s.checker.GetHealthCheck(name); found {
		return healthCheckToProto(name, hc), nil
	}
	return nil, status.Errorf(codes.NotFound, "healthcheck %q not found", name)
}
// WatchBackendEvents streams backend state to the client. It first sends one
// synthetic event per backend describing its current state (From and To are
// both the current state and AtUnixNs is 0), then forwards live state
// transitions as the checker publishes them.
//
// The stream ends when:
//   - the client's stream context is done or the subscription channel is
//     closed (returns nil),
//   - the server context passed to NewServer is cancelled (returns a
//     codes.Unavailable status error), or
//   - stream.Send fails (returns that error).
func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendEventsServer) error {
	// Subscribe before taking the snapshot. If the subscription were created
	// after the snapshot, a transition that happens in between would appear in
	// neither, leaving the client with stale state. Subscribing first can at
	// worst replay a transition whose result the snapshot already reflects,
	// which still leaves the client in the correct final state.
	ch, unsub := s.checker.Subscribe()
	defer unsub()

	// Send current state of all backends as synthetic events.
	for _, name := range s.checker.ListBackends() {
		snap, ok := s.checker.GetBackend(name)
		if !ok {
			// The backend was listed but can no longer be fetched; skip it.
			continue
		}
		state := snap.Health.State.String()
		ev := &BackendEvent{
			BackendName: name,
			Transition: &TransitionRecord{
				From:     state,
				To:       state,
				AtUnixNs: 0,
			},
		}
		if err := stream.Send(ev); err != nil {
			return err
		}
	}

	for {
		select {
		case <-s.ctx.Done():
			// Server-wide shutdown: end the stream so the RPC does not
			// outlive the server context.
			return status.Error(codes.Unavailable, "server shutting down")
		case <-stream.Context().Done():
			// Client went away or the RPC was cancelled.
			return nil
		case e, ok := <-ch:
			if !ok {
				// Subscription channel closed; nothing more to forward.
				return nil
			}
			ev := &BackendEvent{
				BackendName: e.BackendName,
				Transition:  transitionToProto(e.Transition),
			}
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}
}
// ---- conversion helpers ----------------------------------------------------

// frontendToProto builds the FrontendInfo message for the frontend called
// name from its configuration. The address is rendered with its String method
// and the port is converted to uint32.
func frontendToProto(name string, fe config.Frontend) *FrontendInfo {
	info := new(FrontendInfo)
	info.Name = name
	info.Address = fe.Address.String()
	info.Protocol = fe.Protocol
	info.Port = uint32(fe.Port)
	info.Description = fe.Description
	info.BackendNames = fe.Backends
	return info
}
// backendToProto builds the BackendInfo message for a backend snapshot. Name,
// address, state and transitions come from snap.Health; the enabled flag,
// weight and health check name come from snap.Config. Transitions are
// converted in order with transitionToProto.
func backendToProto(snap checker.BackendSnapshot) *BackendInfo {
	out := new(BackendInfo)
	out.Name = snap.Health.Name
	out.Address = snap.Health.Address.String()
	out.State = snap.Health.State.String()
	out.Enabled = snap.Config.Enabled
	out.Weight = int32(snap.Config.Weight)
	out.Healthcheck = snap.Config.HealthCheck

	for _, tr := range snap.Health.Transitions {
		rec := transitionToProto(tr)
		out.Transitions = append(out.Transitions, rec)
	}
	return out
}
// healthCheckToProto builds the HealthCheckInfo message for the health check
// called name from its configuration. Durations are reported in nanoseconds.
// The probe source addresses and the HTTP and TCP parameter sub-messages are
// only populated when the corresponding configuration field is non-nil.
func healthCheckToProto(name string, hc config.HealthCheck) *HealthCheckInfo {
	out := new(HealthCheckInfo)
	out.Name = name
	out.Type = hc.Type
	out.Port = uint32(hc.Port)
	out.IntervalNs = hc.Interval.Nanoseconds()
	out.FastIntervalNs = hc.FastInterval.Nanoseconds()
	out.DownIntervalNs = hc.DownInterval.Nanoseconds()
	out.TimeoutNs = hc.Timeout.Nanoseconds()
	out.Rise = int32(hc.Rise)
	out.Fall = int32(hc.Fall)

	if hc.ProbeIPv4Src != nil {
		out.ProbeIpv4Src = hc.ProbeIPv4Src.String()
	}
	if hc.ProbeIPv6Src != nil {
		out.ProbeIpv6Src = hc.ProbeIPv6Src.String()
	}

	if hc.HTTP != nil {
		// The regexp is optional; an unset one is reported as "".
		pattern := ""
		if hc.HTTP.ResponseRegexp != nil {
			pattern = hc.HTTP.ResponseRegexp.String()
		}
		params := new(HTTPCheckParams)
		params.Path = hc.HTTP.Path
		params.Host = hc.HTTP.Host
		params.ResponseCodeMin = int32(hc.HTTP.ResponseCodeMin)
		params.ResponseCodeMax = int32(hc.HTTP.ResponseCodeMax)
		params.ResponseRegexp = pattern
		params.ServerName = hc.HTTP.ServerName
		params.InsecureSkipVerify = hc.HTTP.InsecureSkipVerify
		out.Http = params
	}

	if hc.TCP != nil {
		params := new(TCPCheckParams)
		params.Ssl = hc.TCP.SSL
		params.ServerName = hc.TCP.ServerName
		params.InsecureSkipVerify = hc.TCP.InsecureSkipVerify
		out.Tcp = params
	}

	return out
}
// transitionToProto builds the TransitionRecord message for a health
// transition: the From and To states are rendered with their String methods
// and the timestamp is reported as Unix nanoseconds.
func transitionToProto(t health.Transition) *TransitionRecord {
	rec := new(TransitionRecord)
	rec.From = t.From.String()
	rec.To = t.To.String()
	rec.AtUnixNs = t.At.UnixNano()
	return rec
}
// Keep the "net" import referenced: nothing else in this file names the
// package directly, so this blank declaration prevents an unused-import
// compile error.
var _ net.IP