#!/usr/bin/env python3 from __future__ import annotations import argparse import base64 import json import shlex import sys import time import textwrap import urllib.request from pathlib import Path from typing import Any import yaml from qcloud_cos import CosConfig, CosS3Client from tencentcloud.clb.v20180317 import clb_client, models as clb_models from tencentcloud.common import credential from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.tat.v20201028 import tat_client, models as tat_models TERMINAL_TASK_STATUSES = {"SUCCESS", "FAILED", "TIMEOUT", "CANCELLED"} def log(message: str) -> None: ts = time.strftime("%Y-%m-%d %H:%M:%S %z") print(f"[{ts}] {message}", flush=True) def load_yaml(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as fh: data = yaml.safe_load(fh) if not isinstance(data, dict): raise RuntimeError(f"invalid yaml root in {path}") return data def decode_output(encoded: str | None) -> str: if not encoded: return "" try: return base64.b64decode(encoded).decode("utf-8", errors="replace") except Exception: return "" def sdk_body(resp: Any) -> dict[str, Any]: body = json.loads(resp.to_json_string()) if not isinstance(body, dict): raise RuntimeError(f"unexpected sdk response type: {type(body).__name__}") return body def should_retry_without_output_cos(exc: TencentCloudSDKException) -> bool: code = str(getattr(exc, "code", "") or "") message = str(getattr(exc, "message", "") or "") return code == "ResourceNotFound.RoleNotFound" and "TAT_QCSLinkedRoleInUploadInvocation" in message class CloudOperator: def __init__(self, config: dict[str, Any], service_name: str, instance_id: str, release_id: str | None) -> None: self.config = config self.service_name = service_name self.instance_id = instance_id self.release_id = release_id or "" try: self.service_cfg = dict(config["services"][service_name]) except KeyError as exc: raise RuntimeError(f"unknown service: {service_name}") from exc self.instance_cfg = next( (item for item in self.service_cfg["instances"] if str(item["instance_id"]) == instance_id), None, ) if not self.instance_cfg: raise RuntimeError(f"instance {instance_id} not found under service {service_name}") cloud_cfg = dict(config.get("tencentcloud", {})) secret_id = str(cloud_cfg.get("secret_id", "")).strip() secret_key = str(cloud_cfg.get("secret_key", "")).strip() session_token = str(cloud_cfg.get("session_token", "")).strip() if not secret_id or not secret_key: raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml") region = str(config["region"]) self.bucket = str(config["cos"]["bucket"]) self.credential = credential.Credential(secret_id, secret_key, session_token or None) self.tat = tat_client.TatClient(self.credential, str(config["tat"]["region"])) self.clb = clb_client.ClbClient(self.credential, region) self.cos = CosS3Client( CosConfig( Region=str(config["cos"]["region"]), SecretId=secret_id, SecretKey=secret_key, Token=session_token or None, Scheme="https", Timeout=30, EnableInternalDomain=False, AutoSwitchDomainOnRetry=True, ) ) def deploy(self) -> None: self.ensure_agent_online() self.ensure_systemd_unit_installed() package_url, sha256 = self.resolve_release_package() if self.service_name == "gateway": self.deploy_gateway(package_url, sha256) return self.run_remote_deploy(package_url, sha256) def rollback(self) -> None: if not self.release_id: raise RuntimeError("rollback requires --release-id") self.deploy() def restart(self) -> None: self.ensure_agent_online() self.ensure_systemd_unit_installed() unit_name = str(self.service_cfg["unit_name"]) health_url = str(self.service_cfg["health_url"]) command = " && ".join( [ f"/usr/bin/systemctl restart {shlex.quote(unit_name)}", f"/usr/bin/curl -fsS --max-time 3 {shlex.quote(health_url)} >/dev/null", ] ) invocation_id = self.run_tat_command(command, f"restart-{self.service_name}") task = self.wait_for_tat(invocation_id) self.ensure_task_success(task, f"restart {self.service_name}") log(f"restart completed for {self.service_name}@{self.instance_id}") def deploy_gateway(self, package_url: str, sha256: str) -> None: clb_cfg = dict(self.service_cfg["clb"]) target = next( (item for item in clb_cfg["targets"] if str(item["instance_id"]) == self.instance_id), None, ) if not target: raise RuntimeError(f"missing CLB target config for gateway instance {self.instance_id}") original_weight = int(target.get("weight", clb_cfg.get("original_weight", 10))) drain_seconds = int(self.service_cfg.get("rollout", {}).get("drain_seconds", 30)) self.set_gateway_weight(clb_cfg, target, 0) try: if drain_seconds > 0: log(f"draining gateway {self.instance_id} for {drain_seconds}s") time.sleep(drain_seconds) self.run_remote_deploy(package_url, sha256) except Exception: log(f"gateway deploy failed on {self.instance_id}, restoring CLB weight") self.set_gateway_weight(clb_cfg, target, original_weight) raise self.set_gateway_weight(clb_cfg, target, original_weight) log(f"gateway deploy completed for {self.instance_id}") def run_remote_deploy(self, package_url: str, sha256: str) -> None: unit_name = str(self.service_cfg["unit_name"]) health_url = str(self.service_cfg["health_url"]) deploy_root = str(self.service_cfg["deploy_root"]) remote_script = str(self.config["tat"]["script_path"]) command = " ".join( shlex.quote(part) for part in [ remote_script, "--service", self.service_name, "--release-id", self.release_id, "--package-url", package_url, "--sha256", sha256, "--health-url", health_url, "--unit-name", unit_name, "--deploy-root", deploy_root, ] ) invocation_id = self.run_tat_command(command, f"deploy-{self.service_name}-{self.release_id}") task = self.wait_for_tat(invocation_id) self.ensure_task_success(task, f"deploy {self.service_name}") log(f"deploy completed for {self.service_name}@{self.instance_id} -> {self.release_id}") def ensure_systemd_unit_installed(self) -> None: unit_name = str(self.service_cfg["unit_name"]) unit_path = f"/etc/systemd/system/{unit_name}" unit_content = self.systemd_unit_content().rstrip() + "\n" command = textwrap.dedent( f"""\ set -Eeuo pipefail unit_path={shlex.quote(unit_path)} tmp_file="$(mktemp)" trap 'rm -f "$tmp_file"' EXIT cat >"$tmp_file" <<'UNIT_EOF' {unit_content}UNIT_EOF if [[ ! -f "$unit_path" ]] || ! cmp -s "$tmp_file" "$unit_path"; then install -D -m 0644 "$tmp_file" "$unit_path" /usr/bin/systemctl daemon-reload fi /usr/bin/systemctl enable {shlex.quote(unit_name)} >/dev/null """ ) invocation_id = self.run_tat_command(command, f"ensure-unit-{self.service_name}") task = self.wait_for_tat(invocation_id) self.ensure_task_success(task, f"ensure systemd unit {unit_name}") def systemd_unit_content(self) -> str: deploy_root = str(self.service_cfg["deploy_root"]).rstrip("/") binary_path = f"{deploy_root}/current/bin/{self.service_name}" config_path = f"{deploy_root}/current/config/prod.yaml" description = f"ChatApp {self.service_name.capitalize()} Service" working_directory = f"{deploy_root}/current" return textwrap.dedent( f"""\ [Unit] Description={description} Wants=network-online.target After=network-online.target [Service] Type=simple User=root Group=root WorkingDirectory={working_directory} ExecStart={binary_path} -config {config_path} Restart=always RestartSec=3 KillSignal=SIGTERM TimeoutStopSec=30 LimitNOFILE=65535 [Install] WantedBy=multi-user.target """ ) def run_tat_command(self, command: str, command_name: str) -> str: payload: dict[str, Any] = { "CommandName": command_name, "Description": command_name, "SaveCommand": False, "Content": base64.b64encode(command.encode("utf-8")).decode("ascii"), "CommandType": str(self.config["tat"].get("command_type", "SHELL")), "WorkingDirectory": str(self.config["tat"]["working_directory"]), "Timeout": int(self.config["tat"]["timeout_seconds"]), "InstanceIds": [self.instance_id], "Username": str(self.config["tat"].get("execution_user", "root")), } output_prefix = str(self.config["cos"].get("output_prefix", "")).strip("/") if output_prefix: payload["OutputCOSBucketUrl"] = self.cos_bucket_url() payload["OutputCOSKeyPrefix"] = f"{output_prefix}/{self.release_id or 'adhoc'}/{self.service_name}/{self.instance_id}" return self._run_tat_command(payload, allow_retry_without_output_cos=bool(output_prefix)) def _run_tat_command(self, payload: dict[str, Any], allow_retry_without_output_cos: bool) -> str: req = tat_models.RunCommandRequest() req.from_json_string(json.dumps(payload)) try: resp = self.tat.RunCommand(req) except TencentCloudSDKException as exc: if allow_retry_without_output_cos and should_retry_without_output_cos(exc): log( "TAT missing linked role TAT_QCSLinkedRoleInUploadInvocation, " "retrying RunCommand without OutputCOSBucketUrl" ) retry_payload = dict(payload) retry_payload.pop("OutputCOSBucketUrl", None) retry_payload.pop("OutputCOSKeyPrefix", None) return self._run_tat_command(retry_payload, allow_retry_without_output_cos=False) raise body = sdk_body(resp) return str(body["InvocationId"]) def wait_for_tat(self, invocation_id: str) -> dict[str, Any]: deadline = time.time() + int(self.config["tat"]["timeout_seconds"]) + 300 while time.time() < deadline: req = tat_models.DescribeInvocationTasksRequest() req.from_json_string( json.dumps( { "HideOutput": False, "Limit": 100, "Filters": [{"Name": "invocation-id", "Values": [invocation_id]}], } ) ) resp = self.tat.DescribeInvocationTasks(req) body = sdk_body(resp) tasks = body.get("InvocationTaskSet", []) task = next((item for item in tasks if str(item["InstanceId"]) == self.instance_id), None) if not task: time.sleep(2) continue status = str(task.get("TaskStatus")) if status in TERMINAL_TASK_STATUSES: return task time.sleep(2) raise RuntimeError(f"TAT invocation timed out: {invocation_id}") def ensure_task_success(self, task: dict[str, Any], action_name: str) -> None: status = str(task.get("TaskStatus")) result = task.get("TaskResult", {}) exit_code = int(result.get("ExitCode", -1)) if status == "SUCCESS" and exit_code == 0: return output = decode_output(result.get("Output")) raise RuntimeError(f"{action_name} failed: status={status} exit_code={exit_code}\n{output[-2000:]}") def ensure_agent_online(self) -> None: req = tat_models.DescribeAutomationAgentStatusRequest() req.from_json_string(json.dumps({"InstanceIds": [self.instance_id], "Limit": 1})) resp = self.tat.DescribeAutomationAgentStatus(req) body = sdk_body(resp) online = any(item.get("AgentStatus") == "Online" for item in body.get("AutomationAgentSet", [])) if not online: raise RuntimeError(f"TAT agent is not online for {self.instance_id}") def resolve_release_package(self) -> tuple[str, str]: if not self.release_id: raise RuntimeError("deploy requires --release-id") package_key = self.package_key() sha_key = self.sha_key() sha256 = self.fetch_text_via_presigned_url(sha_key).strip().split()[0] package_url = self.presign_download_url(package_key) return package_url, sha256 def set_gateway_weight(self, clb_cfg: dict[str, Any], target: dict[str, Any], weight: int) -> None: payload: dict[str, Any] = { "LoadBalancerId": str(clb_cfg["load_balancer_id"]), "ListenerId": str(clb_cfg["listener_id"]), "Targets": [ { "InstanceId": str(target["instance_id"]), "Port": int(target.get("port", clb_cfg["backend_port"])), } ], "Weight": int(weight), } location_id = str(clb_cfg.get("location_id", "")).strip() domain = str(clb_cfg.get("domain", "")).strip() url = str(clb_cfg.get("url", "")).strip() if location_id: payload["LocationId"] = location_id elif domain and url: payload["Domain"] = domain payload["Url"] = url req = clb_models.ModifyTargetWeightRequest() req.from_json_string(json.dumps(payload)) resp = self.clb.ModifyTargetWeight(req) request_id = sdk_body(resp)["RequestId"] self.wait_for_clb_task(request_id) def wait_for_clb_task(self, request_id: str) -> None: deadline = time.time() + 300 while time.time() < deadline: req = clb_models.DescribeTaskStatusRequest() req.from_json_string(json.dumps({"TaskId": request_id})) resp = self.clb.DescribeTaskStatus(req) body = sdk_body(resp) status = int(body["Status"]) if status == 0: return if status == 1: raise RuntimeError(f"CLB async task failed: {request_id}") time.sleep(2) raise RuntimeError(f"CLB async task timed out: {request_id}") def package_key(self) -> str: package_name = str(self.service_cfg["package_name"]) releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/") return f"{releases_prefix}/{self.release_id}/{self.service_name}/{package_name}" def sha_key(self) -> str: package_name = str(self.service_cfg["package_name"]) sha_name = package_name[:-4] + ".sha256" if package_name.endswith(".tgz") else f"{package_name}.sha256" releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/") return f"{releases_prefix}/{self.release_id}/{self.service_name}/{sha_name}" def presign_download_url(self, key: str) -> str: return self.cos.get_presigned_url( Method="GET", Bucket=self.bucket, Key=key, Expired=10800, ) def fetch_text_via_presigned_url(self, key: str) -> str: url = self.presign_download_url(key) with urllib.request.urlopen(url, timeout=30) as resp: return resp.read().decode("utf-8").strip() def cos_bucket_url(self) -> str: region = str(self.config["cos"]["region"]) return f"https://{self.bucket}.cos.{region}.myqcloud.com" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Execute Tencent Cloud operations for app-deploy-platform.") parser.add_argument("--config", required=True, help="Path to config/prod.yaml") parser.add_argument("--action", required=True, choices=["deploy", "restart", "rollback"]) parser.add_argument("--service", required=True, help="Service name") parser.add_argument("--instance-id", required=True, help="Tencent Cloud CVM instance ID") parser.add_argument("--release-id", default="", help="Release ID for deploy or rollback") return parser.parse_args() def main() -> int: args = parse_args() config = load_yaml(Path(args.config).resolve()) operator = CloudOperator(config, args.service, args.instance_id, args.release_id) try: if args.action == "deploy": operator.deploy() elif args.action == "restart": operator.restart() else: operator.rollback() except TencentCloudSDKException as exc: log(f"tencent cloud sdk error: {exc}") return 1 except Exception as exc: log(str(exc)) return 1 return 0 if __name__ == "__main__": sys.exit(main())