package httpserver import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "errors" "io" "log/slog" "net/http" "strconv" "strings" "sync/atomic" "time" "chatappgateway/internal/apperr" "chatappgateway/internal/service/auth" payservice "chatappgateway/internal/service/pay" commonpb "gitea.haiyihy.com/hy/chatappcommon/proto" ) type requestIDKey struct{} // AuthService 定义 HTTP 层依赖的用户注册能力。 type AuthService interface { Register(ctx context.Context, request auth.RegisterRequest) (*commonpb.RegisterResponse, error) } // PayService 定义 HTTP 层依赖的支付下单能力。 type PayService interface { Pay(ctx context.Context, request payservice.Request) (*commonpb.PayResponse, error) } // ReadinessChecker 定义就绪检查能力,只有可接流量时才返回 nil。 type ReadinessChecker interface { Ready(ctx context.Context) error } // Server 包装整个 HTTP 服务。 type Server struct { appName string addr string shutdownTimeout time.Duration logger *slog.Logger authService AuthService payService PayService readiness ReadinessChecker drainHook DrainHook handler http.Handler ready atomic.Bool } // DrainHook 在服务摘除 readiness 后执行,例如注销注册中心节点。 type DrainHook interface { OnDeregister(context.Context) error } // New 创建 HTTP 服务实例。 func New(appName string, addr string, shutdownTimeout time.Duration, logger *slog.Logger, authService AuthService, payService PayService, readiness ReadinessChecker, drainHook DrainHook) *Server { server := &Server{ appName: appName, addr: addr, shutdownTimeout: shutdownTimeout, logger: logger, authService: authService, payService: payService, readiness: readiness, drainHook: drainHook, } server.ready.Store(true) server.handler = server.withRequestContext(server.routes()) return server } // Handler 返回可供测试复用的 HTTP handler。 func (s *Server) Handler() http.Handler { return s.handler } // Run 启动 HTTP 服务并在上下文取消后优雅关闭。 func (s *Server) Run(ctx context.Context) error { httpServer := &http.Server{ Addr: s.addr, Handler: s.handler, ReadHeaderTimeout: 5 * time.Second, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 60 * time.Second, } go func() { <-ctx.Done() // 先摘除 readiness,通知上游停止继续发送新流量。 s.ready.Store(false) shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout) defer cancel() if s.drainHook != nil { if err := s.drainHook.OnDeregister(shutdownCtx); err != nil { s.logger.Error("run deregister hook failed", "error", err) } } _ = httpServer.Shutdown(shutdownCtx) }() s.logger.Info("http server started", "addr", s.addr) err := httpServer.ListenAndServe() if errors.Is(err, http.ErrServerClosed) { return nil } return err } func (s *Server) routes() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/health", s.handleHealth) mux.HandleFunc("/heath", s.handleHealth) mux.HandleFunc("/ready", s.handleReady) mux.HandleFunc("/api/v1/users/register", s.handleRegister) mux.HandleFunc("/api/v1/pay", s.handlePay) mux.HandleFunc("/", s.handleNotFound) return mux } func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{ "status": "ok", "service": s.appName, }) } func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { if !s.ready.Load() { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusServiceUnavailable, "not_ready", "service is shutting down")) return } if s.readiness != nil { checkCtx, cancel := context.WithTimeout(r.Context(), 2*time.Second) defer cancel() if err := s.readiness.Ready(checkCtx); err != nil { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusServiceUnavailable, "not_ready", err.Error())) return } } writeJSON(w, http.StatusOK, map[string]string{ "status": "ready", "service": s.appName, }) } func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed")) return } var request auth.RegisterRequest decoder := json.NewDecoder(r.Body) decoder.DisallowUnknownFields() if err := decoder.Decode(&request); err != nil { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusBadRequest, "bad_request", "invalid json body")) return } response, err := s.authService.Register(r.Context(), request) if err != nil { writeError(w, requestIDFromContext(r.Context()), err) return } writeEnvelope(w, http.StatusOK, requestIDFromContext(r.Context()), response) } func (s *Server) handlePay(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed")) return } var request payservice.Request decoder := json.NewDecoder(r.Body) decoder.DisallowUnknownFields() if err := decoder.Decode(&request); err != nil { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusBadRequest, "bad_request", "invalid json body")) return } response, err := s.payService.Pay(r.Context(), request) if err != nil { writeError(w, requestIDFromContext(r.Context()), err) return } writeEnvelope(w, http.StatusOK, requestIDFromContext(r.Context()), response) } func (s *Server) handleNotFound(w http.ResponseWriter, r *http.Request) { writeError(w, requestIDFromContext(r.Context()), apperr.New(http.StatusNotFound, "not_found", "route not found")) } func (s *Server) withRequestContext(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { requestID := strings.TrimSpace(r.Header.Get("X-Request-Id")) if requestID == "" { requestID = newRequestID() } w.Header().Set("X-Request-Id", requestID) start := time.Now() writer := &statusCapturingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK} ctx := context.WithValue(r.Context(), requestIDKey{}, requestID) next.ServeHTTP(writer, r.WithContext(ctx)) s.logger.Info("http request completed", "request_id", requestID, "method", r.Method, "path", r.URL.Path, "status", writer.statusCode, "duration", time.Since(start), ) }) } func writeEnvelope(w http.ResponseWriter, statusCode int, requestID string, data any) { writeJSON(w, statusCode, map[string]any{ "code": "ok", "message": "ok", "request_id": requestID, "data": data, }) } func writeError(w http.ResponseWriter, requestID string, err error) { statusCode, code, message := apperr.Resolve(err) writeJSON(w, statusCode, map[string]any{ "code": code, "message": message, "request_id": requestID, }) } func writeJSON(w http.ResponseWriter, statusCode int, payload any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) _ = json.NewEncoder(w).Encode(payload) } func requestIDFromContext(ctx context.Context) string { requestID, _ := ctx.Value(requestIDKey{}).(string) return requestID } func newRequestID() string { // 优先使用随机值,避免在高并发下出现碰撞。 buffer := make([]byte, 8) if _, err := io.ReadFull(rand.Reader, buffer); err == nil { return hex.EncodeToString(buffer) } return strconv.FormatInt(time.Now().UnixNano(), 36) } type statusCapturingResponseWriter struct { http.ResponseWriter statusCode int } func (w *statusCapturingResponseWriter) WriteHeader(statusCode int) { w.statusCode = statusCode w.ResponseWriter.WriteHeader(statusCode) }