#!/usr/bin/env python3 from __future__ import annotations import argparse import base64 import json import shlex import sys import time import urllib.request from pathlib import Path from typing import Any import yaml from qcloud_cos import CosConfig, CosS3Client from tencentcloud.clb.v20180317 import clb_client, models as clb_models from tencentcloud.common import credential from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.tat.v20201028 import tat_client, models as tat_models TERMINAL_TASK_STATUSES = {"SUCCESS", "FAILED", "TIMEOUT", "CANCELLED"} def log(message: str) -> None: ts = time.strftime("%Y-%m-%d %H:%M:%S %z") print(f"[{ts}] {message}", flush=True) def load_yaml(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as fh: data = yaml.safe_load(fh) if not isinstance(data, dict): raise RuntimeError(f"invalid yaml root in {path}") return data def decode_output(encoded: str | None) -> str: if not encoded: return "" try: return base64.b64decode(encoded).decode("utf-8", errors="replace") except Exception: return "" def sdk_body(resp: Any) -> dict[str, Any]: body = json.loads(resp.to_json_string()) if not isinstance(body, dict): raise RuntimeError(f"unexpected sdk response type: {type(body).__name__}") return body def should_retry_without_output_cos(exc: TencentCloudSDKException) -> bool: code = str(getattr(exc, "code", "") or "") message = str(getattr(exc, "message", "") or "") return code == "ResourceNotFound.RoleNotFound" and "TAT_QCSLinkedRoleInUploadInvocation" in message class CloudOperator: def __init__(self, config: dict[str, Any], service_name: str, instance_id: str, release_id: str | None) -> None: self.config = config self.service_name = service_name self.instance_id = instance_id self.release_id = release_id or "" try: self.service_cfg = dict(config["services"][service_name]) except KeyError as exc: raise RuntimeError(f"unknown service: {service_name}") from exc self.instance_cfg = next( (item for item in self.service_cfg["instances"] if str(item["instance_id"]) == instance_id), None, ) if not self.instance_cfg: raise RuntimeError(f"instance {instance_id} not found under service {service_name}") cloud_cfg = dict(config.get("tencentcloud", {})) secret_id = str(cloud_cfg.get("secret_id", "")).strip() secret_key = str(cloud_cfg.get("secret_key", "")).strip() session_token = str(cloud_cfg.get("session_token", "")).strip() if not secret_id or not secret_key: raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml") region = str(config["region"]) self.bucket = str(config["cos"]["bucket"]) self.credential = credential.Credential(secret_id, secret_key, session_token or None) self.tat = tat_client.TatClient(self.credential, str(config["tat"]["region"])) self.clb = clb_client.ClbClient(self.credential, region) self.cos = CosS3Client( CosConfig( Region=str(config["cos"]["region"]), SecretId=secret_id, SecretKey=secret_key, Token=session_token or None, Scheme="https", Timeout=30, EnableInternalDomain=False, AutoSwitchDomainOnRetry=True, ) ) def deploy(self) -> None: self.ensure_agent_online() package_url, sha256 = self.resolve_release_package() if self.service_name == "gateway": self.deploy_gateway(package_url, sha256) return self.run_remote_deploy(package_url, sha256) def rollback(self) -> None: if not self.release_id: raise RuntimeError("rollback requires --release-id") self.deploy() def restart(self) -> None: self.ensure_agent_online() unit_name = str(self.service_cfg["unit_name"]) health_url = str(self.service_cfg["health_url"]) command = " && ".join( [ f"/usr/bin/systemctl restart {shlex.quote(unit_name)}", f"/usr/bin/curl -fsS --max-time 3 {shlex.quote(health_url)} >/dev/null", ] ) invocation_id = self.run_tat_command(command, f"restart-{self.service_name}") task = self.wait_for_tat(invocation_id) self.ensure_task_success(task, f"restart {self.service_name}") log(f"restart completed for {self.service_name}@{self.instance_id}") def deploy_gateway(self, package_url: str, sha256: str) -> None: clb_cfg = dict(self.service_cfg["clb"]) target = next( (item for item in clb_cfg["targets"] if str(item["instance_id"]) == self.instance_id), None, ) if not target: raise RuntimeError(f"missing CLB target config for gateway instance {self.instance_id}") original_weight = int(target.get("weight", clb_cfg.get("original_weight", 10))) drain_seconds = int(self.service_cfg.get("rollout", {}).get("drain_seconds", 30)) self.set_gateway_weight(clb_cfg, target, 0) try: if drain_seconds > 0: log(f"draining gateway {self.instance_id} for {drain_seconds}s") time.sleep(drain_seconds) self.run_remote_deploy(package_url, sha256) except Exception: log(f"gateway deploy failed on {self.instance_id}, restoring CLB weight") self.set_gateway_weight(clb_cfg, target, original_weight) raise self.set_gateway_weight(clb_cfg, target, original_weight) log(f"gateway deploy completed for {self.instance_id}") def run_remote_deploy(self, package_url: str, sha256: str) -> None: unit_name = str(self.service_cfg["unit_name"]) health_url = str(self.service_cfg["health_url"]) deploy_root = str(self.service_cfg["deploy_root"]) remote_script = str(self.config["tat"]["script_path"]) command = " ".join( shlex.quote(part) for part in [ remote_script, "--service", self.service_name, "--release-id", self.release_id, "--package-url", package_url, "--sha256", sha256, "--health-url", health_url, "--unit-name", unit_name, "--deploy-root", deploy_root, ] ) invocation_id = self.run_tat_command(command, f"deploy-{self.service_name}-{self.release_id}") task = self.wait_for_tat(invocation_id) self.ensure_task_success(task, f"deploy {self.service_name}") log(f"deploy completed for {self.service_name}@{self.instance_id} -> {self.release_id}") def run_tat_command(self, command: str, command_name: str) -> str: payload: dict[str, Any] = { "CommandName": command_name, "Description": command_name, "SaveCommand": False, "Content": base64.b64encode(command.encode("utf-8")).decode("ascii"), "CommandType": str(self.config["tat"].get("command_type", "SHELL")), "WorkingDirectory": str(self.config["tat"]["working_directory"]), "Timeout": int(self.config["tat"]["timeout_seconds"]), "InstanceIds": [self.instance_id], "Username": str(self.config["tat"].get("execution_user", "root")), } output_prefix = str(self.config["cos"].get("output_prefix", "")).strip("/") if output_prefix: payload["OutputCOSBucketUrl"] = self.cos_bucket_url() payload["OutputCOSKeyPrefix"] = f"{output_prefix}/{self.release_id or 'adhoc'}/{self.service_name}/{self.instance_id}" return self._run_tat_command(payload, allow_retry_without_output_cos=bool(output_prefix)) def _run_tat_command(self, payload: dict[str, Any], allow_retry_without_output_cos: bool) -> str: req = tat_models.RunCommandRequest() req.from_json_string(json.dumps(payload)) try: resp = self.tat.RunCommand(req) except TencentCloudSDKException as exc: if allow_retry_without_output_cos and should_retry_without_output_cos(exc): log( "TAT missing linked role TAT_QCSLinkedRoleInUploadInvocation, " "retrying RunCommand without OutputCOSBucketUrl" ) retry_payload = dict(payload) retry_payload.pop("OutputCOSBucketUrl", None) retry_payload.pop("OutputCOSKeyPrefix", None) return self._run_tat_command(retry_payload, allow_retry_without_output_cos=False) raise body = sdk_body(resp) return str(body["InvocationId"]) def wait_for_tat(self, invocation_id: str) -> dict[str, Any]: deadline = time.time() + int(self.config["tat"]["timeout_seconds"]) + 300 while time.time() < deadline: req = tat_models.DescribeInvocationTasksRequest() req.from_json_string( json.dumps( { "HideOutput": False, "Limit": 100, "Filters": [{"Name": "invocation-id", "Values": [invocation_id]}], } ) ) resp = self.tat.DescribeInvocationTasks(req) body = sdk_body(resp) tasks = body.get("InvocationTaskSet", []) task = next((item for item in tasks if str(item["InstanceId"]) == self.instance_id), None) if not task: time.sleep(2) continue status = str(task.get("TaskStatus")) if status in TERMINAL_TASK_STATUSES: return task time.sleep(2) raise RuntimeError(f"TAT invocation timed out: {invocation_id}") def ensure_task_success(self, task: dict[str, Any], action_name: str) -> None: status = str(task.get("TaskStatus")) result = task.get("TaskResult", {}) exit_code = int(result.get("ExitCode", -1)) if status == "SUCCESS" and exit_code == 0: return output = decode_output(result.get("Output")) raise RuntimeError(f"{action_name} failed: status={status} exit_code={exit_code}\n{output[-2000:]}") def ensure_agent_online(self) -> None: req = tat_models.DescribeAutomationAgentStatusRequest() req.from_json_string(json.dumps({"InstanceIds": [self.instance_id], "Limit": 1})) resp = self.tat.DescribeAutomationAgentStatus(req) body = sdk_body(resp) online = any(item.get("AgentStatus") == "Online" for item in body.get("AutomationAgentSet", [])) if not online: raise RuntimeError(f"TAT agent is not online for {self.instance_id}") def resolve_release_package(self) -> tuple[str, str]: if not self.release_id: raise RuntimeError("deploy requires --release-id") package_key = self.package_key() sha_key = self.sha_key() sha256 = self.fetch_text_via_presigned_url(sha_key).strip().split()[0] package_url = self.presign_download_url(package_key) return package_url, sha256 def set_gateway_weight(self, clb_cfg: dict[str, Any], target: dict[str, Any], weight: int) -> None: payload: dict[str, Any] = { "LoadBalancerId": str(clb_cfg["load_balancer_id"]), "ListenerId": str(clb_cfg["listener_id"]), "Targets": [ { "InstanceId": str(target["instance_id"]), "Port": int(target.get("port", clb_cfg["backend_port"])), } ], "Weight": int(weight), } location_id = str(clb_cfg.get("location_id", "")).strip() domain = str(clb_cfg.get("domain", "")).strip() url = str(clb_cfg.get("url", "")).strip() if location_id: payload["LocationId"] = location_id elif domain and url: payload["Domain"] = domain payload["Url"] = url req = clb_models.ModifyTargetWeightRequest() req.from_json_string(json.dumps(payload)) resp = self.clb.ModifyTargetWeight(req) request_id = sdk_body(resp)["RequestId"] self.wait_for_clb_task(request_id) def wait_for_clb_task(self, request_id: str) -> None: deadline = time.time() + 300 while time.time() < deadline: req = clb_models.DescribeTaskStatusRequest() req.from_json_string(json.dumps({"TaskId": request_id})) resp = self.clb.DescribeTaskStatus(req) body = sdk_body(resp) status = int(body["Status"]) if status == 0: return if status == 1: raise RuntimeError(f"CLB async task failed: {request_id}") time.sleep(2) raise RuntimeError(f"CLB async task timed out: {request_id}") def package_key(self) -> str: package_name = str(self.service_cfg["package_name"]) releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/") return f"{releases_prefix}/{self.release_id}/{self.service_name}/{package_name}" def sha_key(self) -> str: package_name = str(self.service_cfg["package_name"]) sha_name = package_name[:-4] + ".sha256" if package_name.endswith(".tgz") else f"{package_name}.sha256" releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/") return f"{releases_prefix}/{self.release_id}/{self.service_name}/{sha_name}" def presign_download_url(self, key: str) -> str: return self.cos.get_presigned_url( Method="GET", Bucket=self.bucket, Key=key, Expired=10800, ) def fetch_text_via_presigned_url(self, key: str) -> str: url = self.presign_download_url(key) with urllib.request.urlopen(url, timeout=30) as resp: return resp.read().decode("utf-8").strip() def cos_bucket_url(self) -> str: region = str(self.config["cos"]["region"]) return f"https://{self.bucket}.cos.{region}.myqcloud.com" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Execute Tencent Cloud operations for app-deploy-platform.") parser.add_argument("--config", required=True, help="Path to config/prod.yaml") parser.add_argument("--action", required=True, choices=["deploy", "restart", "rollback"]) parser.add_argument("--service", required=True, help="Service name") parser.add_argument("--instance-id", required=True, help="Tencent Cloud CVM instance ID") parser.add_argument("--release-id", default="", help="Release ID for deploy or rollback") return parser.parse_args() def main() -> int: args = parse_args() config = load_yaml(Path(args.config).resolve()) operator = CloudOperator(config, args.service, args.instance_id, args.release_id) try: if args.action == "deploy": operator.deploy() elif args.action == "restart": operator.restart() else: operator.rollback() except TencentCloudSDKException as exc: log(f"tencent cloud sdk error: {exc}") return 1 except Exception as exc: log(str(exc)) return 1 return 0 if __name__ == "__main__": sys.exit(main())