2026-04-07 19:32:58 +08:00

527 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import shlex
import sys
import time
import textwrap
import urllib.request
from pathlib import Path
from typing import Any
import yaml
from qcloud_cos import CosConfig, CosS3Client
from tencentcloud.clb.v20180317 import clb_client, models as clb_models
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.tat.v20201028 import tat_client, models as tat_models
# TAT task statuses after which a task no longer changes; wait_for_tat stops polling on these.
TERMINAL_TASK_STATUSES = {"CANCELLED", "FAILED", "SUCCESS", "TIMEOUT"}
def log(message: str) -> None:
    """Print *message* to stdout behind a local-time timestamp and flush immediately."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S %z")
    line = f"[{stamp}] {message}"
    print(line, flush=True)
def load_yaml(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 YAML file at *path* and return its top-level mapping.

    Raises:
        RuntimeError: if the document root is not a mapping (an empty file parses to
            ``None`` and is rejected too).
    """
    with path.open("r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    if isinstance(document, dict):
        return document
    raise RuntimeError(f"invalid yaml root in {path}")
def decode_output(encoded: str | None) -> str:
    """Turn base64-encoded command output into text.

    Returns ``""`` for empty/``None`` input, the decoded text (invalid UTF-8 replaced
    with U+FFFD) on success, and a fixed placeholder when the input is not valid base64.
    """
    if encoded:
        try:
            raw = base64.b64decode(encoded)
        except Exception:
            # Best effort: this only feeds error messages, so never raise from here.
            return "<failed to decode output>"
        return raw.decode("utf-8", errors="replace")
    return ""
def sdk_body(resp: Any) -> dict[str, Any]:
    """Convert an SDK response model to a plain dict by round-tripping its JSON form.

    Raises:
        RuntimeError: if the serialized payload is not a JSON object.
    """
    decoded = json.loads(resp.to_json_string())
    if isinstance(decoded, dict):
        return decoded
    raise RuntimeError(f"unexpected sdk response type: {type(decoded).__name__}")
def should_retry_without_output_cos(exc: TencentCloudSDKException) -> bool:
    """Tell whether RunCommand failed only because TAT lacks its COS-upload linked role.

    That failure (code ``ResourceNotFound.RoleNotFound`` mentioning
    ``TAT_QCSLinkedRoleInUploadInvocation``) is recoverable by re-sending the request
    without the COS output settings.
    """
    if str(getattr(exc, "code", "") or "") != "ResourceNotFound.RoleNotFound":
        return False
    detail = str(getattr(exc, "message", "") or "")
    return "TAT_QCSLinkedRoleInUploadInvocation" in detail
def local_remote_deploy_script() -> str:
    """Return ``remote_deploy.sh`` (stored beside this file) with exactly one trailing newline.

    Raises:
        RuntimeError: if the template file is missing or contains only whitespace.
    """
    template_path = Path(__file__).with_name("remote_deploy.sh")
    try:
        raw = template_path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise RuntimeError(f"missing remote deploy template: {template_path}") from exc
    trimmed = raw.rstrip()
    if not trimmed.lstrip():
        raise RuntimeError(f"remote deploy template is empty: {template_path}")
    return f"{trimmed}\n"
class CloudOperator:
    """Deploy, restart or roll back one service instance through Tencent Cloud APIs.

    Remote shell commands run through TAT, release artifacts are fetched from COS via
    presigned URLs, and gateway instances are drained by setting their CLB target weight
    to 0 for the duration of a deploy.
    """

    def __init__(
        self,
        config: dict[str, Any],
        service_name: str,
        instance_id: str,
        release_id: str | None,
    ) -> None:
        """Validate *config* for the given service/instance and build the SDK clients.

        Args:
            config: Parsed deployment config (must contain ``services``, ``region``,
                ``cos``, ``tat`` and ``tencentcloud`` sections).
            service_name: Key under ``config["services"]``.
            instance_id: Instance id that must appear in the service's ``instances`` list.
            release_id: Release to deploy/roll back to; ``None`` is normalized to ``""``.

        Raises:
            RuntimeError: on an unknown service or instance, or missing credentials.
        """
        self.config = config
        self.service_name = service_name
        self.instance_id = instance_id
        self.release_id = release_id or ""
        try:
            self.service_cfg = dict(config["services"][service_name])
        except KeyError as exc:
            raise RuntimeError(f"unknown service: {service_name}") from exc
        self.instance_cfg = next(
            (item for item in self.service_cfg["instances"] if str(item["instance_id"]) == instance_id),
            None,
        )
        if not self.instance_cfg:
            raise RuntimeError(f"instance {instance_id} not found under service {service_name}")

        cloud_cfg = dict(config.get("tencentcloud") or {})
        # `or ""` before str(): a blank YAML value loads as None, and str(None) == "None"
        # would otherwise pass the emptiness check and be sent as a real credential.
        secret_id = str(cloud_cfg.get("secret_id") or "").strip()
        secret_key = str(cloud_cfg.get("secret_key") or "").strip()
        session_token = str(cloud_cfg.get("session_token") or "").strip()
        if not secret_id or not secret_key:
            raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml")

        region = str(config["region"])
        self.bucket = str(config["cos"]["bucket"])
        self.credential = credential.Credential(secret_id, secret_key, session_token or None)
        self.tat = tat_client.TatClient(self.credential, str(config["tat"]["region"]))
        self.clb = clb_client.ClbClient(self.credential, region)
        self.cos = CosS3Client(
            CosConfig(
                Region=str(config["cos"]["region"]),
                SecretId=secret_id,
                SecretKey=secret_key,
                Token=session_token or None,
                Scheme="https",
                Timeout=30,
                EnableInternalDomain=False,
                AutoSwitchDomainOnRetry=True,
            )
        )

    def _rollout_cfg(self) -> dict[str, Any]:
        """Return the service's optional ``rollout`` settings (``{}`` when absent or null)."""
        return dict(self.service_cfg.get("rollout") or {})

    def deploy(self) -> None:
        """Deploy ``self.release_id`` to the instance (gateways are drained from the CLB first).

        The release is resolved before anything is changed on the instance, so a missing
        release id or a bad checksum file fails without side effects.
        """
        self.ensure_agent_online()
        package_url, sha256 = self.resolve_release_package()
        self.ensure_remote_deploy_script_installed()
        self.ensure_systemd_unit_installed()
        if self.service_name == "gateway":
            self.deploy_gateway(package_url, sha256)
            return
        self.run_remote_deploy(package_url, sha256)

    def rollback(self) -> None:
        """Re-deploy an earlier release; identical to :meth:`deploy` but requires a release id."""
        if not self.release_id:
            raise RuntimeError("rollback requires --release-id")
        self.deploy()

    def restart(self) -> None:
        """Restart the service's systemd unit and wait for its health URL to answer.

        ``systemctl restart`` returns as soon as a ``Type=simple`` unit has been started,
        before the service is necessarily listening, so the health URL is polled once per
        second for up to ``rollout.health_check_attempts`` attempts (default 30) instead of
        being probed a single time.
        """
        self.ensure_agent_online()
        self.ensure_systemd_unit_installed()
        unit_name = str(self.service_cfg["unit_name"])
        health_url = str(self.service_cfg["health_url"])
        attempts = max(1, int(self._rollout_cfg().get("health_check_attempts", 30)))
        quoted_url = shlex.quote(health_url)
        command = "\n".join(
            [
                "set -eu",
                f"/usr/bin/systemctl restart {shlex.quote(unit_name)}",
                f"for _attempt in $(seq 1 {attempts}); do",
                f"    if /usr/bin/curl -fsS --max-time 3 {quoted_url} >/dev/null; then",
                "        exit 0",
                "    fi",
                "    sleep 1",
                "done",
                f'echo "health check failed after {attempts} attempts:" {quoted_url} >&2',
                "exit 1",
            ]
        ) + "\n"
        invocation_id = self.run_tat_command(command, f"restart-{self.service_name}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"restart {self.service_name}")
        log(f"restart completed for {self.service_name}@{self.instance_id}")

    def deploy_gateway(self, package_url: str, sha256: str) -> None:
        """Deploy a gateway instance with its CLB weight set to 0 during the rollout.

        The weight is set to 0, the instance drains for ``rollout.drain_seconds`` (default
        30), the deploy runs, and the original weight is restored. The original weight is
        restored on *any* interruption, Ctrl-C included. A failure while restoring is logged
        and does not mask the error that aborted the deploy.
        """
        clb_cfg = dict(self.service_cfg["clb"])
        target = next(
            (item for item in clb_cfg["targets"] if str(item["instance_id"]) == self.instance_id),
            None,
        )
        if not target:
            raise RuntimeError(f"missing CLB target config for gateway instance {self.instance_id}")
        original_weight = int(target.get("weight", clb_cfg.get("original_weight", 10)))
        drain_seconds = int(self._rollout_cfg().get("drain_seconds", 30))

        self.set_gateway_weight(clb_cfg, target, 0)
        try:
            if drain_seconds > 0:
                log(f"draining gateway {self.instance_id} for {drain_seconds}s")
                time.sleep(drain_seconds)
            self.run_remote_deploy(package_url, sha256)
        except BaseException:
            # BaseException so a KeyboardInterrupt during the drain does not strand the
            # instance at weight 0.
            log(f"gateway deploy failed on {self.instance_id}, restoring CLB weight")
            try:
                self.set_gateway_weight(clb_cfg, target, original_weight)
            except Exception as restore_exc:
                log(f"failed to restore CLB weight for {self.instance_id}: {restore_exc}")
            raise
        self.set_gateway_weight(clb_cfg, target, original_weight)
        log(f"gateway deploy completed for {self.instance_id}")

    def run_remote_deploy(self, package_url: str, sha256: str) -> None:
        """Invoke the installed remote deploy script on the instance and wait for success."""
        unit_name = str(self.service_cfg["unit_name"])
        health_url = str(self.service_cfg["health_url"])
        deploy_root = str(self.service_cfg["deploy_root"])
        remote_script = str(self.config["tat"]["script_path"])
        command = " ".join(
            shlex.quote(part)
            for part in [
                remote_script,
                "--service",
                self.service_name,
                "--release-id",
                self.release_id,
                "--package-url",
                package_url,
                "--sha256",
                sha256,
                "--health-url",
                health_url,
                "--unit-name",
                unit_name,
                "--deploy-root",
                deploy_root,
            ]
        )
        invocation_id = self.run_tat_command(command, f"deploy-{self.service_name}-{self.release_id}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"deploy {self.service_name}")
        log(f"deploy completed for {self.service_name}@{self.instance_id} -> {self.release_id}")

    def ensure_systemd_unit_installed(self) -> None:
        """Write the service's systemd unit on the instance (only if changed) and enable it.

        ``daemon-reload`` runs only when the unit file was missing or its content differed.
        """
        unit_name = str(self.service_cfg["unit_name"])
        unit_path = f"/etc/systemd/system/{unit_name}"
        unit_content = self.systemd_unit_content().rstrip() + "\n"
        # Dedent the shell parts *before* splicing in the multi-line unit text. Dedenting
        # after interpolation is a no-op, because the spliced lines start at column 0. That
        # would leave the script indented and prefix the unit's first line with whitespace.
        # The heredoc terminator must also start at column 0.
        script_head = textwrap.dedent(
            f"""\
            set -Eeuo pipefail
            unit_path={shlex.quote(unit_path)}
            tmp_file="$(mktemp)"
            trap 'rm -f "$tmp_file"' EXIT
            cat >"$tmp_file" <<'UNIT_EOF'
            """
        )
        script_tail = textwrap.dedent(
            f"""\
            UNIT_EOF
            if [[ ! -f "$unit_path" ]] || ! cmp -s "$tmp_file" "$unit_path"; then
                install -D -m 0644 "$tmp_file" "$unit_path"
                /usr/bin/systemctl daemon-reload
            fi
            /usr/bin/systemctl enable {shlex.quote(unit_name)} >/dev/null
            """
        )
        command = script_head + unit_content + script_tail
        invocation_id = self.run_tat_command(command, f"ensure-unit-{self.service_name}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"ensure systemd unit {unit_name}")

    def ensure_remote_deploy_script_installed(self) -> None:
        """Upload the local ``remote_deploy.sh`` to ``tat.script_path`` if it differs there.

        The script travels base64-encoded inside the command and is decoded on the
        instance with ``python3``; it is installed with mode 0755.
        """
        script_path = str(self.config["tat"]["script_path"])
        script_dir = str(Path(script_path).parent)
        script_content_b64 = base64.b64encode(local_remote_deploy_script().encode("utf-8")).decode("ascii")
        # Safe to dedent after interpolation: every interpolated value is a single line.
        command = textwrap.dedent(
            f"""\
            set -Eeuo pipefail
            script_path={shlex.quote(script_path)}
            script_dir={shlex.quote(script_dir)}
            tmp_file="$(mktemp)"
            trap 'rm -f "$tmp_file"' EXIT
            mkdir -p "$script_dir"
            TMP_FILE="$tmp_file" python3 - <<'PY'
            import base64
            import os
            from pathlib import Path
            Path(os.environ["TMP_FILE"]).write_bytes(base64.b64decode("{script_content_b64}"))
            PY
            if [[ ! -f "$script_path" ]] || ! cmp -s "$tmp_file" "$script_path"; then
                install -m 0755 "$tmp_file" "$script_path"
            fi
            """
        )
        invocation_id = self.run_tat_command(command, f"ensure-deploy-script-{self.service_name}")
        task = self.wait_for_tat(invocation_id)
        self.ensure_task_success(task, f"ensure deploy script {script_path}")

    def systemd_unit_content(self) -> str:
        """Render the systemd unit that runs ``<deploy_root>/current/bin/<service>`` as root."""
        deploy_root = str(self.service_cfg["deploy_root"]).rstrip("/")
        binary_path = f"{deploy_root}/current/bin/{self.service_name}"
        config_path = f"{deploy_root}/current/config/prod.yaml"
        description = f"ChatApp {self.service_name.capitalize()} Service"
        working_directory = f"{deploy_root}/current"
        return textwrap.dedent(
            f"""\
            [Unit]
            Description={description}
            Wants=network-online.target
            After=network-online.target
            [Service]
            Type=simple
            User=root
            Group=root
            WorkingDirectory={working_directory}
            ExecStart={binary_path} -config {config_path}
            Restart=always
            RestartSec=3
            KillSignal=SIGTERM
            TimeoutStopSec=30
            LimitNOFILE=65535
            [Install]
            WantedBy=multi-user.target
            """
        )

    def run_tat_command(self, command: str, command_name: str) -> str:
        """Start *command* on the instance via TAT RunCommand and return the invocation id.

        When ``cos.output_prefix`` is set, output is also uploaded to COS under
        ``<prefix>/<release or 'adhoc'>/<service>/<instance>``.
        """
        tat_cfg = self.config["tat"]
        payload: dict[str, Any] = {
            "CommandName": command_name,
            "Description": command_name,
            "SaveCommand": False,
            "Content": base64.b64encode(command.encode("utf-8")).decode("ascii"),
            "CommandType": str(tat_cfg.get("command_type", "SHELL")),
            "WorkingDirectory": str(tat_cfg["working_directory"]),
            "Timeout": int(tat_cfg["timeout_seconds"]),
            "InstanceIds": [self.instance_id],
            "Username": str(tat_cfg.get("execution_user", "root")),
        }
        output_prefix = str(self.config["cos"].get("output_prefix", "")).strip("/")
        if output_prefix:
            payload["OutputCOSBucketUrl"] = self.cos_bucket_url()
            payload["OutputCOSKeyPrefix"] = (
                f"{output_prefix}/{self.release_id or 'adhoc'}/{self.service_name}/{self.instance_id}"
            )
        return self._run_tat_command(payload, allow_retry_without_output_cos=bool(output_prefix))

    def _run_tat_command(self, payload: dict[str, Any], allow_retry_without_output_cos: bool) -> str:
        """Send RunCommand; retry once without COS output if TAT lacks its upload role."""
        req = tat_models.RunCommandRequest()
        req.from_json_string(json.dumps(payload))
        try:
            resp = self.tat.RunCommand(req)
        except TencentCloudSDKException as exc:
            if allow_retry_without_output_cos and should_retry_without_output_cos(exc):
                log(
                    "TAT missing linked role TAT_QCSLinkedRoleInUploadInvocation, "
                    "retrying RunCommand without OutputCOSBucketUrl"
                )
                retry_payload = dict(payload)
                retry_payload.pop("OutputCOSBucketUrl", None)
                retry_payload.pop("OutputCOSKeyPrefix", None)
                return self._run_tat_command(retry_payload, allow_retry_without_output_cos=False)
            raise
        return str(sdk_body(resp)["InvocationId"])

    def wait_for_tat(self, invocation_id: str) -> dict[str, Any]:
        """Poll every 2s until this instance's task reaches a terminal status and return it.

        Gives up after ``tat.timeout_seconds + 300`` seconds. A monotonic clock is used so
        wall-clock adjustments cannot stretch or cut the deadline.
        """
        deadline = time.monotonic() + int(self.config["tat"]["timeout_seconds"]) + 300
        while time.monotonic() < deadline:
            req = tat_models.DescribeInvocationTasksRequest()
            req.from_json_string(
                json.dumps(
                    {
                        "HideOutput": False,
                        "Limit": 100,
                        "Filters": [{"Name": "invocation-id", "Values": [invocation_id]}],
                    }
                )
            )
            body = sdk_body(self.tat.DescribeInvocationTasks(req))
            task = next(
                (
                    item
                    for item in body.get("InvocationTaskSet", [])
                    if str(item["InstanceId"]) == self.instance_id
                ),
                None,
            )
            if task and str(task.get("TaskStatus")) in TERMINAL_TASK_STATUSES:
                return task
            time.sleep(2)
        raise RuntimeError(f"TAT invocation timed out: {invocation_id}")

    def ensure_task_success(self, task: dict[str, Any], action_name: str) -> None:
        """Raise RuntimeError (with the last 2000 chars of output) unless the task succeeded.

        Success means ``TaskStatus == "SUCCESS"`` and exit code 0. A missing ``TaskResult``
        or ``ExitCode`` is treated as exit code -1.
        """
        status = str(task.get("TaskStatus"))
        result = task.get("TaskResult") or {}
        raw_exit_code = result.get("ExitCode")
        exit_code = -1 if raw_exit_code is None else int(raw_exit_code)
        if status == "SUCCESS" and exit_code == 0:
            return
        output = decode_output(result.get("Output"))
        raise RuntimeError(f"{action_name} failed: status={status} exit_code={exit_code}\n{output[-2000:]}")

    def ensure_agent_online(self) -> None:
        """Raise RuntimeError unless the TAT agent on the instance reports ``Online``."""
        req = tat_models.DescribeAutomationAgentStatusRequest()
        req.from_json_string(json.dumps({"InstanceIds": [self.instance_id], "Limit": 1}))
        body = sdk_body(self.tat.DescribeAutomationAgentStatus(req))
        online = any(item.get("AgentStatus") == "Online" for item in body.get("AutomationAgentSet", []))
        if not online:
            raise RuntimeError(f"TAT agent is not online for {self.instance_id}")

    def resolve_release_package(self) -> tuple[str, str]:
        """Return ``(presigned package URL, sha256 hex digest)`` for the current release.

        The digest is the first whitespace-separated field of the release's ``.sha256``
        file (the ``sha256sum`` output format).

        Raises:
            RuntimeError: if no release id is set, or the checksum file is empty or does
                not start with a 64-character hex digest.
        """
        if not self.release_id:
            raise RuntimeError("deploy requires --release-id")
        sha_key = self.sha_key()
        fields = self.fetch_text_via_presigned_url(sha_key).split()
        if not fields:
            raise RuntimeError(f"checksum file {sha_key} is empty")
        sha256 = fields[0]
        if len(sha256) != 64 or any(ch not in "0123456789abcdefABCDEF" for ch in sha256):
            raise RuntimeError(f"invalid sha256 checksum in {sha_key}: {sha256!r}")
        package_url = self.presign_download_url(self.package_key())
        return package_url, sha256

    def set_gateway_weight(self, clb_cfg: dict[str, Any], target: dict[str, Any], weight: int) -> None:
        """Set the CLB backend weight of *target* and wait for the async CLB task to finish."""
        payload: dict[str, Any] = {
            "LoadBalancerId": str(clb_cfg["load_balancer_id"]),
            "ListenerId": str(clb_cfg["listener_id"]),
            "Targets": [
                {
                    "InstanceId": str(target["instance_id"]),
                    "Port": int(target.get("port", clb_cfg["backend_port"])),
                }
            ],
            "Weight": int(weight),
        }
        location_id, domain, url = self.resolve_gateway_rule(clb_cfg)
        if location_id:
            payload["LocationId"] = location_id
        elif domain and url:
            payload["Domain"] = domain
            payload["Url"] = url
        req = clb_models.ModifyTargetWeightRequest()
        req.from_json_string(json.dumps(payload))
        request_id = sdk_body(self.clb.ModifyTargetWeight(req))["RequestId"]
        self.wait_for_clb_task(request_id)

    def resolve_gateway_rule(self, clb_cfg: dict[str, Any]) -> tuple[str, str, str]:
        """Return ``(location_id, domain, url)`` identifying the listener rule to modify.

        Explicit config wins. Otherwise the listener is looked up: non-HTTP(S) listeners
        need no rule (all empty), and an HTTP(S) listener must have exactly one rule.
        """
        location_id = str(clb_cfg.get("location_id", "")).strip()
        domain = str(clb_cfg.get("domain", "")).strip()
        url = str(clb_cfg.get("url", "")).strip()
        if location_id or (domain and url):
            return location_id, domain, url
        req = clb_models.DescribeListenersRequest()
        req.from_json_string(
            json.dumps(
                {
                    "LoadBalancerId": str(clb_cfg["load_balancer_id"]),
                    "ListenerIds": [str(clb_cfg["listener_id"])],
                }
            )
        )
        listeners = sdk_body(self.clb.DescribeListeners(req)).get("Listeners", [])
        if not listeners:
            raise RuntimeError(
                f"CLB listener {clb_cfg['listener_id']} not found under load balancer {clb_cfg['load_balancer_id']}"
            )
        listener = listeners[0]
        protocol = str(listener.get("Protocol", "")).upper()
        if protocol not in {"HTTP", "HTTPS"}:
            return "", "", ""
        rules = listener.get("Rules") or []
        if len(rules) == 1:
            rule = rules[0]
            return (
                str(rule.get("LocationId", "")).strip(),
                str(rule.get("Domain", "")).strip(),
                str(rule.get("Url", "")).strip(),
            )
        raise RuntimeError(
            f"listener {clb_cfg['listener_id']} is {protocol} and has {len(rules)} rules; "
            "set gateway.clb.location_id or gateway.clb.domain+url in config/prod.yaml"
        )

    def wait_for_clb_task(self, request_id: str) -> None:
        """Poll DescribeTaskStatus every 2s (up to 300s) until the CLB task succeeds.

        Status 0 is success and 1 is failure; any other value keeps polling.
        """
        deadline = time.monotonic() + 300
        while time.monotonic() < deadline:
            req = clb_models.DescribeTaskStatusRequest()
            req.from_json_string(json.dumps({"TaskId": request_id}))
            status = int(sdk_body(self.clb.DescribeTaskStatus(req))["Status"])
            if status == 0:
                return
            if status == 1:
                raise RuntimeError(f"CLB async task failed: {request_id}")
            time.sleep(2)
        raise RuntimeError(f"CLB async task timed out: {request_id}")

    def package_key(self) -> str:
        """COS key of the release package: ``<releases_prefix>/<release>/<service>/<package_name>``."""
        package_name = str(self.service_cfg["package_name"])
        releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/")
        return f"{releases_prefix}/{self.release_id}/{self.service_name}/{package_name}"

    def sha_key(self) -> str:
        """COS key of the checksum file: ``.tgz`` is swapped for ``.sha256``, else appended."""
        package_name = str(self.service_cfg["package_name"])
        sha_name = package_name[:-4] + ".sha256" if package_name.endswith(".tgz") else f"{package_name}.sha256"
        releases_prefix = str(self.config["cos"]["releases_prefix"]).strip("/")
        return f"{releases_prefix}/{self.release_id}/{self.service_name}/{sha_name}"

    def presign_download_url(self, key: str) -> str:
        """Return a presigned GET URL for *key* in the release bucket (Expired=10800s, i.e. 3h)."""
        return self.cos.get_presigned_url(
            Method="GET",
            Bucket=self.bucket,
            Key=key,
            Expired=10800,
        )

    def fetch_text_via_presigned_url(self, key: str) -> str:
        """Download *key* via a presigned URL (30s timeout) and return it as stripped UTF-8 text."""
        url = self.presign_download_url(key)
        with urllib.request.urlopen(url, timeout=30) as resp:
            return resp.read().decode("utf-8").strip()

    def cos_bucket_url(self) -> str:
        """Public HTTPS endpoint of the configured bucket, used as the TAT output target."""
        region = str(self.config["cos"]["region"])
        return f"https://{self.bucket}.cos.{region}.myqcloud.com"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command line for this tool.

    Args:
        argv: Arguments to parse (without the program name). ``None`` — the default —
            parses ``sys.argv[1:]``, so existing callers are unaffected; passing a list
            lets the CLI be driven programmatically or from tests.

    Returns:
        Namespace with ``config``, ``action``, ``service``, ``instance_id`` and
        ``release_id`` (``""`` when not given).
    """
    parser = argparse.ArgumentParser(description="Execute Tencent Cloud operations for app-deploy-platform.")
    parser.add_argument("--config", required=True, help="Path to config/prod.yaml")
    parser.add_argument("--action", required=True, choices=["deploy", "restart", "rollback"])
    parser.add_argument("--service", required=True, help="Service name")
    parser.add_argument("--instance-id", required=True, help="Tencent Cloud CVM instance ID")
    parser.add_argument("--release-id", default="", help="Release ID for deploy or rollback")
    return parser.parse_args(argv)
def main() -> int:
    """Run the requested action and return the process exit code.

    Returns 0 on success and 1 on failure, and 130 when interrupted with Ctrl-C. Every
    failure is logged rather than escaping as a traceback. This includes an unreadable
    or malformed config file and the validation errors raised by ``CloudOperator``
    (unknown service/instance, missing credentials), which previously ran outside the
    ``try``.
    """
    args = parse_args()
    try:
        config = load_yaml(Path(args.config).resolve())
        operator = CloudOperator(config, args.service, args.instance_id, args.release_id)
        if args.action == "deploy":
            operator.deploy()
        elif args.action == "restart":
            operator.restart()
        else:
            operator.rollback()
    except TencentCloudSDKException as exc:
        log(f"tencent cloud sdk error: {exc}")
        return 1
    except KeyboardInterrupt:
        log("interrupted")
        return 130
    except Exception as exc:
        # RuntimeErrors carry curated messages. For anything else (e.g. a KeyError for a
        # missing config key, whose str() is only the quoted key) also show the type.
        log(str(exc) if isinstance(exc, RuntimeError) else f"{type(exc).__name__}: {exc}")
        return 1
    return 0
# Script entry point: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())