// Copyright (c) 2026, Pim van Pelt package grpcapi import ( "context" "net" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "git.ipng.ch/ipng/vpp-maglev/internal/checker" "git.ipng.ch/ipng/vpp-maglev/internal/config" "git.ipng.ch/ipng/vpp-maglev/internal/health" ) // Server implements the MaglevServer gRPC interface. type Server struct { UnimplementedMaglevServer ctx context.Context checker *checker.Checker } // NewServer creates a Server backed by the given Checker. The provided context // controls the lifetime of streaming RPCs: cancelling it closes all active // WatchBackendEvents streams so that grpc.Server.GracefulStop can complete. func NewServer(ctx context.Context, c *checker.Checker) *Server { return &Server{ctx: ctx, checker: c} } // ListFrontends returns the names of all configured frontends. func (s *Server) ListFrontends(_ context.Context, _ *ListFrontendsRequest) (*ListFrontendsResponse, error) { return &ListFrontendsResponse{FrontendNames: s.checker.ListFrontends()}, nil } // GetFrontend returns configuration details for a single frontend. func (s *Server) GetFrontend(_ context.Context, req *GetFrontendRequest) (*FrontendInfo, error) { fe, ok := s.checker.GetFrontend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "frontend %q not found", req.Name) } return frontendToProto(req.Name, fe), nil } // ListBackends returns the names of all active backends. func (s *Server) ListBackends(_ context.Context, _ *ListBackendsRequest) (*ListBackendsResponse, error) { return &ListBackendsResponse{BackendNames: s.checker.ListBackends()}, nil } // GetBackend returns health state for a backend by name. func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*BackendInfo, error) { b, ok := s.checker.GetBackend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) } return backendToProto(b), nil } // PauseBackend pauses health checking for a backend by name. func (s *Server) PauseBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { b, ok := s.checker.PauseBackend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) } return backendToProto(b), nil } // ResumeBackend resumes health checking for a backend by name. func (s *Server) ResumeBackend(_ context.Context, req *PauseResumeRequest) (*BackendInfo, error) { b, ok := s.checker.ResumeBackend(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name) } return backendToProto(b), nil } // ListHealthChecks returns the names of all configured health checks. func (s *Server) ListHealthChecks(_ context.Context, _ *ListHealthChecksRequest) (*ListHealthChecksResponse, error) { return &ListHealthChecksResponse{Names: s.checker.ListHealthChecks()}, nil } // GetHealthCheck returns the full configuration for a health check by name. func (s *Server) GetHealthCheck(_ context.Context, req *GetHealthCheckRequest) (*HealthCheckInfo, error) { hc, ok := s.checker.GetHealthCheck(req.Name) if !ok { return nil, status.Errorf(codes.NotFound, "healthcheck %q not found", req.Name) } return healthCheckToProto(req.Name, hc), nil } // WatchBackendEvents streams the current state of all backends on connect, then // streams live state transitions until the client disconnects. func (s *Server) WatchBackendEvents(_ *WatchRequest, stream Maglev_WatchBackendEventsServer) error { // Send current state of all backends as synthetic events. for _, name := range s.checker.ListBackends() { snap, ok := s.checker.GetBackend(name) if !ok { continue } ev := &BackendEvent{ BackendName: name, Transition: &TransitionRecord{ From: snap.Health.State.String(), To: snap.Health.State.String(), AtUnixNs: 0, }, } if err := stream.Send(ev); err != nil { return err } } ch, unsub := s.checker.Subscribe() defer unsub() for { select { case <-s.ctx.Done(): return status.Error(codes.Unavailable, "server shutting down") case <-stream.Context().Done(): return nil case e, ok := <-ch: if !ok { return nil } ev := &BackendEvent{ BackendName: e.BackendName, Transition: transitionToProto(e.Transition), } if err := stream.Send(ev); err != nil { return err } } } } // ---- conversion helpers ---------------------------------------------------- func frontendToProto(name string, fe config.Frontend) *FrontendInfo { pools := make([]*PoolInfo, 0, len(fe.Pools)) for _, p := range fe.Pools { pi := &PoolInfo{Name: p.Name} for bName, pb := range p.Backends { pi.Backends = append(pi.Backends, &PoolBackendInfo{ Name: bName, Weight: int32(pb.Weight), }) } pools = append(pools, pi) } return &FrontendInfo{ Name: name, Address: fe.Address.String(), Protocol: fe.Protocol, Port: uint32(fe.Port), Description: fe.Description, Pools: pools, } } func backendToProto(snap checker.BackendSnapshot) *BackendInfo { info := &BackendInfo{ Name: snap.Health.Name, Address: snap.Health.Address.String(), State: snap.Health.State.String(), Enabled: snap.Config.Enabled, Healthcheck: snap.Config.HealthCheck, } for _, t := range snap.Health.Transitions { info.Transitions = append(info.Transitions, transitionToProto(t)) } return info } func healthCheckToProto(name string, hc config.HealthCheck) *HealthCheckInfo { info := &HealthCheckInfo{ Name: name, Type: hc.Type, Port: uint32(hc.Port), IntervalNs: hc.Interval.Nanoseconds(), FastIntervalNs: hc.FastInterval.Nanoseconds(), DownIntervalNs: hc.DownInterval.Nanoseconds(), TimeoutNs: hc.Timeout.Nanoseconds(), Rise: int32(hc.Rise), Fall: int32(hc.Fall), } if hc.ProbeIPv4Src != nil { info.ProbeIpv4Src = hc.ProbeIPv4Src.String() } if hc.ProbeIPv6Src != nil { info.ProbeIpv6Src = hc.ProbeIPv6Src.String() } if hc.HTTP != nil { re := "" if hc.HTTP.ResponseRegexp != nil { re = hc.HTTP.ResponseRegexp.String() } info.Http = &HTTPCheckParams{ Path: hc.HTTP.Path, Host: hc.HTTP.Host, ResponseCodeMin: int32(hc.HTTP.ResponseCodeMin), ResponseCodeMax: int32(hc.HTTP.ResponseCodeMax), ResponseRegexp: re, ServerName: hc.HTTP.ServerName, InsecureSkipVerify: hc.HTTP.InsecureSkipVerify, } } if hc.TCP != nil { info.Tcp = &TCPCheckParams{ Ssl: hc.TCP.SSL, ServerName: hc.TCP.ServerName, InsecureSkipVerify: hc.TCP.InsecureSkipVerify, } } return info } func transitionToProto(t health.Transition) *TransitionRecord { return &TransitionRecord{ From: t.From.String(), To: t.To.String(), AtUnixNs: t.At.UnixNano(), } } // Ensure net.IP is imported (used via b.Address.String()). var _ = net.IP{}