#!/usr/bin/env python3
"""Build a service from its git repository and upload the artifact to COS.

The script locates (or clones) the service repository, checks out the
requested branch, builds a binary, packages it together with a config file
into a gzip tarball, uploads the tarball and its SHA-256 file to a COS
bucket, and prints a ``BUILD_RESULT_JSON=...`` line describing the result.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import shutil
import socket
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import requests
import yaml
from qcloud_cos import CosConfig, CosS3Client


def log(message: str) -> None:
    """Print *message* to stdout prefixed with a local timestamp."""
    ts = time.strftime("%Y-%m-%d %H:%M:%S %z")
    print(f"[{ts}] {message}", flush=True)


def load_yaml(path: Path) -> dict[str, Any]:
    """Load a YAML file whose root must be a mapping.

    Raises:
        RuntimeError: if the parsed document root is not a ``dict``.
    """
    with path.open("r", encoding="utf-8") as fh:
        data = yaml.safe_load(fh)
    if not isinstance(data, dict):
        raise RuntimeError(f"invalid yaml root in {path}")
    return data


def normalize_remote(remote: str) -> str:
    """Reduce a git remote to a lower-cased path for comparison.

    Handles scp-style remotes (``git@host:org/repo.git``), URL remotes
    (``https://host/org/repo.git``) and bare slugs (``org/repo``); all of
    these normalize to ``org/repo``. Returns ``""`` for blank input.
    """
    # Drop trailing slashes first so "repo.git/" still loses its ".git" suffix.
    value = remote.strip().rstrip("/")
    if not value:
        return ""
    if value.endswith(".git"):
        value = value[:-4]
    if value.startswith("git@") and ":" in value:
        return value.split(":", 1)[1].strip("/").lower()
    parsed = urlparse(value)
    if parsed.scheme and parsed.netloc:
        return parsed.path.strip("/").lower()
    return value.strip("/").lower()


def git_host(remote: str) -> str:
    """Return the lower-cased host name of a git remote, or ``""``.

    For scp-style remotes (``git@host:path``) the text between ``@`` and the
    first ``:`` is returned. For URL remotes only the host name is returned:
    userinfo (``user:token@``) and port are excluded, because the result is
    exported through ``GOPRIVATE``/``GONOSUMDB`` and must not carry
    credentials.
    """
    value = remote.strip()
    if not value:
        return ""
    if value.startswith("git@") and ":" in value:
        return value.split("@", 1)[1].split(":", 1)[0].strip().lower()
    # ``hostname`` is None when the value has no network location.
    return (urlparse(value).hostname or "").strip().lower()


def run_command(
    argv: list[str],
    *,
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    allow_failure: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run *argv*, echoing its combined stdout/stderr line by line.

    Args:
        argv: Command and arguments; executed without a shell.
        cwd: Working directory for the child process, if any.
        env: Environment for the child process; ``None`` inherits.
        allow_failure: When true, a non-zero exit status is returned to the
            caller instead of raising.

    Returns:
        A ``CompletedProcess`` whose ``stdout`` holds the captured output
        (stderr is merged into it).

    Raises:
        RuntimeError: if the command exits non-zero and *allow_failure* is
            false.
    """
    log(f"run: {' '.join(argv)}")
    output_parts: list[str] = []
    # The context manager closes the pipe and reaps the child even if
    # reading or echoing the output raises.
    with subprocess.Popen(
        argv,
        cwd=str(cwd) if cwd else None,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            output_parts.append(line)
            print(line, end="", flush=True)
        returncode = proc.wait()
    stdout = "".join(output_parts)
    completed = subprocess.CompletedProcess(argv, returncode, stdout)
    if returncode != 0 and not allow_failure:
        raise RuntimeError(f"command failed ({returncode}): {' '.join(argv)}")
    return completed


class BuildOperator:
    """Builds one configured service and uploads its release artifact."""

    def __init__(self, config: dict[str, Any], service_name: str, release_id: str, branch: str | None) -> None:
        """Read settings for *service_name* from *config* and create the COS client.

        Args:
            config: Parsed configuration mapping (``services``, ``build``,
                ``tencentcloud`` and ``cos`` sections are read).
            service_name: Key under ``services`` to build.
            release_id: Identifier used in artifact paths and COS keys.
            branch: Branch to build; falls back to the service's
                ``build.default_branch`` and then ``"main"``.

        Raises:
            RuntimeError: if the service is unknown, COS credentials are
                missing, or no repository URL can be resolved.
            KeyError: if a required ``cos`` setting is absent.
        """
        self.config = config
        self.service_name = service_name
        self.release_id = release_id

        services = dict(config.get("services", {}))
        if service_name not in services:
            raise RuntimeError(f"unknown service: {service_name}")
        self.service_cfg = dict(services[service_name])
        self.build_cfg = dict(self.service_cfg.get("build", {}))
        self.global_build_cfg = dict(config.get("build", {}))
        self.branch = branch or self.build_cfg.get("default_branch") or "main"

        cloud_cfg = dict(config.get("tencentcloud", {}))
        self.secret_id = str(cloud_cfg.get("secret_id", "")).strip()
        self.secret_key = str(cloud_cfg.get("secret_key", "")).strip()
        self.session_token = str(cloud_cfg.get("session_token", "")).strip()
        if not self.secret_id or not self.secret_key:
            raise RuntimeError("missing tencentcloud.secret_id or tencentcloud.secret_key in config/prod.yaml")

        self.cos_region = str(config["cos"]["region"])
        self.bucket = str(config["cos"]["bucket"])
        self.releases_prefix = str(config["cos"]["releases_prefix"]).strip("/")

        self.workspace_root = Path(str(self.global_build_cfg.get("workspace_root", "/workspace/builds"))).resolve()
        self.workspace_root.mkdir(parents=True, exist_ok=True)
        self.repo_search_roots = [
            Path(path).resolve()
            for path in self.global_build_cfg.get("repo_search_roots", [])
            if str(path).strip()
        ]
        self.repo_url = self.resolve_repo_url()
        self.git_ssh_command = str(self.global_build_cfg.get("git_ssh_command", "")).strip()
        self.build_host = str(self.global_build_cfg.get("build_host", "")).strip() or socket.gethostname()

        self.cos = CosS3Client(
            CosConfig(
                Region=self.cos_region,
                SecretId=self.secret_id,
                SecretKey=self.secret_key,
                Token=self.session_token or None,
                Scheme="https",
                Timeout=30,
                EnableInternalDomain=False,
                AutoSwitchDomainOnRetry=True,
            )
        )

    def resolve_repo_url(self) -> str:
        """Return the clone URL for the service.

        Resolution order: the service's ``build.repo_url``; then the global
        ``build.gitea_clone_prefix`` joined with the service's ``repo`` slug;
        then the bare slug itself.

        Raises:
            RuntimeError: if none of the above is configured.
        """
        repo_url = str(self.build_cfg.get("repo_url", "")).strip()
        if repo_url:
            return repo_url
        repo_slug = str(self.service_cfg.get("repo", "")).strip()
        clone_prefix = str(self.global_build_cfg.get("gitea_clone_prefix", "")).strip()
        if repo_slug and clone_prefix:
            if clone_prefix.startswith("git@"):
                # scp-style remotes need "host:path"; split on the first
                # colon and rebuild so that "git@host", "git@host:" and
                # "git@host:base" all produce a valid remote.
                host, _, base_path = clone_prefix.partition(":")
                segments = [part for part in (base_path.strip("/"), repo_slug.strip("/")) if part]
                return f"{host}:{'/'.join(segments)}.git"
            return f"{clone_prefix.rstrip('/')}/{repo_slug}.git"
        if repo_slug:
            return repo_slug
        raise RuntimeError(f"missing build.repo_url for service {self.service_name}")

    def repo_candidates(self) -> list[str]:
        """Return directory names to try under each search root.

        The configured ``build.repo_candidates`` (blank entries removed),
        with the service name appended if not already present.
        """
        candidates = [str(item).strip() for item in self.build_cfg.get("repo_candidates", []) if str(item).strip()]
        if self.service_name not in candidates:
            candidates.append(self.service_name)
        return candidates

    def repo_matches(self, repo_path: Path) -> bool:
        """Return whether *repo_path* is a checkout of this service's repository.

        The directory must contain ``.git`` and its ``origin`` or
        ``upstream`` remote must normalize to the configured repo URL or slug.
        """
        if not (repo_path / ".git").exists():
            return False
        desired = {
            normalize_remote(self.repo_url),
            normalize_remote(str(self.service_cfg.get("repo", ""))),
        }
        # An unset slug normalizes to ""; never let that count as a match.
        desired.discard("")
        for remote_name in ("origin", "upstream"):
            proc = run_command(["git", "remote", "get-url", remote_name], cwd=repo_path, allow_failure=True)
            if proc.returncode == 0 and normalize_remote(proc.stdout) in desired:
                return True
        return False

    def working_env(self) -> dict[str, str]:
        """Return a copy of the process environment prepared for git and go.

        Forces ``CGO_ENABLED=0``, ``GOOS=linux`` and ``GOARCH=amd64``. Sets
        ``GOPRIVATE``/``GONOSUMDB`` to the repository host unless already
        present, and sets ``GIT_SSH_COMMAND`` when one is configured.
        """
        env = os.environ.copy()
        env["CGO_ENABLED"] = "0"
        env["GOOS"] = "linux"
        env["GOARCH"] = "amd64"
        private_host = git_host(self.repo_url) or git_host(str(self.global_build_cfg.get("gitea_clone_prefix", "")))
        if private_host:
            env.setdefault("GOPRIVATE", private_host)
            env.setdefault("GONOSUMDB", private_host)
        if self.git_ssh_command:
            env["GIT_SSH_COMMAND"] = self.git_ssh_command
        return env

    def _ensure_managed_clone(self) -> Path:
        """Return the script-managed clone directory, cloning it if needed.

        A leftover directory without ``.git`` is removed before cloning.
        """
        clone_root = self.workspace_root / "repos" / self.service_name
        clone_root.parent.mkdir(parents=True, exist_ok=True)
        if (clone_root / ".git").exists():
            return clone_root
        if clone_root.exists():
            shutil.rmtree(clone_root)
        run_command(["git", "clone", self.repo_url, str(clone_root)], env=self.working_env())
        return clone_root

    def find_repo(self) -> tuple[Path, bool]:
        """Locate a checkout of the service repository.

        Each existing search root is checked first for the candidate
        directory names and then for any child directory whose remotes match.
        If nothing matches, the managed clone under the workspace is used.

        Returns:
            ``(path, managed)`` where ``managed`` is true only for the clone
            owned by this script.
        """
        for root in self.repo_search_roots:
            if not root.exists():
                continue
            for candidate in self.repo_candidates():
                direct = (root / candidate).resolve()
                if direct.exists() and self.repo_matches(direct):
                    return direct, False
            for child in root.iterdir():
                if child.is_dir() and self.repo_matches(child.resolve()):
                    return child.resolve(), False
        return self._ensure_managed_clone(), True

    def repo_is_dirty(self, repo_path: Path) -> bool:
        """Return whether ``git status --porcelain`` succeeds with non-empty output."""
        proc = run_command(["git", "status", "--porcelain"], cwd=repo_path, allow_failure=True)
        return proc.returncode == 0 and bool(proc.stdout.strip())

    def ensure_buildable_repo(self) -> Path:
        """Return a repository path checked out at the requested branch.

        A found checkout with local changes is left untouched and the managed
        clone is used instead. Fetch and pull failures are tolerated for
        checkouts this script does not manage; a checkout failure always
        raises.
        """
        repo_path, managed_clone = self.find_repo()
        if not managed_clone and self.repo_is_dirty(repo_path):
            log(f"repo {repo_path} has local changes, use isolated clone instead")
            repo_path = self._ensure_managed_clone()
            managed_clone = True

        checkout_env = self.working_env()
        run_command(
            ["git", "fetch", "--all", "--prune"],
            cwd=repo_path,
            env=checkout_env,
            allow_failure=not managed_clone,
        )
        run_command(["git", "checkout", self.branch], cwd=repo_path, env=checkout_env)
        run_command(
            ["git", "pull", "--ff-only", "origin", self.branch],
            cwd=repo_path,
            env=checkout_env,
            allow_failure=not managed_clone,
        )
        return repo_path

    def build(self) -> dict[str, Any]:
        """Build, package and upload the service; return a result summary.

        Runs either the configured ``build.build_command`` through
        ``/bin/sh -lc`` or ``go test ./...`` followed by ``go build``. The
        config file is copied to ``config/prod.yaml`` in the package, the
        package directory is written to a gzip tarball, and the tarball plus
        a ``.sha256`` file are uploaded to COS.

        Raises:
            RuntimeError: if the work directory or config file is missing,
                or a command or upload fails.
            KeyError: if the service has no ``package_name``.
        """
        repo_path = self.ensure_buildable_repo()
        work_dir = (repo_path / str(self.build_cfg.get("work_dir", ".")).strip()).resolve()
        if not work_dir.exists():
            raise RuntimeError(f"build.work_dir does not exist: {work_dir}")

        commit_sha = run_command(["git", "rev-parse", "HEAD"], cwd=repo_path).stdout.strip()
        package_name = str(self.service_cfg["package_name"])
        binary_name = str(self.build_cfg.get("binary_name", self.service_name))
        build_target = str(self.build_cfg.get("build_target", "."))
        config_source = (work_dir / str(self.build_cfg.get("config_source", "config/prod.yaml"))).resolve()
        if not config_source.exists():
            raise RuntimeError(f"config file not found: {config_source}")

        out_dir = self.workspace_root / "artifacts" / self.service_name / self.release_id
        pkg_dir = out_dir / "package"
        # Start from a clean output directory for this release id.
        if out_dir.exists():
            shutil.rmtree(out_dir)
        (pkg_dir / "bin").mkdir(parents=True, exist_ok=True)
        (pkg_dir / "config").mkdir(parents=True, exist_ok=True)

        build_env = self.working_env()
        build_env.update(
            {
                "SERVICE_NAME": self.service_name,
                "RELEASE_ID": self.release_id,
                "BINARY_NAME": binary_name,
                "BUILD_TARGET": build_target,
                "CONFIG_SOURCE": str(config_source),
                "OUT_DIR": str(out_dir),
                "PKG_DIR": str(pkg_dir),
            }
        )

        custom_build = str(self.build_cfg.get("build_command", "")).strip()
        if custom_build:
            run_command(["/bin/sh", "-lc", custom_build], cwd=work_dir, env=build_env)
        else:
            run_command(["go", "test", "./..."], cwd=work_dir, env=build_env)
            run_command(
                [
                    "go",
                    "build",
                    "-trimpath",
                    "-ldflags=-s -w",
                    "-o",
                    str(pkg_dir / "bin" / binary_name),
                    build_target,
                ],
                cwd=work_dir,
                env=build_env,
            )

        shutil.copy2(config_source, pkg_dir / "config" / "prod.yaml")

        tgz_path = out_dir / package_name
        with tarfile.open(tgz_path, "w:gz") as tar:
            # Sorted so member order does not depend on directory listing order.
            for item in sorted(pkg_dir.iterdir()):
                tar.add(item, arcname=item.name)

        sha256 = hashlib.sha256(tgz_path.read_bytes()).hexdigest()
        sha_path = out_dir / f"{self.service_name}.sha256"
        sha_path.write_text(f"{sha256}\n", encoding="utf-8")

        cos_key = f"{self.releases_prefix}/{self.release_id}/{self.service_name}/{package_name}"
        cos_sha_key = f"{self.releases_prefix}/{self.release_id}/{self.service_name}/{self.service_name}.sha256"
        self.upload_file(tgz_path, cos_key)
        self.upload_file(sha_path, cos_sha_key)

        return {
            "service_name": self.service_name,
            "release_id": self.release_id,
            "branch": self.branch,
            "repo_url": self.repo_url,
            "repo_path": str(repo_path),
            "commit_sha": commit_sha,
            "build_host": self.build_host,
            "cos_key": cos_key,
            "artifact_url": self.cos_object_url(cos_key),
            "sha256": sha256,
        }

    def upload_file(self, local_path: Path, key: str) -> None:
        """Upload *local_path* to the bucket under *key* via a presigned PUT URL.

        Raises:
            RuntimeError: if the response status is outside the 2xx range.
        """
        log(f"upload: {local_path} -> {key}")
        upload_url = self.cos.get_presigned_url(
            Method="PUT",
            Bucket=self.bucket,
            Key=key,
            Expired=3600,
        )
        body = local_path.read_bytes()
        # Close the session (and its pooled connections) when done.
        with requests.Session() as session:
            # Do not pick up proxy or auth settings from the environment.
            session.trust_env = False
            response = session.put(
                upload_url,
                data=body,
                headers={
                    "Content-Length": str(len(body)),
                    "Content-Type": "application/octet-stream",
                },
                timeout=(10, 60),
                allow_redirects=False,
            )
        if response.status_code < 200 or response.status_code >= 300:
            raise RuntimeError(f"upload {key} failed with status {response.status_code}: {response.text[:500]}")

    def cos_object_url(self, key: str) -> str:
        """Return the ``https://<bucket>.cos.<region>.myqcloud.com/<key>`` URL for *key*."""
        return f"https://{self.bucket}.cos.{self.cos_region}.myqcloud.com/{key}"


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the build script."""
    parser = argparse.ArgumentParser(description="Build chatapp service and upload artifact to COS.")
    parser.add_argument("--config", required=True, help="Path to config/prod.yaml")
    parser.add_argument("--service", required=True, help="Service name")
    parser.add_argument("--release-id", required=True, help="Release ID")
    parser.add_argument("--branch", default="", help="Git branch")
    return parser.parse_args()


def main() -> int:
    """Run the build and print the result as a ``BUILD_RESULT_JSON=`` line."""
    args = parse_args()
    cfg = load_yaml(Path(args.config).resolve())
    operator = BuildOperator(cfg, args.service, args.release_id, args.branch or None)
    result = operator.build()
    print(f"BUILD_RESULT_JSON={json.dumps(result, ensure_ascii=False)}", flush=True)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())