diff --git a/config/local.yaml b/config/local.yaml index 89530e0..aae0093 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -4,6 +4,15 @@ app: http_addr: ":8080" shutdown_timeout: 10s +registry: + enabled: false + provider: "" + endpoint: "" + service_name: "chatappgateway" + instance_id: "" + register_timeout: 3s + deregister_timeout: 5s + # /ready 会使用这些 gRPC 依赖做接流量前检查。 grpc: user: diff --git a/config/prod.yaml b/config/prod.yaml index 1ca2fbf..fdce11e 100644 --- a/config/prod.yaml +++ b/config/prod.yaml @@ -4,6 +4,15 @@ app: http_addr: ":8080" shutdown_timeout: 10s +registry: + enabled: false + provider: "" + endpoint: "" + service_name: "chatappgateway" + instance_id: "" + register_timeout: 3s + deregister_timeout: 5s + grpc: # Current user instances: # - 10.0.11.17:9001 (user-1) diff --git a/config/prod.yaml.example b/config/prod.yaml.example index 1ca2fbf..fdce11e 100644 --- a/config/prod.yaml.example +++ b/config/prod.yaml.example @@ -4,6 +4,15 @@ app: http_addr: ":8080" shutdown_timeout: 10s +registry: + enabled: false + provider: "" + endpoint: "" + service_name: "chatappgateway" + instance_id: "" + register_timeout: 3s + deregister_timeout: 5s + grpc: # Current user instances: # - 10.0.11.17:9001 (user-1) diff --git a/go.mod b/go.mod index dd9c820..283af23 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module chatappgateway go 1.23.1 require ( - gitea.haiyihy.com/hy/chatappcommon v0.0.0 + gitea.haiyihy.com/hy/chatappcommon v0.1.1-0.20260404072625-9a68eb0302e5 google.golang.org/grpc v1.67.3 gopkg.in/yaml.v3 v3.0.1 ) @@ -15,5 +15,3 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/protobuf v1.36.11 // indirect ) - -replace gitea.haiyihy.com/hy/chatappcommon => ../Common diff --git a/go.sum b/go.sum index f1e95eb..0667730 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +gitea.haiyihy.com/hy/chatappcommon v0.1.1-0.20260404072625-9a68eb0302e5 h1:RUMxeDXog9ryyLU25wQR44dQYDnDGgeAM+9X3Gvkm74= +gitea.haiyihy.com/hy/chatappcommon v0.1.1-0.20260404072625-9a68eb0302e5/go.mod h1:VXhR5abAucTWdJ7j+N09ddF57Pm5ZsKg0h55ejXjQ7s= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= diff --git a/internal/app/app.go b/internal/app/app.go index 34e3116..429bdd7 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -10,6 +10,7 @@ import ( "chatappgateway/internal/integration/paygrpc" "chatappgateway/internal/integration/upstream" "chatappgateway/internal/integration/usergrpc" + "chatappgateway/internal/lifecycle" "chatappgateway/internal/service/auth" "chatappgateway/internal/service/pay" httpserver "chatappgateway/internal/transport/http" @@ -19,6 +20,7 @@ import ( type Application struct { server *httpserver.Server closers []io.Closer + hooks lifecycle.Hooks } // New 构造完整应用,包括 gRPC 连接池、业务服务和 HTTP 服务。 @@ -39,6 +41,7 @@ func New(ctx context.Context, cfg config.Config, logger *slog.Logger) (*Applicat authService := auth.New(userClient) payService := pay.New(payClient) + registryHooks := lifecycle.NewRegistryHooks(cfg.Registry, logger.With("component", "registry_hooks")) readinessChecker := newReadinessGroup( namedChecker{ name: "ChatAppUser", @@ -49,16 +52,20 @@ func New(ctx context.Context, cfg config.Config, logger *slog.Logger) (*Applicat check: payPool.Ready, }, ) - server := httpserver.New(cfg.App.Name, cfg.App.HTTPAddr, cfg.App.ShutdownTimeout, logger, authService, payService, readinessChecker) + server := httpserver.New(cfg.App.Name, cfg.App.HTTPAddr, cfg.App.ShutdownTimeout, logger, authService, payService, readinessChecker, registryHooks) return &Application{ server: server, closers: []io.Closer{userPool, payPool}, + hooks: registryHooks, }, nil } // Run 启动 HTTP 服务。 func (a *Application) Run(ctx context.Context) error { + if err := a.hooks.OnRegister(ctx); err != nil { + return fmt.Errorf("register lifecycle hook: %w", err) + } return a.server.Run(ctx) } diff --git a/internal/config/config.go b/internal/config/config.go index 26d9c97..a47e9d8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,8 +14,9 @@ const DefaultPath = "config/local.yaml" // Config 汇总整个网关服务的运行参数。 type Config struct { - App AppConfig `yaml:"app"` - GRPC GRPCConfig `yaml:"grpc"` + App AppConfig `yaml:"app"` + Registry RegistryConfig `yaml:"registry"` + GRPC GRPCConfig `yaml:"grpc"` } // AppConfig 描述应用自身参数。 @@ -26,6 +27,17 @@ type AppConfig struct { ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` } +// RegistryConfig 预留服务注册中心接入参数,当前只保留注销钩子扩展点。 +type RegistryConfig struct { + Enabled bool `yaml:"enabled"` + Provider string `yaml:"provider"` + Endpoint string `yaml:"endpoint"` + ServiceName string `yaml:"service_name"` + InstanceID string `yaml:"instance_id"` + RegisterTimeout time.Duration `yaml:"register_timeout"` + DeregisterTimeout time.Duration `yaml:"deregister_timeout"` +} + // GRPCConfig 聚合所有下游 gRPC 服务配置。 type GRPCConfig struct { User UpstreamConfig `yaml:"user"` @@ -89,6 +101,10 @@ func defaultConfig() Config { HTTPAddr: ":8080", ShutdownTimeout: 10 * time.Second, }, + Registry: RegistryConfig{ + RegisterTimeout: 3 * time.Second, + DeregisterTimeout: 5 * time.Second, + }, GRPC: GRPCConfig{ User: defaultUpstreamConfig("127.0.0.1:9001"), Pay: defaultUpstreamConfig("127.0.0.1:9002"), @@ -137,6 +153,12 @@ func validate(cfg Config) error { if cfg.App.ShutdownTimeout <= 0 { return fmt.Errorf("app.shutdown_timeout must be greater than 0") } + if cfg.Registry.RegisterTimeout <= 0 { + return fmt.Errorf("registry.register_timeout must be greater than 0") + } + if cfg.Registry.DeregisterTimeout <= 0 { + return fmt.Errorf("registry.deregister_timeout must be greater than 0") + } if err := validateUpstream("grpc.user", cfg.GRPC.User); err != nil { return err } diff --git a/internal/lifecycle/registry.go b/internal/lifecycle/registry.go new file mode 100644 index 0000000..0903af2 --- /dev/null +++ b/internal/lifecycle/registry.go @@ -0,0 +1,43 @@ +package lifecycle + +import ( + "context" + "log/slog" + + "chatappgateway/internal/config" +) + +// Hooks 预留服务注册中心生命周期钩子。 +type Hooks interface { + OnRegister(context.Context) error + OnDeregister(context.Context) error +} + +type registryHooks struct { + cfg config.RegistryConfig + logger *slog.Logger +} + +// NewRegistryHooks 返回当前阶段的占位实现,后续可以替换成真实注册中心接入。 +func NewRegistryHooks(cfg config.RegistryConfig, logger *slog.Logger) Hooks { + return ®istryHooks{ + cfg: cfg, + logger: logger, + } +} + +func (h *registryHooks) OnRegister(_ context.Context) error { + if !h.cfg.Enabled { + return nil + } + h.logger.Info("registry register hook reserved", "provider", h.cfg.Provider, "service_name", h.cfg.ServiceName, "instance_id", h.cfg.InstanceID) + return nil +} + +func (h *registryHooks) OnDeregister(_ context.Context) error { + if !h.cfg.Enabled { + return nil + } + h.logger.Info("registry deregister hook reserved", "provider", h.cfg.Provider, "service_name", h.cfg.ServiceName, "instance_id", h.cfg.InstanceID) + return nil +} diff --git a/internal/transport/http/server.go b/internal/transport/http/server.go index 14079f7..b4c81bb 100644 --- a/internal/transport/http/server.go +++ b/internal/transport/http/server.go @@ -46,12 +46,18 @@ type Server struct { authService AuthService payService PayService readiness ReadinessChecker + drainHook DrainHook handler http.Handler ready atomic.Bool } +// DrainHook 在服务摘除 readiness 后执行,例如注销注册中心节点。 +type DrainHook interface { + OnDeregister(context.Context) error +} + // New 创建 HTTP 服务实例。 -func New(appName string, addr string, shutdownTimeout time.Duration, logger *slog.Logger, authService AuthService, payService PayService, readiness ReadinessChecker) *Server { +func New(appName string, addr string, shutdownTimeout time.Duration, logger *slog.Logger, authService AuthService, payService PayService, readiness ReadinessChecker, drainHook DrainHook) *Server { server := &Server{ appName: appName, addr: addr, @@ -60,6 +66,7 @@ func New(appName string, addr string, shutdownTimeout time.Duration, logger *slo authService: authService, payService: payService, readiness: readiness, + drainHook: drainHook, } server.ready.Store(true) server.handler = server.withRequestContext(server.routes()) @@ -88,6 +95,11 @@ func (s *Server) Run(ctx context.Context) error { s.ready.Store(false) shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout) defer cancel() + if s.drainHook != nil { + if err := s.drainHook.OnDeregister(shutdownCtx); err != nil { + s.logger.Error("run deregister hook failed", "error", err) + } + } _ = httpServer.Shutdown(shutdownCtx) }() diff --git a/internal/transport/http/server_test.go b/internal/transport/http/server_test.go index b1a84d1..2eca2e6 100644 --- a/internal/transport/http/server_test.go +++ b/internal/transport/http/server_test.go @@ -374,7 +374,7 @@ func newTestEnv(t *testing.T, userTimeout time.Duration, payTimeout time.Duratio authService := auth.New(usergrpc.New(commonpb.NewChatAppUserServiceClient(userConn), userTimeout)) payService := payservice.New(paygrpc.New(commonpb.NewChatAppPayServiceClient(payConn), payTimeout)) readinessChecker := &mockReadinessChecker{} - handler := httpserver.New("chatappgateway", ":0", 2*time.Second, logger, authService, payService, readinessChecker).Handler() + handler := httpserver.New("chatappgateway", ":0", 2*time.Second, logger, authService, payService, readinessChecker, nil).Handler() return &testEnv{ server: httptest.NewServer(handler),