# Snapshot metadata (extraction residue): 2026-04-07 12:30:27 +08:00 - 365 lines - 14 KiB - Python
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import hashlib
import json
import os
import shutil
import socket
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import requests
import yaml
from qcloud_cos import CosConfig, CosS3Client
def log(message: str) -> None:
    """Print *message* to stdout, prefixed with a timezone-aware timestamp."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S %z")
    print(f"[{stamp}] {message}", flush=True)
def load_yaml(path: Path) -> dict[str, Any]:
    """Parse *path* as YAML and return its top-level mapping.

    Raises RuntimeError when the document root is not a mapping.
    """
    with path.open("r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle)
    if isinstance(parsed, dict):
        return parsed
    raise RuntimeError(f"invalid yaml root in {path}")
def normalize_remote(remote: str) -> str:
    """Normalize a git remote URL to a lowercase "owner/repo"-style path.

    Handles scp-like ssh remotes (git@host:owner/repo), full URLs with a
    scheme, and bare paths; a trailing ".git" suffix is dropped first.
    Returns "" for a blank input.
    """
    cleaned = remote.strip()
    if not cleaned:
        return ""
    if cleaned.endswith(".git"):
        cleaned = cleaned[: -len(".git")]
    if cleaned.startswith("git@") and ":" in cleaned:
        _, _, tail = cleaned.partition(":")
        return tail.strip("/").lower()
    parts = urlparse(cleaned)
    if parts.scheme and parts.netloc:
        return parts.path.strip("/").lower()
    return cleaned.strip("/").lower()
def git_host(remote: str) -> str:
    """Extract the lowercase host name from a git remote, or "" when absent.

    Understands scp-like ssh remotes (git@host:path) and URLs with a
    netloc; anything else (bare paths, blank input) yields "".
    """
    cleaned = remote.strip()
    if not cleaned:
        return ""
    if cleaned.startswith("git@") and ":" in cleaned:
        after_at = cleaned.split("@", 1)[1]
        return after_at.split(":", 1)[0].strip().lower()
    netloc = urlparse(cleaned).netloc
    return netloc.strip().lower() if netloc else ""
def run_command(
    argv: list[str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    allow_failure: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run *argv*, echoing its combined stdout/stderr live, and return the result.

    Output is streamed line by line to this process's stdout while also
    being captured into the returned CompletedProcess. Raises RuntimeError
    on a nonzero exit status unless *allow_failure* is True.
    """
    log(f"run: {' '.join(argv)}")
    proc = subprocess.Popen(
        argv,
        cwd=str(cwd) if cwd else None,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
    )
    assert proc.stdout is not None
    captured: list[str] = []
    for chunk in proc.stdout:
        captured.append(chunk)
        print(chunk, end="", flush=True)
    exit_code = proc.wait()
    result = subprocess.CompletedProcess(argv, exit_code, "".join(captured))
    if exit_code != 0 and not allow_failure:
        raise RuntimeError(f"command failed ({exit_code}): {' '.join(argv)}")
    return result
class BuildOperator:
    """Builds one service from git and uploads the packaged artifact to Tencent COS.

    Flow: locate or clone the repo, check out the requested branch, run the
    build (go test/build or a custom command), tar the staged package, then
    upload the tarball and a sha256 manifest via presigned PUT URLs.
    """

    def __init__(self, config: dict[str, Any], service_name: str, release_id: str, branch: str | None) -> None:
        """Resolve per-service and global build settings and construct the COS client.

        Raises RuntimeError for an unknown service or missing cloud credentials.
        """
        self.config = config
        self.service_name = service_name
        self.release_id = release_id
        services = dict(config.get("services", {}))
        if service_name not in services:
            raise RuntimeError(f"unknown service: {service_name}")
        self.service_cfg = dict(services[service_name])
        self.build_cfg = dict(self.service_cfg.get("build", {}))
        self.global_build_cfg = dict(config.get("build", {}))
        # Precedence: explicit CLI branch, then service default_branch, then "main".
        self.branch = branch or self.build_cfg.get("default_branch") or "main"
        cloud_cfg = dict(config.get("tencentcloud", {}))
        self.secret_id = str(cloud_cfg.get("secret_id", "")).strip()
        self.secret_key = str(cloud_cfg.get("secret_key", "")).strip()
        self.session_token = str(cloud_cfg.get("session_token", "")).strip()
        if not self.secret_id or not self.secret_key:
            raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml")
        self.cos_region = str(config["cos"]["region"])
        self.bucket = str(config["cos"]["bucket"])
        self.releases_prefix = str(config["cos"]["releases_prefix"]).strip("/")
        self.workspace_root = Path(str(self.global_build_cfg.get("workspace_root", "/workspace/builds"))).resolve()
        self.workspace_root.mkdir(parents=True, exist_ok=True)
        self.repo_search_roots = [
            Path(path).resolve() for path in self.global_build_cfg.get("repo_search_roots", []) if str(path).strip()
        ]
        self.repo_url = self.resolve_repo_url()
        self.git_ssh_command = str(self.global_build_cfg.get("git_ssh_command", "")).strip()
        self.build_host = str(self.global_build_cfg.get("build_host", "")).strip() or socket.gethostname()
        self.cos = CosS3Client(
            CosConfig(
                Region=self.cos_region,
                SecretId=self.secret_id,
                SecretKey=self.secret_key,
                Token=self.session_token or None,  # only set when using temporary credentials
                Scheme="https",
                Timeout=30,
                EnableInternalDomain=False,
                AutoSwitchDomainOnRetry=True,
            )
        )

    def resolve_repo_url(self) -> str:
        """Return the clone URL: explicit build.repo_url, else clone prefix + repo slug, else the bare slug.

        Raises RuntimeError when nothing usable is configured.
        """
        repo_url = str(self.build_cfg.get("repo_url", "")).strip()
        if repo_url:
            return repo_url
        repo_slug = str(self.service_cfg.get("repo", "")).strip()
        clone_prefix = str(self.global_build_cfg.get("gitea_clone_prefix", "")).strip()
        if repo_slug and clone_prefix:
            if clone_prefix.startswith("git@"):
                # Rebuild an scp-style "git@host:owner/repo.git" URL.
                # NOTE(review): the rstrip/replace dance assumes the prefix looks like
                # "git@host:" or "git@host:org" -- confirm against real config values.
                return f"{clone_prefix.rstrip(':')}/{repo_slug}.git".replace(":/", ":")
            return f"{clone_prefix.rstrip('/')}/{repo_slug}.git"
        if repo_slug:
            return repo_slug
        raise RuntimeError(f"missing build.repo_url for service {self.service_name}")

    def repo_candidates(self) -> list[str]:
        """Directory names to try in the search roots; always includes the service name."""
        candidates = [str(item).strip() for item in self.build_cfg.get("repo_candidates", []) if str(item).strip()]
        if self.service_name not in candidates:
            candidates.append(self.service_name)
        return candidates

    def repo_matches(self, repo_path: Path) -> bool:
        """True when *repo_path* is a git checkout whose origin/upstream remote matches this service's repo."""
        if not (repo_path / ".git").exists():
            return False
        desired = {
            normalize_remote(self.repo_url),
            normalize_remote(str(self.service_cfg.get("repo", ""))),
        }
        for remote_name in ("origin", "upstream"):
            proc = run_command(["git", "remote", "get-url", remote_name], cwd=repo_path, allow_failure=True)
            if proc.returncode == 0 and normalize_remote(proc.stdout) in desired:
                return True
        return False

    def working_env(self) -> dict[str, str]:
        """Environment for git/go subprocesses: static linux/amd64 cross-build plus private-repo settings."""
        env = os.environ.copy()
        env["CGO_ENABLED"] = "0"
        env["GOOS"] = "linux"
        env["GOARCH"] = "amd64"
        # Keep the private git host away from the public Go module proxy/sumdb.
        private_host = git_host(self.repo_url) or git_host(str(self.global_build_cfg.get("gitea_clone_prefix", "")))
        if private_host:
            env.setdefault("GOPRIVATE", private_host)
            env.setdefault("GONOSUMDB", private_host)
        if self.git_ssh_command:
            env["GIT_SSH_COMMAND"] = self.git_ssh_command
        return env

    def find_repo(self) -> tuple[Path, bool]:
        """Locate an existing checkout or clone a managed one.

        Returns (path, managed): managed=True means the checkout lives under
        the workspace and may be fetched/fast-forwarded freely.
        """
        for root in self.repo_search_roots:
            if not root.exists():
                continue
            # Try well-known directory names first, then scan every child dir.
            for candidate in self.repo_candidates():
                direct = (root / candidate).resolve()
                if direct.exists() and self.repo_matches(direct):
                    return direct, False
            for child in root.iterdir():
                if child.is_dir() and self.repo_matches(child.resolve()):
                    return child.resolve(), False
        clone_root = self.workspace_root / "repos" / self.service_name
        clone_root.parent.mkdir(parents=True, exist_ok=True)
        if (clone_root / ".git").exists():
            return clone_root, True
        if clone_root.exists():
            # Leftover non-git directory from an interrupted clone; start fresh.
            shutil.rmtree(clone_root)
        run_command(["git", "clone", self.repo_url, str(clone_root)], env=self.working_env())
        return clone_root, True

    def repo_is_dirty(self, repo_path: Path) -> bool:
        """True when `git status --porcelain` reports uncommitted local changes."""
        proc = run_command(["git", "status", "--porcelain"], cwd=repo_path, allow_failure=True)
        return proc.returncode == 0 and bool(proc.stdout.strip())

    def ensure_buildable_repo(self) -> Path:
        """Return a checkout on the requested branch, switching to an isolated clone if the found repo is dirty."""
        repo_path, managed_clone = self.find_repo()
        if not managed_clone and self.repo_is_dirty(repo_path):
            # Never build over a developer's uncommitted work.
            log(f"repo {repo_path} has local changes, use isolated clone instead")
            clone_root = self.workspace_root / "repos" / self.service_name
            if clone_root.exists() and not (clone_root / ".git").exists():
                shutil.rmtree(clone_root)
            if not (clone_root / ".git").exists():
                run_command(["git", "clone", self.repo_url, str(clone_root)], env=self.working_env())
            repo_path = clone_root
            managed_clone = True
        checkout_env = self.working_env()
        # Fetch/pull failures are tolerated on unmanaged (shared) checkouts;
        # a managed clone must update cleanly or the build aborts.
        run_command(["git", "fetch", "--all", "--prune"], cwd=repo_path, env=checkout_env, allow_failure=not managed_clone)
        run_command(["git", "checkout", self.branch], cwd=repo_path, env=checkout_env)
        run_command(["git", "pull", "--ff-only", "origin", self.branch], cwd=repo_path, env=checkout_env, allow_failure=not managed_clone)
        return repo_path

    def build(self) -> dict[str, Any]:
        """Build the service, package it, upload artifacts to COS, and return a summary dict."""
        repo_path = self.ensure_buildable_repo()
        work_dir = (repo_path / str(self.build_cfg.get("work_dir", ".")).strip()).resolve()
        if not work_dir.exists():
            raise RuntimeError(f"build.work_dir does not exist: {work_dir}")
        commit_sha = run_command(["git", "rev-parse", "HEAD"], cwd=repo_path).stdout.strip()
        package_name = str(self.service_cfg["package_name"])
        binary_name = str(self.build_cfg.get("binary_name", self.service_name))
        build_target = str(self.build_cfg.get("build_target", "."))
        config_source = (work_dir / str(self.build_cfg.get("config_source", "config/prod.yaml"))).resolve()
        if not config_source.exists():
            raise RuntimeError(f"config file not found: {config_source}")
        # Fresh artifact staging area: package/bin and package/config.
        out_dir = self.workspace_root / "artifacts" / self.service_name / self.release_id
        pkg_dir = out_dir / "package"
        if out_dir.exists():
            shutil.rmtree(out_dir)
        (pkg_dir / "bin").mkdir(parents=True, exist_ok=True)
        (pkg_dir / "config").mkdir(parents=True, exist_ok=True)
        build_env = self.working_env()
        # Expose build metadata to custom build commands via the environment.
        build_env.update(
            {
                "SERVICE_NAME": self.service_name,
                "RELEASE_ID": self.release_id,
                "BINARY_NAME": binary_name,
                "BUILD_TARGET": build_target,
                "CONFIG_SOURCE": str(config_source),
                "OUT_DIR": str(out_dir),
                "PKG_DIR": str(pkg_dir),
            }
        )
        custom_build = str(self.build_cfg.get("build_command", "")).strip()
        if custom_build:
            # A custom command is fully responsible for producing the package layout.
            run_command(["/bin/sh", "-lc", custom_build], cwd=work_dir, env=build_env)
        else:
            run_command(["go", "test", "./..."], cwd=work_dir, env=build_env)
            run_command(
                [
                    "go",
                    "build",
                    "-trimpath",
                    "-ldflags=-s -w",
                    "-o",
                    str(pkg_dir / "bin" / binary_name),
                    build_target,
                ],
                cwd=work_dir,
                env=build_env,
            )
        shutil.copy2(config_source, pkg_dir / "config" / "prod.yaml")
        # Tar the package's children (not the package dir itself) so the archive
        # unpacks with bin/ and config/ at its root.
        tgz_path = out_dir / package_name
        with tarfile.open(tgz_path, "w:gz") as tar:
            for item in pkg_dir.iterdir():
                tar.add(item, arcname=item.name)
        sha256 = hashlib.sha256(tgz_path.read_bytes()).hexdigest()
        sha_path = out_dir / f"{self.service_name}.sha256"
        sha_path.write_text(f"{sha256}\n", encoding="utf-8")
        cos_key = f"{self.releases_prefix}/{self.release_id}/{self.service_name}/{package_name}"
        cos_sha_key = f"{self.releases_prefix}/{self.release_id}/{self.service_name}/{self.service_name}.sha256"
        self.upload_file(tgz_path, cos_key)
        self.upload_file(sha_path, cos_sha_key)
        return {
            "service_name": self.service_name,
            "release_id": self.release_id,
            "branch": self.branch,
            "repo_url": self.repo_url,
            "repo_path": str(repo_path),
            "commit_sha": commit_sha,
            "build_host": self.build_host,
            "cos_key": cos_key,
            "artifact_url": self.cos_object_url(cos_key),
            "sha256": sha256,
        }

    def upload_file(self, local_path: Path, key: str) -> None:
        """PUT *local_path* to the bucket at *key* via a presigned URL.

        Raises RuntimeError on any non-2xx response.
        """
        log(f"upload: {local_path} -> {key}")
        upload_url = self.cos.get_presigned_url(
            Method="PUT",
            Bucket=self.bucket,
            Key=key,
            Expired=3600,
        )
        # The whole file is read into memory; acceptable for typical artifact sizes.
        body = local_path.read_bytes()
        session = requests.Session()
        # Ignore proxy/CA environment variables so the signed URL is hit directly.
        session.trust_env = False
        response = session.put(
            upload_url,
            data=body,
            headers={
                "Content-Length": str(len(body)),
                "Content-Type": "application/octet-stream",
            },
            timeout=(10, 60),  # (connect, read) seconds
            allow_redirects=False,
        )
        if response.status_code < 200 or response.status_code >= 300:
            raise RuntimeError(f"upload {key} failed with status {response.status_code}: {response.text[:500]}")

    def cos_object_url(self, key: str) -> str:
        """Public https URL of *key* on the bucket's default COS domain."""
        return f"https://{self.bucket}.cos.{self.cos_region}.myqcloud.com/{key}"
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the build-and-upload entry point."""
    cli = argparse.ArgumentParser(description="Build chatapp service and upload artifact to COS.")
    cli.add_argument("--config", required=True, help="Path to config/prod.yaml")
    cli.add_argument("--service", required=True, help="Service name")
    cli.add_argument("--release-id", required=True, help="Release ID")
    cli.add_argument("--branch", default="", help="Git branch")
    return cli.parse_args()
def main() -> int:
    """CLI entry point: build the requested service and print the result as JSON."""
    args = parse_args()
    config = load_yaml(Path(args.config).resolve())
    builder = BuildOperator(config, args.service, args.release_id, args.branch or None)
    summary = builder.build()
    print(f"BUILD_RESULT_JSON={json.dumps(summary, ensure_ascii=False)}", flush=True)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())