chatapp-deploy-platform/ops-scripts/tencent_operator.py
2026-04-07 17:01:26 +08:00

369 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import shlex
import sys
import time
import urllib.request
from pathlib import Path
from typing import Any
import yaml
from qcloud_cos import CosConfig, CosS3Client
from tencentcloud.clb.v20180317 import clb_client, models as clb_models
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tat.v20201028 import tat_client, models as tat_models
# Task statuses at which wait_for_tat stops polling and returns the task;
# any other status is treated as "still running" and polled again.
TERMINAL_TASK_STATUSES = {"SUCCESS", "FAILED", "TIMEOUT", "CANCELLED"}
def log(message: str) -> None:
    """Print *message* to stdout prefixed with a local timestamp and UTC offset.

    The stream is flushed on every call so output appears immediately.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S %z")
    line = "[{}] {}".format(stamp, message)
    print(line, flush=True)
def load_yaml(path: Path) -> dict[str, Any]:
    """Parse the YAML file at *path* and return its top-level mapping.

    Raises:
        RuntimeError: The document root is not a mapping (for example an
            empty file, a list, or a scalar).
    """
    with path.open("r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    if isinstance(parsed, dict):
        return parsed
    raise RuntimeError(f"invalid yaml root in {path}")
def decode_output(encoded: str | None) -> str:
    """Decode base64-encoded command output into text.

    Returns an empty string for ``None`` or empty input. Bytes that are not
    valid UTF-8 are replaced rather than rejected. If decoding fails for any
    reason, a fixed placeholder string is returned instead of raising.
    """
    if encoded:
        try:
            raw = base64.b64decode(encoded)
            return raw.decode("utf-8", errors="replace")
        except Exception:
            # Best effort: this output is only used for diagnostics.
            return "<failed to decode output>"
    return ""
def sdk_body(resp: Any) -> dict[str, Any]:
    """Convert an SDK response object into a plain dictionary.

    The response is serialised with its ``to_json_string()`` method and
    parsed back with ``json.loads``.

    Raises:
        RuntimeError: The parsed JSON document is not an object.
    """
    payload = json.loads(resp.to_json_string())
    if isinstance(payload, dict):
        return payload
    raise RuntimeError(f"unexpected sdk response type: {type(payload).__name__}")
class CloudOperator:
    """Run deploy, rollback and restart operations for one service instance.

    Remote commands are executed through the TAT client, release artifacts are
    located in COS, and gateway instances have their CLB weight set to zero
    for the duration of a deploy.
    """

    def __init__(self, config: dict[str, Any], service_name: str, instance_id: str, release_id: str | None) -> None:
        """Validate the configuration and build the TAT, CLB and COS clients.

        Args:
            config: Parsed deployment configuration mapping.
            service_name: Key looked up under ``config["services"]``.
            instance_id: Instance that must be listed in the service's
                ``instances`` entries.
            release_id: Release to operate on; ``None`` is stored as ``""``.

        Raises:
            RuntimeError: The service or instance is unknown, or the
                credentials are missing from the configuration.
            KeyError: Another required configuration key is absent.
        """
        self.config = config
        self.service_name = service_name
        self.instance_id = instance_id
        self.release_id = release_id or ""
        try:
            self.service_cfg = dict(config["services"][service_name])
        except KeyError as exc:
            raise RuntimeError(f"unknown service: {service_name}") from exc
        self.instance_cfg = next(
            (item for item in self.service_cfg["instances"] if str(item["instance_id"]) == instance_id),
            None,
        )
        if not self.instance_cfg:
            raise RuntimeError(f"instance {instance_id} not found under service {service_name}")
        cloud_cfg = dict(config.get("tencentcloud", {}))
        secret_id = str(cloud_cfg.get("secret_id", "")).strip()
        secret_key = str(cloud_cfg.get("secret_key", "")).strip()
        session_token = str(cloud_cfg.get("session_token", "")).strip()
        if not secret_id or not secret_key:
            raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml")
        region = str(config["region"])
        self.bucket = str(config["cos"]["bucket"])
        # An empty session token is passed as None to both clients.
        self.credential = credential.Credential(secret_id, secret_key, session_token or None)
        # TAT and COS each use their own configured region; CLB uses the top-level one.
        self.tat = tat_client.TatClient(self.credential, str(config["tat"]["region"]))
        self.clb = clb_client.ClbClient(self.credential, region)
        self.cos = CosS3Client(
            CosConfig(
                Region=str(config["cos"]["region"]),
                SecretId=secret_id,
                SecretKey=secret_key,
                Token=session_token or None,
                Scheme="https",
                Timeout=30,
                EnableInternalDomain=False,
                AutoSwitchDomainOnRetry=True,
            )
        )

    def deploy(self) -> None:
        """Deploy ``self.release_id`` to the instance.

        The ``gateway`` service is deployed with its CLB weight set to zero
        first; every other service is deployed directly.

        Raises:
            RuntimeError: The agent is offline, the release id is missing, or
                the remote deploy fails.
        """
        self.ensure_agent_online()
        package_url, sha256 = self.resolve_release_package()
        if self.service_name == "gateway":
            self.deploy_gateway(package_url, sha256)
            return
        self.run_remote_deploy(package_url, sha256)

    def rollback(self) -> None:
        """Roll back by deploying the release named by ``self.release_id``.

        Raises:
            RuntimeError: No release id was supplied.
        """
        if not self.release_id:
            raise RuntimeError("rollback requires --release-id")
        self.deploy()

    def restart(self) -> None:
        """Restart the service unit on the instance and probe its health URL.

        Raises:
            RuntimeError: The agent is offline or the remote command fails.
        """
        self.ensure_agent_online()
        unit_name = str(self.service_cfg["unit_name"])
        health_url = str(self.service_cfg["health_url"])
        # "&&" makes the health probe run only if the restart itself succeeded.
        command = " && ".join(
            [
                f"/usr/bin/systemctl restart {shlex.quote(unit_name)}",
                f"/usr/bin/curl -fsS --max-time 3 {shlex.quote(health_url)} >/dev/null",
            ]
        )
        invocation_id = self.run_tat_command(command, f"restart-{self.service_name}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"restart {self.service_name}")
        log(f"restart completed for {self.service_name}@{self.instance_id}")

    def deploy_gateway(self, package_url: str, sha256: str) -> None:
        """Deploy a gateway instance with its CLB weight held at zero.

        The weight is set to 0, the configured drain period is waited out,
        the deploy runs, and the original weight is restored whether or not
        the deploy succeeded.

        Raises:
            RuntimeError: No CLB target is configured for this instance, or
                the deploy or a CLB operation fails.
        """
        clb_cfg = dict(self.service_cfg["clb"])
        target = next(
            (item for item in clb_cfg["targets"] if str(item["instance_id"]) == self.instance_id),
            None,
        )
        if not target:
            raise RuntimeError(f"missing CLB target config for gateway instance {self.instance_id}")
        original_weight = int(target.get("weight", clb_cfg.get("original_weight", 10)))
        drain_seconds = int(self.service_cfg.get("rollout", {}).get("drain_seconds", 30))
        self.set_gateway_weight(clb_cfg, target, 0)
        try:
            if drain_seconds > 0:
                log(f"draining gateway {self.instance_id} for {drain_seconds}s")
                time.sleep(drain_seconds)
            self.run_remote_deploy(package_url, sha256)
        except Exception:
            log(f"gateway deploy failed on {self.instance_id}, restoring CLB weight")
            try:
                self.set_gateway_weight(clb_cfg, target, original_weight)
            except Exception as restore_exc:
                # Log the restore failure separately so that the deploy error,
                # which is the root cause, is still the one that propagates.
                log(f"failed to restore CLB weight for {self.instance_id}: {restore_exc}")
            raise
        self.set_gateway_weight(clb_cfg, target, original_weight)
        log(f"gateway deploy completed for {self.instance_id}")

    def run_remote_deploy(self, package_url: str, sha256: str) -> None:
        """Run the configured remote deploy script on the instance and wait for it.

        Raises:
            RuntimeError: The remote task does not finish successfully.
        """
        unit_name = str(self.service_cfg["unit_name"])
        health_url = str(self.service_cfg["health_url"])
        deploy_root = str(self.service_cfg["deploy_root"])
        remote_script = str(self.config["tat"]["script_path"])
        # Every argument is shell-quoted because the command runs through a shell.
        command = " ".join(
            shlex.quote(part)
            for part in [
                remote_script,
                "--service",
                self.service_name,
                "--release-id",
                self.release_id,
                "--package-url",
                package_url,
                "--sha256",
                sha256,
                "--health-url",
                health_url,
                "--unit-name",
                unit_name,
                "--deploy-root",
                deploy_root,
            ]
        )
        invocation_id = self.run_tat_command(command, f"deploy-{self.service_name}-{self.release_id}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"deploy {self.service_name}")
        log(f"deploy completed for {self.service_name}@{self.instance_id} -> {self.release_id}")

    def run_tat_command(self, command: str, command_name: str) -> str:
        """Submit *command* to the instance through TAT and return the invocation id.

        Args:
            command: Command text; it is sent base64-encoded.
            command_name: Used as both the command name and its description.
        """
        payload: dict[str, Any] = {
            "CommandName": command_name,
            "Description": command_name,
            "SaveCommand": False,
            "Content": base64.b64encode(command.encode("utf-8")).decode("ascii"),
            "CommandType": str(self.config["tat"].get("command_type", "SHELL")),
            "WorkingDirectory": str(self.config["tat"]["working_directory"]),
            "Timeout": int(self.config["tat"]["timeout_seconds"]),
            "InstanceIds": [self.instance_id],
            "Username": str(self.config["tat"].get("execution_user", "root")),
        }
        # Command output is uploaded to COS only when an output prefix is configured.
        output_prefix = str(self.config["cos"].get("output_prefix", "")).strip("/")
        if output_prefix:
            payload["OutputCOSBucketUrl"] = self.cos_bucket_url()
            payload["OutputCOSKeyPrefix"] = f"{output_prefix}/{self.release_id or 'adhoc'}/{self.service_name}/{self.instance_id}"
        req = tat_models.RunCommandRequest()
        req.from_json_string(json.dumps(payload))
        resp = self.tat.RunCommand(req)
        body = sdk_body(resp)
        return str(body["InvocationId"])

    def wait_for_tat(self, invocation_id: str) -> dict[str, Any]:
        """Poll the invocation every two seconds until this instance's task is terminal.

        Returns:
            The task entry for ``self.instance_id`` once its status is one of
            ``TERMINAL_TASK_STATUSES``.

        Raises:
            RuntimeError: No terminal status was seen before the deadline,
                which is the configured command timeout plus 300 seconds.
        """
        deadline = time.time() + int(self.config["tat"]["timeout_seconds"]) + 300
        # The query is identical on every poll, so serialise it once.
        query = json.dumps(
            {
                "HideOutput": False,
                "Limit": 100,
                "Filters": [{"Name": "invocation-id", "Values": [invocation_id]}],
            }
        )
        while time.time() < deadline:
            req = tat_models.DescribeInvocationTasksRequest()
            req.from_json_string(query)
            body = sdk_body(self.tat.DescribeInvocationTasks(req))
            # The serialised response may carry an explicit null instead of a list.
            tasks = body.get("InvocationTaskSet") or []
            task = next((item for item in tasks if str(item.get("InstanceId")) == self.instance_id), None)
            if task and str(task.get("TaskStatus")) in TERMINAL_TASK_STATUSES:
                return task
            time.sleep(2)
        raise RuntimeError(f"TAT invocation timed out: {invocation_id}")

    def ensure_task_success(self, task: dict[str, Any], action_name: str) -> None:
        """Raise unless *task* finished with status ``SUCCESS`` and exit code 0.

        A missing or null ``TaskResult`` / ``ExitCode`` is treated as exit
        code -1 so the failure is reported instead of crashing here.

        Raises:
            RuntimeError: The task failed; the message carries the status,
                the exit code, and the last 2000 characters of decoded output.
        """
        status = str(task.get("TaskStatus"))
        result = task.get("TaskResult") or {}
        raw_exit_code = result.get("ExitCode")
        exit_code = -1 if raw_exit_code is None else int(raw_exit_code)
        if status == "SUCCESS" and exit_code == 0:
            return
        output = decode_output(result.get("Output"))
        raise RuntimeError(f"{action_name} failed: status={status} exit_code={exit_code}\n{output[-2000:]}")

    def ensure_agent_online(self) -> None:
        """Raise ``RuntimeError`` unless the automation agent reports ``Online``."""
        req = tat_models.DescribeAutomationAgentStatusRequest()
        req.from_json_string(json.dumps({"InstanceIds": [self.instance_id], "Limit": 1}))
        resp = self.tat.DescribeAutomationAgentStatus(req)
        body = sdk_body(resp)
        # Tolerate an explicit null for the result set.
        agents = body.get("AutomationAgentSet") or []
        online = any(item.get("AgentStatus") == "Online" for item in agents)
        if not online:
            raise RuntimeError(f"TAT agent is not online for {self.instance_id}")

    def resolve_release_package(self) -> tuple[str, str]:
        """Return ``(presigned package URL, sha256 digest)`` for the release.

        Raises:
            RuntimeError: No release id was supplied, or the checksum object
                does not start with a valid SHA-256 digest.
        """
        if not self.release_id:
            raise RuntimeError("deploy requires --release-id")
        package_key = self.package_key()
        sha_key = self.sha_key()
        sha256 = self._parse_sha256(self.fetch_text_via_presigned_url(sha_key), sha_key)
        package_url = self.presign_download_url(package_key)
        return package_url, sha256

    @staticmethod
    def _parse_sha256(text: str, source: str) -> str:
        """Return the leading digest token of a checksum file's *text*.

        Only the first whitespace-separated token is used, so a trailing
        file name after the digest is ignored.

        Raises:
            RuntimeError: *text* is empty or its first token is not 64
                hexadecimal characters; *source* names the object in the message.
        """
        tokens = text.split()
        if not tokens:
            raise RuntimeError(f"empty sha256 file: {source}")
        digest = tokens[0]
        if len(digest) != 64 or any(ch not in "0123456789abcdefABCDEF" for ch in digest):
            raise RuntimeError(f"invalid sha256 digest in {source}")
        return digest

    def set_gateway_weight(self, clb_cfg: dict[str, Any], target: dict[str, Any], weight: int) -> None:
        """Set the CLB forwarding weight of *target* and wait for the change.

        The rule is selected by ``location_id`` when configured, otherwise by
        ``domain`` plus ``url`` when both are configured.

        Raises:
            RuntimeError: The asynchronous CLB task fails or times out.
        """
        # Fall back to the shared backend port only when the target defines none,
        # so "backend_port" is not required if every target sets its own port.
        port = target["port"] if "port" in target else clb_cfg["backend_port"]
        payload: dict[str, Any] = {
            "LoadBalancerId": str(clb_cfg["load_balancer_id"]),
            "ListenerId": str(clb_cfg["listener_id"]),
            "Targets": [
                {
                    "InstanceId": str(target["instance_id"]),
                    "Port": int(port),
                }
            ],
            "Weight": int(weight),
        }
        location_id = str(clb_cfg.get("location_id", "")).strip()
        domain = str(clb_cfg.get("domain", "")).strip()
        url = str(clb_cfg.get("url", "")).strip()
        if location_id:
            payload["LocationId"] = location_id
        elif domain and url:
            payload["Domain"] = domain
            payload["Url"] = url
        req = clb_models.ModifyTargetWeightRequest()
        req.from_json_string(json.dumps(payload))
        resp = self.clb.ModifyTargetWeight(req)
        # The request id of the modify call is what the task-status query takes.
        request_id = sdk_body(resp)["RequestId"]
        self.wait_for_clb_task(request_id)

    def wait_for_clb_task(self, request_id: str) -> None:
        """Poll the CLB task every two seconds for up to 300 seconds.

        Status 0 means done and status 1 means failed; any other status is
        polled again.

        Raises:
            RuntimeError: The task reports failure or the deadline passes.
        """
        deadline = time.time() + 300
        while time.time() < deadline:
            req = clb_models.DescribeTaskStatusRequest()
            req.from_json_string(json.dumps({"TaskId": request_id}))
            resp = self.clb.DescribeTaskStatus(req)
            body = sdk_body(resp)
            status = int(body["Status"])
            if status == 0:
                return
            if status == 1:
                raise RuntimeError(f"CLB async task failed: {request_id}")
            time.sleep(2)
        raise RuntimeError(f"CLB async task timed out: {request_id}")

    def _release_dir(self) -> str:
        """Return the COS key prefix shared by this release's package and checksum."""
        releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/")
        return f"{releases_prefix}/{self.release_id}/{self.service_name}"

    def package_key(self) -> str:
        """Return the COS object key of the release package."""
        package_name = str(self.service_cfg["package_name"])
        return f"{self._release_dir()}/{package_name}"

    def sha_key(self) -> str:
        """Return the COS object key of the package's checksum file.

        A trailing ``.tgz`` is replaced by ``.sha256``; for any other package
        name ``.sha256`` is appended.
        """
        package_name = str(self.service_cfg["package_name"])
        sha_name = package_name[:-4] + ".sha256" if package_name.endswith(".tgz") else f"{package_name}.sha256"
        return f"{self._release_dir()}/{sha_name}"

    def presign_download_url(self, key: str) -> str:
        """Return a presigned GET URL for *key*, requested with ``Expired=10800``."""
        return self.cos.get_presigned_url(
            Method="GET",
            Bucket=self.bucket,
            Key=key,
            Expired=10800,
        )

    def fetch_text_via_presigned_url(self, key: str) -> str:
        """Download object *key* through a presigned URL and return its stripped UTF-8 text."""
        url = self.presign_download_url(key)
        with urllib.request.urlopen(url, timeout=30) as resp:
            return resp.read().decode("utf-8").strip()

    def cos_bucket_url(self) -> str:
        """Return the HTTPS base URL of the configured bucket in the COS region."""
        region = str(self.config["cos"]["region"])
        return f"https://{self.bucket}.cos.{region}.myqcloud.com"
def parse_args() -> argparse.Namespace:
    """Parse the command line into config path, action, service, instance and release id.

    ``--release-id`` is the only optional flag and defaults to ``""``.
    """
    cli = argparse.ArgumentParser(description="Execute Tencent Cloud operations for app-deploy-platform.")
    required_flags = (
        ("--config", {"help": "Path to config/prod.yaml"}),
        ("--action", {"choices": ["deploy", "restart", "rollback"]}),
        ("--service", {"help": "Service name"}),
        ("--instance-id", {"help": "Tencent Cloud CVM instance ID"}),
    )
    for flag, options in required_flags:
        cli.add_argument(flag, required=True, **options)
    cli.add_argument("--release-id", default="", help="Release ID for deploy or rollback")
    return cli.parse_args()
def main() -> int:
    """Run the requested action and return the process exit code.

    Returns:
        0 on success, 1 when any step fails. Failures are logged as a single
        line rather than propagated.
    """
    args = parse_args()
    try:
        # Loading the config and building the operator happen inside the try
        # so that config errors are logged like any other failure instead of
        # escaping as a traceback.
        config = load_yaml(Path(args.config).resolve())
        operator = CloudOperator(config, args.service, args.instance_id, args.release_id)
        if args.action == "deploy":
            operator.deploy()
        elif args.action == "restart":
            operator.restart()
        else:
            operator.rollback()
    except TencentCloudSDKException as exc:
        log(f"tencent cloud sdk error: {exc}")
        return 1
    except Exception as exc:
        # Top-level boundary: report the failure and signal it via the exit code.
        log(str(exc))
        return 1
    return 0
# Script entry point: the exit status of the process is main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())