// SPDX-License-Identifier: Apache-2.0

package grpcapi

import (
	"context"
	"errors"
	"log/slog"
	"net"
	"sort"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"git.ipng.ch/ipng/vpp-maglev/internal/checker"
	"git.ipng.ch/ipng/vpp-maglev/internal/config"
	"git.ipng.ch/ipng/vpp-maglev/internal/health"
	"git.ipng.ch/ipng/vpp-maglev/internal/vpp"
)

// Server implements the MaglevServer gRPC interface.
type Server struct {
	UnimplementedMaglevServer

	ctx        context.Context
	checker    *checker.Checker
	logs       *LogBroadcaster
	configPath string
	vppClient  *vpp.Client // nil when VPP integration is disabled
}

// NewServer creates a Server backed by the given Checker. logs may be nil, in
// which case log events are never sent to WatchEvents streams. configPath is
// used by CheckConfig to reload and validate the configuration file on demand.
// vppClient may be nil if VPP integration is disabled. The provided context
// controls the lifetime of streaming RPCs: cancelling it closes all active
// WatchEvents streams so that grpc.Server.GracefulStop can complete.
func NewServer(ctx context.Context, c *checker.Checker, logs *LogBroadcaster, configPath string, vppClient *vpp.Client) *Server {
	return &Server{ctx: ctx, checker: c, logs: logs, configPath: configPath, vppClient: vppClient}
}

// ListFrontends returns the names of all configured frontends.
func (s *Server) ListFrontends(_ context.Context, _ *ListFrontendsRequest) (*ListFrontendsResponse, error) {
	return &ListFrontendsResponse{FrontendNames: s.checker.ListFrontends()}, nil
}

// GetFrontend returns configuration details for a single frontend.
func (s *Server) GetFrontend(_ context.Context, req *GetFrontendRequest) (*FrontendInfo, error) {
	fe, ok := s.checker.GetFrontend(req.Name)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "frontend %q not found", req.Name)
	}
	return frontendToProto(req.Name, fe, s.checker), nil
}

// ListBackends returns the names of all active backends.
func (s *Server) ListBackends(_ context.Context, _ *ListBackendsRequest) (*ListBackendsResponse, error) {
	return &ListBackendsResponse{BackendNames: s.checker.ListBackends()}, nil
}

// GetBackend returns health state for a backend by name.
func (s *Server) GetBackend(_ context.Context, req *GetBackendRequest) (*BackendInfo, error) {
	b, ok := s.checker.GetBackend(req.Name)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
	}
	return backendToProto(b), nil
}

// PauseBackend pauses health checking for a backend by name.
func (s *Server) PauseBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
	b, err := s.checker.PauseBackend(req.Name)
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "%v", err)
	}
	return backendToProto(b), nil
}

// ResumeBackend resumes health checking for a backend by name.
func (s *Server) ResumeBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
	b, err := s.checker.ResumeBackend(req.Name)
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "%v", err)
	}
	return backendToProto(b), nil
}

// EnableBackend re-enables a previously disabled backend.
func (s *Server) EnableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
	b, ok := s.checker.EnableBackend(req.Name)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
	}
	return backendToProto(b), nil
}

// DisableBackend disables a backend, stopping its probe goroutine.
func (s *Server) DisableBackend(_ context.Context, req *BackendRequest) (*BackendInfo, error) {
	b, ok := s.checker.DisableBackend(req.Name)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "backend %q not found", req.Name)
	}
	return backendToProto(b), nil
}

// SetFrontendPoolBackendWeight updates the weight of a backend in a pool
// and immediately pushes the change into VPP via a targeted single-VIP
// sync. When req.Flush is true the backend's AS row is rewritten with
// lb_as_set_weight(is_flush=true), which tears down VPP's flow table for
// that AS so existing sessions are dropped; when false the flow table is
// left alone and only Maglev's new-bucket mapping is updated, so existing
// sessions keep reaching this backend until they naturally drain.
func (s *Server) SetFrontendPoolBackendWeight(_ context.Context, req *SetWeightRequest) (*FrontendInfo, error) {
	if req.Weight < 0 || req.Weight > 100 {
		return nil, status.Errorf(codes.InvalidArgument, "weight %d out of range [0, 100]", req.Weight)
	}
	fe, err := s.checker.SetFrontendPoolBackendWeight(req.Frontend, req.Pool, req.Backend, int(req.Weight))
	if err != nil {
		return nil, status.Errorf(codes.NotFound, "%v", err)
	}
	// Push the change into VPP so the operator doesn't have to wait
	// for the periodic 30s reconcile to pick it up. Silently skipped
	// when VPP integration is disabled — the mutation still lands in
	// config and any future sync will reconcile it.
	if s.vppClient != nil && s.vppClient.IsConnected() {
		cfg := s.checker.Config()
		flushAddr := ""
		if req.Flush {
			if b, ok := cfg.Backends[req.Backend]; ok && b.Address != nil {
				flushAddr = b.Address.String()
			}
		}
		if err := s.vppClient.SyncLBStateVIP(cfg, req.Frontend, flushAddr); err != nil && !errors.Is(err, vpp.ErrFrontendNotFound) {
			slog.Warn("set-weight-sync", "frontend", req.Frontend, "backend", req.Backend, "weight", req.Weight, "flush", req.Flush, "err", err)
		}
	}
	return frontendToProto(req.Frontend, fe, s.checker), nil
}

// ListHealthChecks returns the names of all configured health checks.
func (s *Server) ListHealthChecks(_ context.Context, _ *ListHealthChecksRequest) (*ListHealthChecksResponse, error) {
	return &ListHealthChecksResponse{Names: s.checker.ListHealthChecks()}, nil
}

// GetHealthCheck returns the full configuration for a health check by name.
func (s *Server) GetHealthCheck(_ context.Context, req *GetHealthCheckRequest) (*HealthCheckInfo, error) {
	hc, ok := s.checker.GetHealthCheck(req.Name)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "healthcheck %q not found", req.Name)
	}
	return healthCheckToProto(req.Name, hc), nil
}

// WatchEvents streams events to the client. On connect, the current state of
// every backend and/or frontend is sent as a synthetic event. Afterwards,
// live events are forwarded based on the filter flags in req. An unset (nil)
// flag defaults to true (subscribe). An empty log_level defaults to "info".
func (s *Server) WatchEvents(req *WatchRequest, stream Maglev_WatchEventsServer) error {
	wantLog := req.Log == nil || *req.Log
	wantBackend := req.Backend == nil || *req.Backend
	wantFrontend := req.Frontend == nil || *req.Frontend

	logLevel := slog.LevelInfo
	if req.LogLevel != "" {
		if err := logLevel.UnmarshalText([]byte(req.LogLevel)); err != nil {
			return status.Errorf(codes.InvalidArgument, "invalid log_level %q: must be debug, info, warn, or error", req.LogLevel)
		}
	}

	// Subscribe to log events (nil channel blocks forever when not wanted).
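	// Receiving from a nil channel blocks forever, so a subscription that is
	// not wanted simply leaves its channel nil and its case in the select
	// loop below never fires; eventCh relies on the same idiom.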
	var logCh <-chan *LogEvent
	if wantLog && s.logs != nil {
		var unsub func()
		logCh, unsub = s.logs.Subscribe(logLevel)
		defer unsub()
	}

	// Subscribe to the checker event stream once; we demultiplex backend
	// and frontend events in the select below. Skip the subscription if
	// neither kind is wanted.
	var eventCh <-chan checker.Event
	if wantBackend || wantFrontend {
		var unsub func()
		eventCh, unsub = s.checker.Subscribe()
		defer unsub()
	}

	// Send initial state snapshot: one synthetic event per existing backend
	// (if wanted), and one per existing frontend (if wanted). Clients that
	// connect mid-flight see the current state immediately instead of
	// waiting for the next transition.
	if wantBackend {
		for _, name := range s.checker.ListBackends() {
			snap, ok := s.checker.GetBackend(name)
			if !ok {
				continue
			}
			ev := &Event{Event: &Event_Backend{Backend: &BackendEvent{
				BackendName: name,
				Transition: &TransitionRecord{
					From:     snap.Health.State.String(),
					To:       snap.Health.State.String(),
					AtUnixNs: 0,
				},
			}}}
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}
	if wantFrontend {
		for _, name := range s.checker.ListFrontends() {
			fs, ok := s.checker.FrontendState(name)
			if !ok {
				continue
			}
			ev := &Event{Event: &Event_Frontend{Frontend: &FrontendEvent{
				FrontendName: name,
				Transition: &TransitionRecord{
					From:     fs.String(),
					To:       fs.String(),
					AtUnixNs: 0,
				},
			}}}
			if err := stream.Send(ev); err != nil {
				return err
			}
		}
	}

	for {
		select {
		case <-s.ctx.Done():
			return status.Error(codes.Unavailable, "server shutting down")
		case <-stream.Context().Done():
			return nil
		case le, ok := <-logCh:
			if !ok {
				return nil
			}
			if err := stream.Send(&Event{Event: &Event_Log{Log: le}}); err != nil {
				return err
			}
		case e, ok := <-eventCh:
			if !ok {
				return nil
			}
			if e.FrontendTransition != nil {
				if !wantFrontend {
					continue
				}
				if err := stream.Send(&Event{Event: &Event_Frontend{Frontend: &FrontendEvent{
					FrontendName: e.FrontendName,
					Transition: &TransitionRecord{
						From:     e.FrontendTransition.From.String(),
						To:       e.FrontendTransition.To.String(),
						AtUnixNs: e.FrontendTransition.At.UnixNano(),
					},
				}}}); err != nil {
					return err
				}
				continue
			}
			if !wantBackend {
				continue
			}
			if err := stream.Send(&Event{Event: &Event_Backend{Backend: &BackendEvent{
				BackendName: e.BackendName,
				Transition:  transitionToProto(e.Transition),
			}}}); err != nil {
				return err
			}
		}
	}
}

// CheckConfig reads and validates the configuration file, returning a
// structured result that distinguishes YAML parse errors from semantic errors.
func (s *Server) CheckConfig(_ context.Context, _ *CheckConfigRequest) (*CheckConfigResponse, error) {
	slog.Info("config-check-start", "path", s.configPath)
	_, result := config.Check(s.configPath)
	resp := &CheckConfigResponse{
		Ok:            result.OK(),
		ParseError:    result.ParseError,
		SemanticError: result.SemanticError,
	}
	if result.OK() {
		slog.Info("config-check-done", "result", "ok")
	} else if result.ParseError != "" {
		slog.Info("config-check-done", "result", "failed", "type", "parse", "err", result.ParseError)
	} else {
		slog.Info("config-check-done", "result", "failed", "type", "semantic", "err", result.SemanticError)
	}
	return resp, nil
}

// ReloadConfig checks the configuration file and, if valid, applies it to the
// running checker. This is the same code path used by SIGHUP.
func (s *Server) ReloadConfig(_ context.Context, _ *ReloadConfigRequest) (*ReloadConfigResponse, error) {
	return s.doReloadConfig(), nil
}
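
// A minimal sketch of how a daemon's main might share the reload code path
// between SIGHUP and gRPC; the channel name, goroutine shape, and srv variable
// are illustrative assumptions, not part of this package:
//
//	sigCh := make(chan os.Signal, 1)
//	signal.Notify(sigCh, syscall.SIGHUP)
//	go func() {
//		for range sigCh {
//			srv.TriggerReload()
//		}
//	}()
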
// TriggerReload performs a config check and reload. Intended for use by the
// SIGHUP handler so that signals and gRPC share the same code path.
func (s *Server) TriggerReload() {
	s.doReloadConfig()
}

func (s *Server) doReloadConfig() *ReloadConfigResponse {
	slog.Info("config-reload-start")
	newCfg, result := config.Check(s.configPath)
	if !result.OK() {
		if result.ParseError != "" {
			slog.Error("config-check-failed", "type", "parse", "err", result.ParseError)
		} else {
			slog.Error("config-check-failed", "type", "semantic", "err", result.SemanticError)
		}
		return &ReloadConfigResponse{
			ParseError:    result.ParseError,
			SemanticError: result.SemanticError,
		}
	}
	if err := s.checker.Reload(s.ctx, newCfg); err != nil {
		slog.Error("checker-reload-error", "err", err)
		return &ReloadConfigResponse{
			ReloadError: err.Error(),
		}
	}
	// Push new global LB conf to VPP if anything changed. SetLBConf is a
	// no-op when VPP isn't connected or when the values are unchanged.
	if s.vppClient != nil {
		if err := s.vppClient.SetLBConf(newCfg); err != nil {
			slog.Warn("vpp-lb-conf-set-failed", "err", err)
		}
	}
	slog.Info("config-reload-done", "frontends", len(newCfg.Frontends))
	return &ReloadConfigResponse{Ok: true}
}

// GetVPPInfo returns VPP version and runtime information.
func (s *Server) GetVPPInfo(_ context.Context, _ *GetVPPInfoRequest) (*VPPInfo, error) {
	if s.vppClient == nil {
		return nil, status.Error(codes.Unavailable, "VPP integration is disabled")
	}
	info, err := s.vppClient.GetInfo()
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "%v", err)
	}
	var boottimeNs int64
	if !info.BootTime.IsZero() {
		boottimeNs = info.BootTime.UnixNano()
	}
	return &VPPInfo{
		Version:        info.Version,
		BuildDate:      info.BuildDate,
		BuildDirectory: info.BuildDirectory,
		Pid:            info.PID,
		BoottimeNs:     boottimeNs,
		ConnecttimeNs:  info.ConnectedSince.UnixNano(),
	}, nil
}

// GetVPPLBState returns a snapshot of the VPP load-balancer plugin state.
func (s *Server) GetVPPLBState(_ context.Context, _ *GetVPPLBStateRequest) (*VPPLBState, error) {
	if s.vppClient == nil {
		return nil, status.Error(codes.Unavailable, "VPP integration is disabled")
	}
	state, err := s.vppClient.GetLBStateAll()
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "%v", err)
	}
	return lbStateToProto(state), nil
}

// GetVPPLBCounters returns the most recent per-VIP counter snapshot
// captured by the client's 5s scrape loop. The call is served from an
// in-process cache and does not hit VPP. An empty response is returned
// when VPP is disconnected or no scrape has completed yet.
//
// Per-backend counters are deliberately not included: the LB plugin's
// forwarding node bypasses the ip{4,6}-lookup path, so the FIB's
// /net/route/to counter at a backend's stats_index never ticks for
// LB-forwarded traffic (see internal/vpp/lbstats.go::scrapeLBStats
// for the full chain). Exposing a zero-valued "packets" column would
// just mislead operators.
func (s *Server) GetVPPLBCounters(_ context.Context, _ *GetVPPLBCountersRequest) (*VPPLBCounters, error) {
	if s.vppClient == nil {
		return nil, status.Error(codes.Unavailable, "VPP integration is disabled")
	}
	out := &VPPLBCounters{}
	for _, v := range s.vppClient.VIPStats() {
		out.Vips = append(out.Vips, &VPPLBVIPCounters{
			Prefix:          v.Prefix,
			Protocol:        v.Protocol,
			Port:            uint32(v.Port),
			NextPacket:      v.NextPkt,
			FirstPacket:     v.FirstPkt,
			UntrackedPacket: v.Untracked,
			NoServer:        v.NoServer,
			Packets:         v.Packets,
			Bytes:           v.Bytes,
		})
	}
	sort.Slice(out.Vips, func(i, j int) bool {
		if out.Vips[i].Prefix != out.Vips[j].Prefix {
			return out.Vips[i].Prefix < out.Vips[j].Prefix
		}
		if out.Vips[i].Protocol != out.Vips[j].Protocol {
			return out.Vips[i].Protocol < out.Vips[j].Protocol
		}
		return out.Vips[i].Port < out.Vips[j].Port
	})
	return out, nil
}

// SyncVPPLBState runs the LB reconciler. With frontend_name unset it does a
// full sync (SyncLBStateAll), which may remove stale VIPs. With frontend_name
// set it does a single-VIP sync (SyncLBStateVIP) that only adds/updates.
func (s *Server) SyncVPPLBState(_ context.Context, req *SyncVPPLBStateRequest) (*SyncVPPLBStateResponse, error) {
	if s.vppClient == nil {
		return nil, status.Error(codes.Unavailable, "VPP integration is disabled")
	}
	cfg := s.checker.Config()
	if req.FrontendName != nil && *req.FrontendName != "" {
		if err := s.vppClient.SyncLBStateVIP(cfg, *req.FrontendName, ""); err != nil {
			if errors.Is(err, vpp.ErrFrontendNotFound) {
				return nil, status.Errorf(codes.NotFound, "%v", err)
			}
			return nil, status.Errorf(codes.Unavailable, "%v", err)
		}
		return &SyncVPPLBStateResponse{}, nil
	}
	if err := s.vppClient.SyncLBStateAll(cfg); err != nil {
		return nil, status.Errorf(codes.Unavailable, "%v", err)
	}
	return &SyncVPPLBStateResponse{}, nil
}

// lbStateToProto converts the vpp package's LBState into the proto message.
func lbStateToProto(s *vpp.LBState) *VPPLBState {
	out := &VPPLBState{
		Conf: &VPPLBConf{
			Ip4SrcAddress:        ipStringOrEmpty(s.Conf.IP4SrcAddress),
			Ip6SrcAddress:        ipStringOrEmpty(s.Conf.IP6SrcAddress),
			StickyBucketsPerCore: s.Conf.StickyBucketsPerCore,
			FlowTimeout:          s.Conf.FlowTimeout,
		},
	}
	for _, v := range s.VIPs {
		pv := &VPPLBVIP{
			Prefix:          v.Prefix.String(),
			Protocol:        uint32(v.Protocol),
			Port:            uint32(v.Port),
			Encap:           v.Encap,
			FlowTableLength: uint32(v.FlowTableLength),
			SrcIpSticky:     v.SrcIPSticky,
		}
		for _, a := range v.ASes {
			var ts int64
			if !a.InUseSince.IsZero() {
				ts = a.InUseSince.UnixNano()
			}
			pv.ApplicationServers = append(pv.ApplicationServers, &VPPLBAS{
				Address:      a.Address.String(),
				Weight:       uint32(a.Weight),
				Flags:        uint32(a.Flags),
				NumBuckets:   a.NumBuckets,
				InUseSinceNs: ts,
			})
		}
		out.Vips = append(out.Vips, pv)
	}
	return out
}

func ipStringOrEmpty(ip net.IP) string {
	if len(ip) == 0 || ip.IsUnspecified() {
		return ""
	}
	return ip.String()
}

// ---- conversion helpers ----------------------------------------------------

func frontendToProto(name string, fe config.Frontend, src vpp.StateSource) *FrontendInfo {
	// Compute the state-aware effective weights once; these reflect the
	// pool-failover logic and what would be programmed into VPP.
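	// eff is indexed by pool position first and backend name second, matching
	// the eff[poolIdx][bName] lookups below.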
	eff := vpp.EffectiveWeights(fe, src)

	pools := make([]*PoolInfo, 0, len(fe.Pools))
	for poolIdx, p := range fe.Pools {
		pi := &PoolInfo{Name: p.Name}
		for bName, pb := range p.Backends {
			pi.Backends = append(pi.Backends, &PoolBackendInfo{
				Name:            bName,
				Weight:          int32(pb.Weight),
				EffectiveWeight: int32(eff[poolIdx][bName]),
			})
		}
		pools = append(pools, pi)
	}
	return &FrontendInfo{
		Name:        name,
		Address:     fe.Address.String(),
		Protocol:    fe.Protocol,
		Port:        uint32(fe.Port),
		Description: fe.Description,
		Pools:       pools,
		SrcIpSticky: fe.SrcIPSticky,
		FlushOnDown: fe.FlushOnDown,
	}
}

func backendToProto(snap checker.BackendSnapshot) *BackendInfo {
	info := &BackendInfo{
		Name:        snap.Health.Name,
		Address:     snap.Health.Address.String(),
		State:       snap.Health.State.String(),
		Enabled:     snap.Config.Enabled,
		Healthcheck: snap.Config.HealthCheck,
	}
	for _, t := range snap.Health.Transitions {
		info.Transitions = append(info.Transitions, transitionToProto(t))
	}
	return info
}

func healthCheckToProto(name string, hc config.HealthCheck) *HealthCheckInfo {
	info := &HealthCheckInfo{
		Name:           name,
		Type:           hc.Type,
		Port:           uint32(hc.Port),
		IntervalNs:     hc.Interval.Nanoseconds(),
		FastIntervalNs: hc.FastInterval.Nanoseconds(),
		DownIntervalNs: hc.DownInterval.Nanoseconds(),
		TimeoutNs:      hc.Timeout.Nanoseconds(),
		Rise:           int32(hc.Rise),
		Fall:           int32(hc.Fall),
	}
	if hc.ProbeIPv4Src != nil {
		info.ProbeIpv4Src = hc.ProbeIPv4Src.String()
	}
	if hc.ProbeIPv6Src != nil {
		info.ProbeIpv6Src = hc.ProbeIPv6Src.String()
	}
	if hc.HTTP != nil {
		re := ""
		if hc.HTTP.ResponseRegexp != nil {
			re = hc.HTTP.ResponseRegexp.String()
		}
		info.Http = &HTTPCheckParams{
			Path:               hc.HTTP.Path,
			Host:               hc.HTTP.Host,
			ResponseCodeMin:    int32(hc.HTTP.ResponseCodeMin),
			ResponseCodeMax:    int32(hc.HTTP.ResponseCodeMax),
			ResponseRegexp:     re,
			ServerName:         hc.HTTP.ServerName,
			InsecureSkipVerify: hc.HTTP.InsecureSkipVerify,
		}
	}
	if hc.TCP != nil {
		info.Tcp = &TCPCheckParams{
			Ssl:                hc.TCP.SSL,
			ServerName:         hc.TCP.ServerName,
			InsecureSkipVerify: hc.TCP.InsecureSkipVerify,
		}
	}
	return info
}

func transitionToProto(t health.Transition) *TransitionRecord {
	return &TransitionRecord{
		From:     t.From.String(),
		To:       t.To.String(),
		AtUnixNs: t.At.UnixNano(),
	}
}
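
// A minimal client-side sketch for consuming WatchEvents. It assumes the
// generated MaglevClient constructor and the oneof getters on Event from the
// same proto package; the target variable, credential choice, and error
// handling are illustrative assumptions, not prescribed by this server.
// On older grpc-go, grpc.Dial can stand in for grpc.NewClient.
//
//	conn, err := grpc.NewClient(target,
//		grpc.WithTransportCredentials(insecure.NewCredentials()))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer conn.Close()
//
//	client := NewMaglevClient(conn)
//	stream, err := client.WatchEvents(ctx, &WatchRequest{}) // nil flags: subscribe to everything
//	if err != nil {
//		log.Fatal(err)
//	}
//	for {
//		ev, err := stream.Recv()
//		if err != nil {
//			break
//		}
//		switch {
//		case ev.GetBackend() != nil:
//			// backend state transition (or the initial snapshot event)
//		case ev.GetFrontend() != nil:
//			// frontend state transition (or the initial snapshot event)
//		case ev.GetLog() != nil:
//			// a log line at or above the requested level
//		}
//	}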