From 507445f84a8bb3043f53687b052b9629f5609e0f Mon Sep 17 00:00:00 2001 From: ZuoZuo <68836346+Mrz-sakura@users.noreply.github.com> Date: Mon, 6 Apr 2026 17:08:51 +0800 Subject: [PATCH] deploy --- .gitea/workflows/upload-release.yml | 97 +++ config/local.yaml | 22 +- config/prod.yaml | 40 + config/prod.yaml.example | 40 + .../systemd/chatapp-gateway.service.example | 19 + docs/apifox/README.md | 19 + docs/apifox/gateway-openapi.json | 741 ++++++++++++++++++ internal/app/app.go | 36 +- internal/app/readiness.go | 27 - internal/config/config.go | 99 ++- internal/config/config_test.go | 48 +- internal/integration/paygrpc/client.go | 22 +- internal/integration/upstream/pool.go | 392 +++++++++ internal/integration/upstream/pool_test.go | 110 +++ internal/integration/usergrpc/client.go | 22 +- 15 files changed, 1644 insertions(+), 90 deletions(-) create mode 100644 .gitea/workflows/upload-release.yml create mode 100644 config/prod.yaml create mode 100644 config/prod.yaml.example create mode 100644 deploy/systemd/chatapp-gateway.service.example create mode 100644 docs/apifox/README.md create mode 100644 docs/apifox/gateway-openapi.json create mode 100644 internal/integration/upstream/pool.go create mode 100644 internal/integration/upstream/pool_test.go diff --git a/.gitea/workflows/upload-release.yml b/.gitea/workflows/upload-release.yml new file mode 100644 index 0000000..45b66fc --- /dev/null +++ b/.gitea/workflows/upload-release.yml @@ -0,0 +1,97 @@ +name: upload-release + +on: + workflow_dispatch: + inputs: + release_id: + description: "Shared release id, for example 20260406-abc1234" + required: true + +env: + SERVICE_NAME: "gateway" + BINARY_NAME: "gateway" + BUILD_TARGET: "./cmd/gateway" + CONFIG_SOURCE: "config/prod.yaml" + COS_BUCKET: "app-release-1417798587" + COS_REGION: "me-saudi-arabia" + COS_PREFIX: "releases/prod" + +jobs: + build-and-upload: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: go.mod + + - name: Test + run: go test ./... + + - name: Build package + shell: bash + run: | + set -Eeuo pipefail + test -f "${CONFIG_SOURCE}" + RELEASE_ID="${{ github.event.inputs.release_id }}" + ROOT="$(pwd)" + OUT_DIR="${ROOT}/dist/${SERVICE_NAME}" + PKG_DIR="${OUT_DIR}/package" + + rm -rf "${OUT_DIR}" + mkdir -p "${PKG_DIR}/bin" "${PKG_DIR}/config" + + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -trimpath -ldflags="-s -w" \ + -o "${PKG_DIR}/bin/${BINARY_NAME}" \ + "${BUILD_TARGET}" + + cp "${CONFIG_SOURCE}" "${PKG_DIR}/config/prod.yaml" + + tar -C "${PKG_DIR}" -czf "${OUT_DIR}/${SERVICE_NAME}.tgz" . + sha256sum "${OUT_DIR}/${SERVICE_NAME}.tgz" | awk '{print $1}' > "${OUT_DIR}/${SERVICE_NAME}.sha256" + + echo "RELEASE_ID=${RELEASE_ID}" >> "${GITHUB_ENV}" + echo "TGZ_PATH=${OUT_DIR}/${SERVICE_NAME}.tgz" >> "${GITHUB_ENV}" + echo "SHA_PATH=${OUT_DIR}/${SERVICE_NAME}.sha256" >> "${GITHUB_ENV}" + + - name: Upload to COS + env: + TENCENTCLOUD_SECRET_ID: ${{ secrets.TENCENTCLOUD_SECRET_ID }} + TENCENTCLOUD_SECRET_KEY: ${{ secrets.TENCENTCLOUD_SECRET_KEY }} + TENCENTCLOUD_SESSION_TOKEN: ${{ secrets.TENCENTCLOUD_SESSION_TOKEN }} + shell: bash + run: | + set -Eeuo pipefail + python3 -m pip install --upgrade pip >/dev/null + python3 -m pip install cos-python-sdk-v5 >/dev/null + python3 - <<'PY' + import os + from qcloud_cos import CosConfig, CosS3Client + + secret_id = os.environ["TENCENTCLOUD_SECRET_ID"] + secret_key = os.environ["TENCENTCLOUD_SECRET_KEY"] + token = os.getenv("TENCENTCLOUD_SESSION_TOKEN") + region = os.environ["COS_REGION"] + bucket = os.environ["COS_BUCKET"] + prefix = os.environ["COS_PREFIX"].strip("/") + release_id = os.environ["RELEASE_ID"] + service = os.environ["SERVICE_NAME"] + tgz_path = os.environ["TGZ_PATH"] + sha_path = os.environ["SHA_PATH"] + + cfg = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme="https") + client = CosS3Client(cfg) + + def upload(local_path: str, remote_name: str) -> None: + key = f"{prefix}/{release_id}/{service}/{remote_name}" + with open(local_path, "rb") as fh: + client.put_object(Bucket=bucket, Body=fh, Key=key, EnableMD5=True) + print(f"uploaded: {key}") + + upload(tgz_path, f"{service}.tgz") + upload(sha_path, f"{service}.sha256") + PY diff --git a/config/local.yaml b/config/local.yaml index 9e2a1c9..89530e0 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -7,8 +7,26 @@ app: # /ready 会使用这些 gRPC 依赖做接流量前检查。 grpc: user: - target: "127.0.0.1:9001" + targets: + - "127.0.0.1:9001" timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s pay: - target: "127.0.0.1:9002" + targets: + - "127.0.0.1:9002" timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s diff --git a/config/prod.yaml b/config/prod.yaml new file mode 100644 index 0000000..1ca2fbf --- /dev/null +++ b/config/prod.yaml @@ -0,0 +1,40 @@ +app: + name: chatappgateway + env: prod + http_addr: ":8080" + shutdown_timeout: 10s + +grpc: + # Current user instances: + # - 10.0.11.17:9001 (user-1) + # - 10.0.12.6:9001 (user-2) + user: + targets: + - "10.0.11.17:9001" + - "10.0.12.6:9001" + timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s + + # Current pay instances: + # - 10.0.22.13:9002 (pay-1) + # - 10.0.21.8:9002 (pay-2) + pay: + targets: + - "10.0.22.13:9002" + - "10.0.21.8:9002" + timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s diff --git a/config/prod.yaml.example b/config/prod.yaml.example new file mode 100644 index 0000000..1ca2fbf --- /dev/null +++ b/config/prod.yaml.example @@ -0,0 +1,40 @@ +app: + name: chatappgateway + env: prod + http_addr: ":8080" + shutdown_timeout: 10s + +grpc: + # Current user instances: + # - 10.0.11.17:9001 (user-1) + # - 10.0.12.6:9001 (user-2) + user: + targets: + - "10.0.11.17:9001" + - "10.0.12.6:9001" + timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s + + # Current pay instances: + # - 10.0.22.13:9002 (pay-1) + # - 10.0.21.8:9002 (pay-2) + pay: + targets: + - "10.0.22.13:9002" + - "10.0.21.8:9002" + timeout: 3s + retry: + max_attempts: 2 + backoff: 200ms + circuit_breaker: + failure_threshold: 3 + open_timeout: 10s + health_cache: + ttl: 2s diff --git a/deploy/systemd/chatapp-gateway.service.example b/deploy/systemd/chatapp-gateway.service.example new file mode 100644 index 0000000..1032db9 --- /dev/null +++ b/deploy/systemd/chatapp-gateway.service.example @@ -0,0 +1,19 @@ +[Unit] +Description=ChatApp Gateway Service +Wants=network-online.target +After=network-online.target + +[Service] +Type=simple +User=root +Group=root +WorkingDirectory=/opt/apps/gateway/current +ExecStart=/opt/apps/gateway/current/bin/gateway -config /opt/apps/gateway/current/config/prod.yaml +Restart=always +RestartSec=3 +KillSignal=SIGTERM +TimeoutStopSec=30 +LimitNOFILE=65535 + +[Install] +WantedBy=multi-user.target diff --git a/docs/apifox/README.md b/docs/apifox/README.md new file mode 100644 index 0000000..ce24b60 --- /dev/null +++ b/docs/apifox/README.md @@ -0,0 +1,19 @@ +# Gateway Apifox 导入说明 + +- 导入文件:`docs/apifox/gateway-openapi.json` +- 文件格式:OpenAPI 3.0.3 JSON,Apifox 可直接导入 +- 覆盖范围:当前 `internal/transport/http/server.go` 中全部 HTTP 路由 + +当前已覆盖的接口: + +- `GET /health` +- `GET /heath` +- `GET /ready` +- `POST /api/v1/users/register` +- `POST /api/v1/pay` + +说明: + +- `/heath` 是兼容历史拼写错误保留的别名,行为与 `/health` 一致。 +- `POST` 接口请求体启用了严格 JSON 校验,文档中已标记 `additionalProperties: false`。 +- 成功响应统一返回 `code`、`message`、`request_id`、`data`;错误响应统一返回 `code`、`message`、`request_id`。 diff --git a/docs/apifox/gateway-openapi.json b/docs/apifox/gateway-openapi.json new file mode 100644 index 0000000..1892a0e --- /dev/null +++ b/docs/apifox/gateway-openapi.json @@ -0,0 +1,741 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "ChatApp Gateway HTTP API", + "version": "1.0.0", + "description": "Gateway 对外 HTTP 接口定义,采用 OpenAPI 3.0.3,可直接导入 Apifox。该文件覆盖当前 Gateway 暴露的全部 HTTP 路由。" + }, + "servers": [ + { + "url": "http://127.0.0.1:8080", + "description": "本地默认地址,来源于 config/local.yaml" + } + ], + "tags": [ + { + "name": "System", + "description": "健康检查与就绪检查" + }, + { + "name": "Auth", + "description": "用户注册" + }, + { + "name": "Pay", + "description": "支付下单" + } + ], + "paths": { + "/health": { + "get": { + "tags": [ + "System" + ], + "summary": "健康检查", + "operationId": "getHealth", + "parameters": [ + { + "$ref": "#/components/parameters/XRequestIdHeader" + } + ], + "responses": { + "200": { + "description": "服务存活", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ServiceStatusResponse" + }, + "example": { + "status": "ok", + "service": "chatappgateway" + } + } + } + } + } + } + }, + "/heath": { + "get": { + "tags": [ + "System" + ], + "summary": "健康检查兼容别名", + "description": "兼容历史拼写错误的 `/heath` 路由,行为与 `/health` 完全一致。", + "operationId": "getHealthAlias", + "deprecated": true, + "parameters": [ + { + "$ref": "#/components/parameters/XRequestIdHeader" + } + ], + "responses": { + "200": { + "description": "服务存活", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ServiceStatusResponse" + }, + "example": { + "status": "ok", + "service": "chatappgateway" + } + } + } + } + } + } + }, + "/ready": { + "get": { + "tags": [ + "System" + ], + "summary": "就绪检查", + "description": "当 Gateway 可接收流量且下游依赖检查通过时返回 ready。", + "operationId": "getReady", + "parameters": [ + { + "$ref": "#/components/parameters/XRequestIdHeader" + } + ], + "responses": { + "200": { + "description": "服务已就绪", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ServiceStatusResponse" + }, + "example": { + "status": "ready", + "service": "chatappgateway" + } + } + } + }, + "503": { + "description": "服务未就绪或正在摘流", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "examples": { + "dependency_not_ready": { + "summary": "依赖未就绪", + "value": { + "code": "not_ready", + "message": "chatappuser not ready", + "request_id": "b31e6d8248775b9a" + } + }, + "service_shutting_down": { + "summary": "服务摘流中", + "value": { + "code": "not_ready", + "message": "service is shutting down", + "request_id": "b31e6d8248775b9a" + } + } + } + } + } + } + } + } + }, + "/api/v1/users/register": { + "post": { + "tags": [ + "Auth" + ], + "summary": "用户注册", + "description": "校验注册参数后转发到用户服务。`nickname` 为空时会自动回退为 `account`。", + "operationId": "postUserRegister", + "parameters": [ + { + "$ref": "#/components/parameters/XRequestIdHeader" + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RegisterRequest" + }, + "example": { + "account": "demo@example.com", + "password": "secret", + "country_code": "+86", + "verify_code": "123456", + "nickname": "Neo", + "device_id": "dev-1", + "platform": "ios", + "app_version": "1.0.0" + } + } + } + }, + "responses": { + "200": { + "description": "注册成功", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RegisterSuccessEnvelope" + }, + "example": { + "code": "ok", + "message": "ok", + "request_id": "b31e6d8248775b9a", + "data": { + "user_id": "u-100", + "access_token": "access-token", + "is_new_user": true, + "profile": { + "user_id": "u-100", + "nickname": "Neo", + "avatar_url": "https://example.com/avatar.png" + } + } + } + } + } + }, + "400": { + "description": "请求参数错误", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "examples": { + "invalid_json": { + "summary": "JSON 非法", + "value": { + "code": "bad_request", + "message": "invalid json body", + "request_id": "b31e6d8248775b9a" + } + }, + "missing_account": { + "summary": "缺少账号", + "value": { + "code": "bad_request", + "message": "account is required", + "request_id": "b31e6d8248775b9a" + } + }, + "missing_password": { + "summary": "缺少密码", + "value": { + "code": "bad_request", + "message": "password is required", + "request_id": "b31e6d8248775b9a" + } + } + } + } + } + }, + "401": { + "description": "鉴权失败", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "unauthorized", + "message": "register denied", + "request_id": "b31e6d8248775b9a" + } + } + } + }, + "405": { + "description": "请求方法不允许", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "method_not_allowed", + "message": "method not allowed", + "request_id": "b31e6d8248775b9a" + } + } + } + }, + "502": { + "description": "下游服务错误", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "upstream_error", + "message": "user service unavailable", + "request_id": "b31e6d8248775b9a" + } + } + } + }, + "504": { + "description": "下游服务超时", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "upstream_timeout", + "message": "upstream request timeout", + "request_id": "b31e6d8248775b9a" + } + } + } + } + } + } + }, + "/api/v1/pay": { + "post": { + "tags": [ + "Pay" + ], + "summary": "发起支付", + "description": "校验支付参数后转发到支付服务创建支付单。", + "operationId": "postPay", + "parameters": [ + { + "$ref": "#/components/parameters/XRequestIdHeader" + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PayRequest" + }, + "example": { + "order_no": "order-001", + "user_id": "u-100", + "amount": "9.99", + "currency": "USD", + "pay_method": "apple_pay", + "subject": "vip" + } + } + } + }, + "responses": { + "200": { + "description": "支付单创建成功", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PaySuccessEnvelope" + }, + "example": { + "code": "ok", + "message": "ok", + "request_id": "b31e6d8248775b9a", + "data": { + "payment_id": "pay-001", + "order_no": "order-001", + "user_id": "u-100", + "status": "processing", + "amount": "9.99", + "currency": "USD", + "pay_method": "apple_pay", + "subject": "vip", + "created_at": "2026-04-04T12:00:00Z" + } + } + } + } + }, + "400": { + "description": "请求参数错误", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "examples": { + "invalid_json": { + "summary": "JSON 非法", + "value": { + "code": "bad_request", + "message": "invalid json body", + "request_id": "b31e6d8248775b9a" + } + }, + "missing_order_no": { + "summary": "缺少订单号", + "value": { + "code": "bad_request", + "message": "order_no is required", + "request_id": "b31e6d8248775b9a" + } + }, + "missing_user_id": { + "summary": "缺少用户 ID", + "value": { + "code": "bad_request", + "message": "user_id is required", + "request_id": "b31e6d8248775b9a" + } + }, + "missing_amount": { + "summary": "缺少金额", + "value": { + "code": "bad_request", + "message": "amount is required", + "request_id": "b31e6d8248775b9a" + } + } + } + } + } + }, + "405": { + "description": "请求方法不允许", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "method_not_allowed", + "message": "method not allowed", + "request_id": "b31e6d8248775b9a" + } + } + } + }, + "502": { + "description": "下游支付服务错误", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "upstream_error", + "message": "pay service unavailable", + "request_id": "b31e6d8248775b9a" + } + } + } + }, + "504": { + "description": "下游支付服务超时", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorEnvelope" + }, + "example": { + "code": "upstream_timeout", + "message": "upstream request timeout", + "request_id": "b31e6d8248775b9a" + } + } + } + } + } + } + } + }, + "components": { + "parameters": { + "XRequestIdHeader": { + "name": "X-Request-Id", + "in": "header", + "required": false, + "description": "可选链路请求 ID。不传时由 Gateway 自动生成,并在响应头与响应体中返回。", + "schema": { + "type": "string" + } + } + }, + "schemas": { + "ServiceStatusResponse": { + "type": "object", + "required": [ + "status", + "service" + ], + "properties": { + "status": { + "type": "string", + "description": "服务状态", + "example": "ok" + }, + "service": { + "type": "string", + "description": "服务名称", + "example": "chatappgateway" + } + } + }, + "ErrorEnvelope": { + "type": "object", + "required": [ + "code", + "message", + "request_id" + ], + "properties": { + "code": { + "type": "string", + "description": "业务错误码", + "example": "bad_request" + }, + "message": { + "type": "string", + "description": "错误描述", + "example": "account is required" + }, + "request_id": { + "type": "string", + "description": "请求链路 ID", + "example": "b31e6d8248775b9a" + } + } + }, + "RegisterRequest": { + "type": "object", + "additionalProperties": false, + "required": [ + "account", + "password" + ], + "properties": { + "account": { + "type": "string", + "description": "注册账号", + "example": "demo@example.com" + }, + "password": { + "type": "string", + "description": "登录密码", + "example": "secret" + }, + "country_code": { + "type": "string", + "description": "国家区号", + "example": "+86" + }, + "verify_code": { + "type": "string", + "description": "验证码", + "example": "123456" + }, + "nickname": { + "type": "string", + "description": "昵称;为空时服务端回退为 account", + "example": "Neo" + }, + "device_id": { + "type": "string", + "description": "设备 ID", + "example": "dev-1" + }, + "platform": { + "type": "string", + "description": "平台标识", + "example": "ios" + }, + "app_version": { + "type": "string", + "description": "App 版本号", + "example": "1.0.0" + } + } + }, + "UserProfile": { + "type": "object", + "properties": { + "user_id": { + "type": "string", + "example": "u-100" + }, + "nickname": { + "type": "string", + "example": "Neo" + }, + "avatar_url": { + "type": "string", + "format": "uri", + "example": "https://example.com/avatar.png" + } + } + }, + "RegisterResponseData": { + "type": "object", + "properties": { + "user_id": { + "type": "string", + "example": "u-100" + }, + "access_token": { + "type": "string", + "example": "access-token" + }, + "is_new_user": { + "type": "boolean", + "example": true + }, + "profile": { + "$ref": "#/components/schemas/UserProfile" + } + } + }, + "RegisterSuccessEnvelope": { + "type": "object", + "required": [ + "code", + "message", + "request_id", + "data" + ], + "properties": { + "code": { + "type": "string", + "example": "ok" + }, + "message": { + "type": "string", + "example": "ok" + }, + "request_id": { + "type": "string", + "example": "b31e6d8248775b9a" + }, + "data": { + "$ref": "#/components/schemas/RegisterResponseData" + } + } + }, + "PayRequest": { + "type": "object", + "additionalProperties": false, + "required": [ + "order_no", + "user_id", + "amount" + ], + "properties": { + "order_no": { + "type": "string", + "description": "业务订单号", + "example": "order-001" + }, + "user_id": { + "type": "string", + "description": "用户 ID", + "example": "u-100" + }, + "amount": { + "type": "string", + "description": "金额,当前按字符串透传", + "example": "9.99" + }, + "currency": { + "type": "string", + "description": "币种", + "example": "USD" + }, + "pay_method": { + "type": "string", + "description": "支付方式", + "example": "apple_pay" + }, + "subject": { + "type": "string", + "description": "支付主题或商品描述", + "example": "vip" + } + } + }, + "PayResponseData": { + "type": "object", + "properties": { + "payment_id": { + "type": "string", + "example": "pay-001" + }, + "order_no": { + "type": "string", + "example": "order-001" + }, + "user_id": { + "type": "string", + "example": "u-100" + }, + "status": { + "type": "string", + "example": "processing" + }, + "amount": { + "type": "string", + "example": "9.99" + }, + "currency": { + "type": "string", + "example": "USD" + }, + "pay_method": { + "type": "string", + "example": "apple_pay" + }, + "subject": { + "type": "string", + "example": "vip" + }, + "created_at": { + "type": "string", + "format": "date-time", + "example": "2026-04-04T12:00:00Z" + } + } + }, + "PaySuccessEnvelope": { + "type": "object", + "required": [ + "code", + "message", + "request_id", + "data" + ], + "properties": { + "code": { + "type": "string", + "example": "ok" + }, + "message": { + "type": "string", + "example": "ok" + }, + "request_id": { + "type": "string", + "example": "b31e6d8248775b9a" + }, + "data": { + "$ref": "#/components/schemas/PayResponseData" + } + } + } + } + } +} diff --git a/internal/app/app.go b/internal/app/app.go index e3cc813..34e3116 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -8,14 +8,11 @@ import ( "chatappgateway/internal/config" "chatappgateway/internal/integration/paygrpc" + "chatappgateway/internal/integration/upstream" "chatappgateway/internal/integration/usergrpc" "chatappgateway/internal/service/auth" "chatappgateway/internal/service/pay" httpserver "chatappgateway/internal/transport/http" - commonpb "gitea.haiyihy.com/hy/chatappcommon/proto" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) // Application 组合网关运行所需的全部组件。 @@ -24,39 +21,39 @@ type Application struct { closers []io.Closer } -// New 构造完整应用,包括 gRPC 连接、业务服务和 HTTP 服务。 +// New 构造完整应用,包括 gRPC 连接池、业务服务和 HTTP 服务。 func New(ctx context.Context, cfg config.Config, logger *slog.Logger) (*Application, error) { - userConn, err := dialGRPC(ctx, cfg.GRPC.User.Target) + userPool, err := upstream.New(ctx, "ChatAppUser", cfg.GRPC.User) if err != nil { - return nil, fmt.Errorf("dial ChatAppUser: %w", err) + return nil, fmt.Errorf("build ChatAppUser pool: %w", err) } - payConn, err := dialGRPC(ctx, cfg.GRPC.Pay.Target) + payPool, err := upstream.New(ctx, "ChatAppPay", cfg.GRPC.Pay) if err != nil { - _ = userConn.Close() - return nil, fmt.Errorf("dial ChatAppPay: %w", err) + _ = userPool.Close() + return nil, fmt.Errorf("build ChatAppPay pool: %w", err) } - userClient := usergrpc.New(commonpb.NewChatAppUserServiceClient(userConn), cfg.GRPC.User.Timeout) - payClient := paygrpc.New(commonpb.NewChatAppPayServiceClient(payConn), cfg.GRPC.Pay.Timeout) + userClient := usergrpc.NewWithPool(userPool) + payClient := paygrpc.NewWithPool(payPool) authService := auth.New(userClient) payService := pay.New(payClient) readinessChecker := newReadinessGroup( namedChecker{ name: "ChatAppUser", - check: grpcConnectionReady(userConn), + check: userPool.Ready, }, namedChecker{ name: "ChatAppPay", - check: grpcConnectionReady(payConn), + check: payPool.Ready, }, ) server := httpserver.New(cfg.App.Name, cfg.App.HTTPAddr, cfg.App.ShutdownTimeout, logger, authService, payService, readinessChecker) return &Application{ server: server, - closers: []io.Closer{userConn, payConn}, + closers: []io.Closer{userPool, payPool}, }, nil } @@ -75,12 +72,3 @@ func (a *Application) Close() error { } return firstErr } - -func dialGRPC(ctx context.Context, target string) (*grpc.ClientConn, error) { - // 网关只做自身健康检查,因此这里使用懒连接,避免下游暂时不可用时阻塞启动。 - return grpc.DialContext( - ctx, - target, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) -} diff --git a/internal/app/readiness.go b/internal/app/readiness.go index a9132c7..547bcea 100644 --- a/internal/app/readiness.go +++ b/internal/app/readiness.go @@ -3,9 +3,6 @@ package app import ( "context" "fmt" - - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" ) // ReadinessChecker 定义统一的就绪检查能力。 @@ -36,27 +33,3 @@ func (g readinessGroup) Ready(ctx context.Context) error { } return nil } - -func grpcConnectionReady(conn *grpc.ClientConn) func(context.Context) error { - return func(ctx context.Context) error { - // 主动触发一次连接过程,避免懒连接状态下 readiness 永远不去拨号。 - conn.Connect() - - for { - state := conn.GetState() - switch state { - case connectivity.Ready: - return nil - case connectivity.Shutdown: - return fmt.Errorf("connection shutdown") - } - - if !conn.WaitForStateChange(ctx, state) { - if err := ctx.Err(); err != nil { - return err - } - return fmt.Errorf("state did not change") - } - } - } -} diff --git a/internal/config/config.go b/internal/config/config.go index ea1bdf4..26d9c97 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -32,21 +32,41 @@ type GRPCConfig struct { Pay UpstreamConfig `yaml:"pay"` } -// UpstreamConfig 描述单个下游服务的地址和超时。 +// UpstreamConfig 描述单个下游服务的节点和容错策略。 type UpstreamConfig struct { - Target string `yaml:"target"` - Timeout time.Duration `yaml:"timeout"` + // Target 保留兼容旧配置;内部会归一化到 Targets。 + Target string `yaml:"target,omitempty"` + Targets []string `yaml:"targets"` + Timeout time.Duration `yaml:"timeout"` + Retry RetryConfig `yaml:"retry"` + CircuitBreaker CircuitBreakerConfig `yaml:"circuit_breaker"` + HealthCache HealthCacheConfig `yaml:"health_cache"` +} + +// RetryConfig 控制单次请求在多个节点间的重试行为。 +type RetryConfig struct { + MaxAttempts int `yaml:"max_attempts"` + Backoff time.Duration `yaml:"backoff"` +} + +// CircuitBreakerConfig 控制连续失败后的断路时长。 +type CircuitBreakerConfig struct { + FailureThreshold int `yaml:"failure_threshold"` + OpenTimeout time.Duration `yaml:"open_timeout"` +} + +// HealthCacheConfig 控制 readiness 对下游健康状态缓存多久。 +type HealthCacheConfig struct { + TTL time.Duration `yaml:"ttl"` } // Load 从 YAML 文件加载配置,并补齐默认值和校验。 func Load(path string) (Config, error) { - // 先读取配置文件内容。 data, err := os.ReadFile(path) if err != nil { return Config{}, fmt.Errorf("read config file %s: %w", path, err) } - // 使用 KnownFields 防止配置拼写错误悄悄溜过。 cfg := defaultConfig() decoder := yaml.NewDecoder(bytes.NewReader(data)) decoder.KnownFields(true) @@ -54,6 +74,7 @@ func Load(path string) (Config, error) { return Config{}, fmt.Errorf("decode config file %s: %w", path, err) } + cfg.normalize() if err := validate(cfg); err != nil { return Config{}, err } @@ -69,24 +90,47 @@ func defaultConfig() Config { ShutdownTimeout: 10 * time.Second, }, GRPC: GRPCConfig{ - User: UpstreamConfig{ - Target: "127.0.0.1:9001", - Timeout: 3 * time.Second, - }, - Pay: UpstreamConfig{ - Target: "127.0.0.1:9002", - Timeout: 3 * time.Second, - }, + User: defaultUpstreamConfig("127.0.0.1:9001"), + Pay: defaultUpstreamConfig("127.0.0.1:9002"), }, } } +func defaultUpstreamConfig(target string) UpstreamConfig { + return UpstreamConfig{ + Targets: []string{target}, + Timeout: 3 * time.Second, + Retry: RetryConfig{ + MaxAttempts: 2, + Backoff: 200 * time.Millisecond, + }, + CircuitBreaker: CircuitBreakerConfig{ + FailureThreshold: 3, + OpenTimeout: 10 * time.Second, + }, + HealthCache: HealthCacheConfig{ + TTL: 2 * time.Second, + }, + } +} + +func (c *Config) normalize() { + c.GRPC.User = normalizeUpstreamConfig(c.GRPC.User) + c.GRPC.Pay = normalizeUpstreamConfig(c.GRPC.Pay) +} + +func normalizeUpstreamConfig(cfg UpstreamConfig) UpstreamConfig { + if len(cfg.Targets) == 0 && cfg.Target != "" { + cfg.Targets = []string{cfg.Target} + } + cfg.Target = "" + return cfg +} + func validate(cfg Config) error { - // 应用名用于日志标签,不能为空。 if cfg.App.Name == "" { return fmt.Errorf("app.name is required") } - // 监听地址必须能被 TCP 地址解析。 if _, err := net.ResolveTCPAddr("tcp", cfg.App.HTTPAddr); err != nil { return fmt.Errorf("app.http_addr is invalid: %w", err) } @@ -103,14 +147,31 @@ func validate(cfg Config) error { } func validateUpstream(name string, cfg UpstreamConfig) error { - if cfg.Target == "" { - return fmt.Errorf("%s.target is required", name) + if len(cfg.Targets) == 0 { + return fmt.Errorf("%s.targets must contain at least one target", name) } - if _, err := net.ResolveTCPAddr("tcp", cfg.Target); err != nil { - return fmt.Errorf("%s.target is invalid: %w", name, err) + for _, target := range cfg.Targets { + if _, err := net.ResolveTCPAddr("tcp", target); err != nil { + return fmt.Errorf("%s.targets contains invalid target %q: %w", name, target, err) + } } if cfg.Timeout <= 0 { return fmt.Errorf("%s.timeout must be greater than 0", name) } + if cfg.Retry.MaxAttempts <= 0 { + return fmt.Errorf("%s.retry.max_attempts must be greater than 0", name) + } + if cfg.Retry.Backoff < 0 { + return fmt.Errorf("%s.retry.backoff must be greater than or equal to 0", name) + } + if cfg.CircuitBreaker.FailureThreshold <= 0 { + return fmt.Errorf("%s.circuit_breaker.failure_threshold must be greater than 0", name) + } + if cfg.CircuitBreaker.OpenTimeout <= 0 { + return fmt.Errorf("%s.circuit_breaker.open_timeout must be greater than 0", name) + } + if cfg.HealthCache.TTL <= 0 { + return fmt.Errorf("%s.health_cache.ttl must be greater than 0", name) + } return nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index b8db1ab..6241c93 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -22,12 +22,44 @@ func TestLoadLocalConfig(t *testing.T) { if cfg.App.HTTPAddr != ":8080" { t.Fatalf("unexpected http addr: %s", cfg.App.HTTPAddr) } - if cfg.GRPC.User.Target != "127.0.0.1:9001" { - t.Fatalf("unexpected user target: %s", cfg.GRPC.User.Target) + if len(cfg.GRPC.User.Targets) != 1 || cfg.GRPC.User.Targets[0] != "127.0.0.1:9001" { + t.Fatalf("unexpected user targets: %#v", cfg.GRPC.User.Targets) } if cfg.GRPC.Pay.Timeout != 3*time.Second { t.Fatalf("unexpected pay timeout: %s", cfg.GRPC.Pay.Timeout) } + if cfg.GRPC.User.Retry.MaxAttempts != 2 { + t.Fatalf("unexpected retry attempts: %d", cfg.GRPC.User.Retry.MaxAttempts) + } +} + +func TestLoadCompatibilityTargetConfig(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "compat.yaml") + content := strings.TrimSpace(` +app: + name: chatappgateway + http_addr: ":8080" +grpc: + user: + target: "127.0.0.1:9001" + timeout: 3s + pay: + target: "127.0.0.1:9002" + timeout: 3s +`) + if err := os.WriteFile(path, []byte(content), 0o600); err != nil { + t.Fatalf("WriteFile returned error: %v", err) + } + + cfg, err := Load(path) + if err != nil { + t.Fatalf("Load returned error: %v", err) + } + if len(cfg.GRPC.User.Targets) != 1 || cfg.GRPC.User.Targets[0] != "127.0.0.1:9001" { + t.Fatalf("unexpected user targets: %#v", cfg.GRPC.User.Targets) + } } func TestLoadInvalidConfig(t *testing.T) { @@ -40,11 +72,11 @@ app: http_addr: "bad-address" grpc: user: - target: "" + targets: ["127.0.0.1:9001"] timeout: 3s pay: - target: "127.0.0.1:9002" - timeout: 0s + targets: ["127.0.0.1:9002"] + timeout: 3s `) if err := os.WriteFile(path, []byte(content), 0o600); err != nil { t.Fatalf("WriteFile returned error: %v", err) @@ -69,10 +101,10 @@ app: http_addr: ":8080" grpc: user: - target: "127.0.0.1:9001" + targets: ["127.0.0.1:9001"] timeout: 3s pay: - target: "invalid" + targets: ["invalid"] timeout: 3s `) if err := os.WriteFile(path, []byte(content), 0o600); err != nil { @@ -83,7 +115,7 @@ grpc: if err == nil { t.Fatal("Load returned nil error") } - if !strings.Contains(err.Error(), "grpc.pay.target is invalid") { + if !strings.Contains(err.Error(), `grpc.pay.targets contains invalid target "invalid"`) { t.Fatalf("unexpected error: %v", err) } } diff --git a/internal/integration/paygrpc/client.go b/internal/integration/paygrpc/client.go index 09afad4..9826da5 100644 --- a/internal/integration/paygrpc/client.go +++ b/internal/integration/paygrpc/client.go @@ -4,16 +4,18 @@ import ( "context" "time" + "chatappgateway/internal/integration/upstream" commonpb "gitea.haiyihy.com/hy/chatappcommon/proto" ) -// Client 封装支付服务 gRPC client,并统一超时控制。 +// Client 封装支付服务 gRPC client,并统一超时、重试和节点选择。 type Client struct { timeout time.Duration client commonpb.ChatAppPayServiceClient + pool *upstream.Pool } -// New 根据底层 gRPC client 构造支付服务调用器。 +// New 根据单个底层 gRPC client 构造支付服务调用器。 func New(client commonpb.ChatAppPayServiceClient, timeout time.Duration) *Client { return &Client{ timeout: timeout, @@ -21,10 +23,20 @@ func New(client commonpb.ChatAppPayServiceClient, timeout time.Duration) *Client } } +// NewWithPool 根据下游节点池构造支付服务调用器。 +func NewWithPool(pool *upstream.Pool) *Client { + return &Client{pool: pool} +} + // Pay 调用支付服务最小支付接口。 func (c *Client) Pay(ctx context.Context, request *commonpb.PayRequest) (*commonpb.PayResponse, error) { - callCtx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() + if c.pool == nil { + callCtx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return c.client.Pay(callCtx, request) + } - return c.client.Pay(callCtx, request) + return upstream.Call(ctx, c.pool, func(callCtx context.Context, handle upstream.Handle) (*commonpb.PayResponse, error) { + return commonpb.NewChatAppPayServiceClient(handle.Conn).Pay(callCtx, request) + }) } diff --git a/internal/integration/upstream/pool.go b/internal/integration/upstream/pool.go new file mode 100644 index 0000000..b443fd6 --- /dev/null +++ b/internal/integration/upstream/pool.go @@ -0,0 +1,392 @@ +package upstream + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "sync/atomic" + "time" + + "chatappgateway/internal/config" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +// Handle 暴露给调用方的单个下游节点句柄。 +type Handle struct { + Target string + Conn *grpc.ClientConn +} + +// Pool 管理一组下游节点的连接、健康状态和容错策略。 +type Pool struct { + name string + timeout time.Duration + maxAttempts int + retryBackoff time.Duration + failureThreshold int + openTimeout time.Duration + healthCacheTTL time.Duration + counter atomic.Uint64 + endpoints []*endpoint +} + +type endpoint struct { + handle Handle + + mu sync.Mutex + consecutiveFailures int + openUntil time.Time + lastCheckedAt time.Time + lastHealthy bool + lastErr string +} + +type snapshot struct { + handle Handle + fresh bool + healthy bool + open bool + errString string +} + +// New 为一组下游节点建立连接池。 +func New(ctx context.Context, name string, cfg config.UpstreamConfig) (*Pool, error) { + pool := &Pool{ + name: name, + timeout: cfg.Timeout, + maxAttempts: cfg.Retry.MaxAttempts, + retryBackoff: cfg.Retry.Backoff, + failureThreshold: cfg.CircuitBreaker.FailureThreshold, + openTimeout: cfg.CircuitBreaker.OpenTimeout, + healthCacheTTL: cfg.HealthCache.TTL, + endpoints: make([]*endpoint, 0, len(cfg.Targets)), + } + + for _, target := range cfg.Targets { + conn, err := grpc.DialContext( + ctx, + target, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + _ = pool.Close() + return nil, fmt.Errorf("dial %s %s: %w", name, target, err) + } + pool.endpoints = append(pool.endpoints, &endpoint{ + handle: Handle{ + Target: target, + Conn: conn, + }, + }) + } + + return pool, nil +} + +// Close 关闭全部 gRPC 连接。 +func (p *Pool) Close() error { + var firstErr error + for _, endpoint := range p.endpoints { + if endpoint.handle.Conn == nil { + continue + } + if err := endpoint.handle.Conn.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + +// Ready 使用缓存和连接状态判断当前是否至少有一个健康节点可用。 +func (p *Pool) Ready(ctx context.Context) error { + now := time.Now() + if cached, err := p.cachedReady(now); cached { + return err + } + + ordered := p.candidateOrder(now) + if len(ordered) == 0 { + return p.unavailableError(now) + } + + var errs []string + for _, endpoint := range ordered { + if err := p.refreshEndpoint(ctx, endpoint); err == nil { + return nil + } else { + errs = append(errs, fmt.Sprintf("%s: %v", endpoint.handle.Target, err)) + } + } + + return errors.New(strings.Join(errs, "; ")) +} + +// Call 选择健康节点执行调用,并在可重试错误上做节点级重试。 +func Call[T any](ctx context.Context, pool *Pool, invoke func(context.Context, Handle) (T, error)) (T, error) { + var zero T + var lastErr error + tried := make(map[string]struct{}) + + for attempt := 1; attempt <= pool.maxAttempts; attempt++ { + endpoint := pool.pickEndpoint(time.Now(), tried) + if endpoint == nil { + if lastErr != nil { + return zero, lastErr + } + return zero, pool.unavailableError(time.Now()) + } + + tried[endpoint.handle.Target] = struct{}{} + + callCtx, cancel := context.WithTimeout(ctx, pool.timeout) + result, err := invoke(callCtx, endpoint.handle) + cancel() + + if err == nil { + endpoint.recordSuccess(time.Now()) + return result, nil + } + + lastErr = err + if shouldRetry(err, ctx) { + endpoint.recordFailure(time.Now(), err, pool.failureThreshold, pool.openTimeout) + if attempt < pool.maxAttempts { + if err := sleepWithContext(ctx, pool.retryBackoff); err != nil { + return zero, err + } + continue + } + return zero, err + } + + // 非重试型错误说明目标节点虽然返回失败,但链路是可达的,不应拉开熔断。 + endpoint.recordSuccess(time.Now()) + return zero, err + } + + if lastErr != nil { + return zero, lastErr + } + return zero, pool.unavailableError(time.Now()) +} + +func (p *Pool) cachedReady(now time.Time) (bool, error) { + allFresh := true + var errs []string + + for _, endpoint := range p.endpoints { + snap := endpoint.snapshot(now, p.healthCacheTTL) + if snap.fresh && snap.healthy && !snap.open { + return true, nil + } + if !snap.fresh { + allFresh = false + continue + } + if snap.errString != "" { + errs = append(errs, fmt.Sprintf("%s: %s", snap.handle.Target, snap.errString)) + } else { + errs = append(errs, fmt.Sprintf("%s: not ready", snap.handle.Target)) + } + } + + if allFresh { + return true, errors.New(strings.Join(errs, "; ")) + } + return false, nil +} + +func (p *Pool) refreshEndpoint(ctx context.Context, endpoint *endpoint) error { + checkCtx, cancel := context.WithTimeout(ctx, p.readyCheckTimeout()) + defer cancel() + + if err := connectionReady(checkCtx, endpoint.handle.Conn); err != nil { + endpoint.recordFailure(time.Now(), err, p.failureThreshold, p.openTimeout) + return err + } + + endpoint.recordSuccess(time.Now()) + return nil +} + +func (p *Pool) readyCheckTimeout() time.Duration { + if p.timeout <= 2*time.Second { + return p.timeout + } + return 2 * time.Second +} + +func (p *Pool) pickEndpoint(now time.Time, tried map[string]struct{}) *endpoint { + ordered := p.candidateOrder(now) + for _, endpoint := range ordered { + if _, ok := tried[endpoint.handle.Target]; !ok { + return endpoint + } + } + if len(ordered) == 0 { + return nil + } + return ordered[0] +} + +func (p *Pool) candidateOrder(now time.Time) []*endpoint { + var healthy []*endpoint + var unknown []*endpoint + var unhealthy []*endpoint + + for _, endpoint := range p.endpoints { + snap := endpoint.snapshot(now, p.healthCacheTTL) + if snap.open { + continue + } + switch { + case snap.fresh && snap.healthy: + healthy = append(healthy, endpoint) + case !snap.fresh: + unknown = append(unknown, endpoint) + default: + unhealthy = append(unhealthy, endpoint) + } + } + + seed := int(p.counter.Add(1) - 1) + ordered := make([]*endpoint, 0, len(healthy)+len(unknown)+len(unhealthy)) + ordered = append(ordered, rotate(healthy, seed)...) + ordered = append(ordered, rotate(unknown, seed)...) + ordered = append(ordered, rotate(unhealthy, seed)...) + return ordered +} + +func rotate(items []*endpoint, seed int) []*endpoint { + if len(items) == 0 { + return nil + } + offset := seed % len(items) + return append(items[offset:], items[:offset]...) +} + +func (p *Pool) unavailableError(now time.Time) error { + var parts []string + for _, endpoint := range p.endpoints { + snap := endpoint.snapshot(now, p.healthCacheTTL) + if snap.open { + parts = append(parts, fmt.Sprintf("%s: circuit open", snap.handle.Target)) + continue + } + if snap.errString != "" { + parts = append(parts, fmt.Sprintf("%s: %s", snap.handle.Target, snap.errString)) + continue + } + parts = append(parts, fmt.Sprintf("%s: unavailable", snap.handle.Target)) + } + return fmt.Errorf("%s has no available upstreams: %s", p.name, strings.Join(parts, "; ")) +} + +func (e *endpoint) snapshot(now time.Time, ttl time.Duration) snapshot { + e.mu.Lock() + defer e.mu.Unlock() + + fresh := !e.lastCheckedAt.IsZero() && now.Sub(e.lastCheckedAt) <= ttl + open := !e.openUntil.IsZero() && now.Before(e.openUntil) + return snapshot{ + handle: e.handle, + fresh: fresh, + healthy: e.lastHealthy, + open: open, + errString: e.lastErr, + } +} + +func (e *endpoint) recordSuccess(now time.Time) { + e.mu.Lock() + defer e.mu.Unlock() + + e.consecutiveFailures = 0 + e.openUntil = time.Time{} + e.lastCheckedAt = now + e.lastHealthy = true + e.lastErr = "" +} + +func (e *endpoint) recordFailure(now time.Time, err error, failureThreshold int, openTimeout time.Duration) { + e.mu.Lock() + defer e.mu.Unlock() + + e.consecutiveFailures++ + e.lastCheckedAt = now + e.lastHealthy = false + e.lastErr = err.Error() + if e.consecutiveFailures >= failureThreshold { + e.openUntil = now.Add(openTimeout) + } +} + +func connectionReady(ctx context.Context, conn *grpc.ClientConn) error { + conn.Connect() + + for { + state := conn.GetState() + switch state { + case connectivity.Ready: + return nil + case connectivity.Shutdown: + return fmt.Errorf("connection shutdown") + } + + if !conn.WaitForStateChange(ctx, state) { + if err := ctx.Err(); err != nil { + return err + } + return fmt.Errorf("state did not change") + } + } +} + +func shouldRetry(err error, parent context.Context) bool { + if err == nil { + return false + } + if parent.Err() != nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + st, ok := status.FromError(err) + if !ok { + return true + } + + switch st.Code() { + case codes.DeadlineExceeded, codes.Unavailable, codes.ResourceExhausted, codes.Aborted, codes.Internal: + return true + default: + return false + } +} + +func sleepWithContext(ctx context.Context, d time.Duration) error { + if d <= 0 { + return nil + } + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +var _ io.Closer = (*Pool)(nil) diff --git a/internal/integration/upstream/pool_test.go b/internal/integration/upstream/pool_test.go new file mode 100644 index 0000000..69ae291 --- /dev/null +++ b/internal/integration/upstream/pool_test.go @@ -0,0 +1,110 @@ +package upstream + +import ( + "context" + "strings" + "testing" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestCallRetriesToSecondEndpoint(t *testing.T) { + t.Parallel() + + pool := &Pool{ + name: "user", + timeout: time.Second, + maxAttempts: 2, + retryBackoff: 0, + failureThreshold: 3, + openTimeout: 5 * time.Second, + healthCacheTTL: 2 * time.Second, + endpoints: []*endpoint{ + {handle: Handle{Target: "10.0.11.17:9001"}}, + {handle: Handle{Target: "10.0.12.6:9001"}}, + }, + } + + var attempts []string + result, err := Call(context.Background(), pool, func(_ context.Context, handle Handle) (string, error) { + attempts = append(attempts, handle.Target) + if handle.Target == "10.0.11.17:9001" { + return "", status.Error(codes.Unavailable, "first endpoint down") + } + return "ok", nil + }) + if err != nil { + t.Fatalf("Call returned error: %v", err) + } + if result != "ok" { + t.Fatalf("unexpected result: %s", result) + } + if len(attempts) != 2 { + t.Fatalf("unexpected attempts: %#v", attempts) + } + if attempts[0] != "10.0.11.17:9001" || attempts[1] != "10.0.12.6:9001" { + t.Fatalf("unexpected attempts order: %#v", attempts) + } +} + +func TestCallOpensCircuitAfterRepeatedFailures(t *testing.T) { + t.Parallel() + + pool := &Pool{ + name: "pay", + timeout: time.Second, + maxAttempts: 1, + retryBackoff: 0, + failureThreshold: 2, + openTimeout: time.Minute, + healthCacheTTL: 2 * time.Second, + endpoints: []*endpoint{ + {handle: Handle{Target: "10.0.22.13:9002"}}, + }, + } + + invocations := 0 + fail := func(_ context.Context, _ Handle) (string, error) { + invocations++ + return "", status.Error(codes.Unavailable, "down") + } + + for i := 0; i < 2; i++ { + _, err := Call(context.Background(), pool, fail) + if err == nil { + t.Fatal("Call returned nil error") + } + } + + _, err := Call(context.Background(), pool, fail) + if err == nil { + t.Fatal("expected circuit-open error") + } + if !strings.Contains(err.Error(), "circuit open") { + t.Fatalf("unexpected error: %v", err) + } + if invocations != 2 { + t.Fatalf("unexpected invocation count: %d", invocations) + } +} + +func TestReadyUsesCachedHealthyState(t *testing.T) { + t.Parallel() + + pool := &Pool{ + name: "user", + timeout: time.Second, + maxAttempts: 2, + healthCacheTTL: 5 * time.Second, + endpoints: []*endpoint{ + {handle: Handle{Target: "10.0.11.17:9001"}}, + }, + } + + pool.endpoints[0].recordSuccess(time.Now()) + if err := pool.Ready(context.Background()); err != nil { + t.Fatalf("Ready returned error: %v", err) + } +} diff --git a/internal/integration/usergrpc/client.go b/internal/integration/usergrpc/client.go index 361b8dc..9df91b1 100644 --- a/internal/integration/usergrpc/client.go +++ b/internal/integration/usergrpc/client.go @@ -4,16 +4,18 @@ import ( "context" "time" + "chatappgateway/internal/integration/upstream" commonpb "gitea.haiyihy.com/hy/chatappcommon/proto" ) -// Client 封装用户服务 gRPC client,并统一超时控制。 +// Client 封装用户服务 gRPC client,并统一超时、重试和节点选择。 type Client struct { timeout time.Duration client commonpb.ChatAppUserServiceClient + pool *upstream.Pool } -// New 根据底层 gRPC client 构造用户服务调用器。 +// New 根据单个底层 gRPC client 构造用户服务调用器。 func New(client commonpb.ChatAppUserServiceClient, timeout time.Duration) *Client { return &Client{ timeout: timeout, @@ -21,10 +23,20 @@ func New(client commonpb.ChatAppUserServiceClient, timeout time.Duration) *Clien } } +// NewWithPool 根据下游节点池构造用户服务调用器。 +func NewWithPool(pool *upstream.Pool) *Client { + return &Client{pool: pool} +} + // Register 调用用户服务注册接口。 func (c *Client) Register(ctx context.Context, request *commonpb.RegisterRequest) (*commonpb.RegisterResponse, error) { - callCtx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() + if c.pool == nil { + callCtx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return c.client.Register(callCtx, request) + } - return c.client.Register(callCtx, request) + return upstream.Call(ctx, c.pool, func(callCtx context.Context, handle upstream.Handle) (*commonpb.RegisterResponse, error) { + return commonpb.NewChatAppUserServiceClient(handle.Conn).Register(callCtx, request) + }) }