2026-04-07 13:19:50 +08:00

1101 lines
33 KiB
Go

package service
import (
	"context"
	"errors"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"

	"app-deploy-platform/backend/internal/config"
	"app-deploy-platform/backend/internal/model"
	"app-deploy-platform/backend/internal/orchestrator"
	"app-deploy-platform/backend/internal/queue"
)
// Job kinds carried in queue.Job.Kind. StartConsumer dispatches on these
// values; any kind that is not build or release_run is handled as a deployment.
const (
// jobKindDeployment identifies a job that processes one deployment by DeploymentID.
jobKindDeployment = "deployment"
// jobKindBuild identifies a job that processes one build by BuildID.
jobKindBuild = "build"
// jobKindReleaseRun identifies a job that processes one release run by ReleaseRunID.
jobKindReleaseRun = "release_run"
)
// Manager is the service-layer entry point for builds, deployments and
// release runs. It persists state through GORM, publishes and consumes jobs
// through the job queue, and delegates the actual work to the executors.
type Manager struct {
// db is the GORM handle used for every read and write in this file.
db *gorm.DB
// queue publishes newly created jobs and feeds them back via StartConsumer.
queue queue.JobQueue
// deployExecutor performs Deploy, Restart and Rollback against one target.
deployExecutor orchestrator.Executor
// buildExecutor performs builds and streams their output.
buildExecutor orchestrator.BuildExecutor
// redis is stored by NewManager; NOTE(review): it is not referenced by any
// method visible in this file — confirm whether it is still needed.
redis *redis.Client
// cfg supplies the service catalog, build host and release order.
cfg config.Config
}
// CreateDeploymentRequest describes a deployment, rollback or restart to be
// created for one service.
type CreateDeploymentRequest struct {
// ServiceName is required.
ServiceName string `json:"service_name"`
// ReleaseID must match an existing release of the service unless the
// operation is a restart.
ReleaseID string `json:"release_id"`
// Operation defaults to model.OperationDeploy when empty.
Operation string `json:"operation"`
// Operator is recorded on the deployment record.
Operator string `json:"operator"`
// HostIDs optionally restricts the targets to instances on these hosts;
// when empty every instance of the service is targeted.
HostIDs []uint `json:"host_ids"`
}
// CreateBuildRequest describes a build to be queued for one service.
type CreateBuildRequest struct {
// ServiceName is required and must be a key of the configured services.
ServiceName string `json:"service_name"`
// ReleaseID is required and is stored on the build job.
ReleaseID string `json:"release_id"`
// Branch is optional; when empty the service's default branch is used,
// and "main" if that is empty too.
Branch string `json:"branch"`
// Operator is recorded on the build job.
Operator string `json:"operator"`
}
// CreateReleaseRunRequest describes a release run that deploys one release id
// across the services listed in the configured release order.
type CreateReleaseRunRequest struct {
// ReleaseID is required; every service in the release order must already
// have a release with this id.
ReleaseID string `json:"release_id"`
// Operator is recorded on the run and on the deployments it creates.
Operator string `json:"operator"`
}
// Overview is the dashboard summary assembled by GetOverview: table counts,
// per-status counts and a few recent records.
type Overview struct {
// Row counts of the host, service instance, release and build job tables.
HostCount int64 `json:"host_count"`
ServiceCount int64 `json:"service_count"`
ReleaseCount int64 `json:"release_count"`
BuildCount int64 `json:"build_count"`
// Deployment counts by status.
QueuedCount int64 `json:"queued_count"`
RunningCount int64 `json:"running_count"`
FailedCount int64 `json:"failed_count"`
SuccessfulCount int64 `json:"successful_count"`
// Build job counts by status.
BuildQueuedCount int64 `json:"build_queued_count"`
BuildRunningCount int64 `json:"build_running_count"`
BuildFailedCount int64 `json:"build_failed_count"`
// Hosts is every host, ordered by name.
Hosts []model.Host `json:"hosts"`
// ServiceInstances is every instance with its host, ordered by service then host.
ServiceInstances []model.ServiceInstance `json:"service_instances"`
// RecentDeployments holds the ten most recently created deployments with targets.
RecentDeployments []model.Deployment `json:"recent_deployments"`
// RecentBuilds holds the ten most recently created build jobs.
RecentBuilds []model.BuildJob `json:"recent_builds"`
}
// CatalogService is the public view of one configured service as returned by
// ListServices.
type CatalogService struct {
// Name is the key of the service in the configuration.
Name string `json:"name"`
// Repo is copied from the service configuration.
Repo string `json:"repo"`
// DefaultBranch is the configured build default branch, or "main" when unset.
DefaultBranch string `json:"default_branch"`
// PackageName is copied from the service configuration.
PackageName string `json:"package_name"`
}
// NewManager assembles a Manager from its collaborators. It stores the
// arguments as given and performs no I/O.
func NewManager(
	db *gorm.DB,
	jobQueue queue.JobQueue,
	deployExecutor orchestrator.Executor,
	buildExecutor orchestrator.BuildExecutor,
	redisClient *redis.Client,
	cfg config.Config,
) *Manager {
	manager := new(Manager)
	manager.db = db
	manager.queue = jobQueue
	manager.deployExecutor = deployExecutor
	manager.buildExecutor = buildExecutor
	manager.redis = redisClient
	manager.cfg = cfg
	return manager
}
// StartConsumer attaches the job handler to the queue and logs if the queue
// consumer cannot be started. Build and release-run jobs go to their own
// processors; deployment jobs and jobs of any other kind are processed as
// deployments.
func (m *Manager) StartConsumer(ctx context.Context) {
	dispatch := func(jobCtx context.Context, job queue.Job) error {
		if job.Kind == jobKindBuild {
			return m.ProcessBuild(jobCtx, job.BuildID)
		}
		if job.Kind == jobKindReleaseRun {
			return m.ProcessReleaseRun(jobCtx, job.ReleaseRunID)
		}
		// jobKindDeployment and every unrecognized kind land here.
		return m.ProcessDeployment(jobCtx, job.DeploymentID)
	}

	err := m.queue.StartConsumer(ctx, dispatch)
	if err == nil {
		return
	}
	log.Printf("start consumer failed: %v", err)
}
// CreateBuild validates the request, stores a queued build job for the
// service, publishes it to the job queue and returns the stored record.
//
// It fails when service_name or release_id is empty, when the service is not
// configured, or when the insert, the publish or the final reload fails.
func (m *Manager) CreateBuild(ctx context.Context, req CreateBuildRequest) (*model.BuildJob, error) {
	switch {
	case req.ServiceName == "":
		return nil, errors.New("service_name is required")
	case req.ReleaseID == "":
		return nil, errors.New("release_id is required")
	}

	serviceCfg, known := m.cfg.Services[req.ServiceName]
	if !known {
		return nil, fmt.Errorf("unknown service: %s", req.ServiceName)
	}

	createdAt := time.Now()
	// Branch precedence: request, then service default, then "main".
	branch := firstNonEmpty(req.Branch, serviceCfg.Build.DefaultBranch, "main")
	record := &model.BuildJob{
		ServiceName: req.ServiceName,
		ReleaseID:   req.ReleaseID,
		Branch:      branch,
		Status:      model.BuildQueued,
		Operator:    req.Operator,
		BuildHost:   m.cfg.Build.BuildHost,
		RepoURL:     serviceCfg.Build.RepoURL,
		StartedAt:   &createdAt,
	}

	inserted := m.db.WithContext(ctx).Create(record)
	if inserted.Error != nil {
		return nil, inserted.Error
	}

	job := queue.Job{Kind: jobKindBuild, BuildID: record.ID}
	if err := m.queue.Publish(ctx, job); err != nil {
		return nil, err
	}
	return m.GetBuild(ctx, record.ID)
}
// ProcessBuild runs one queued build job to completion.
//
// It loads the job, marks it running (clearing previous log and error
// fields), runs the build executor while persisting streamed output in
// throttled batches, and finally records either a failure or, in a single
// transaction, the successful result together with the matching Release row.
//
// When the build itself fails the job is marked failed and the function
// returns the result of that status update, not the build error.
func (m *Manager) ProcessBuild(ctx context.Context, buildID uint) error {
buildJob, err := m.GetBuild(ctx, buildID)
if err != nil {
return err
}
startedAt := time.Now()
// Mark the job running and reset the fields a previous attempt may have filled.
if err := m.db.WithContext(ctx).Model(&model.BuildJob{}).
Where("id = ?", buildID).
Updates(map[string]any{
"status": model.BuildRunning,
"started_at": &startedAt,
"log_output": "",
"log_excerpt": "",
"error_message": "",
}).Error; err != nil {
return err
}
// liveOutput accumulates every chunk reported by the executor.
var liveOutput strings.Builder
lastPersistedAt := time.Now()
lastPersistedLen := 0
// persistLiveOutput writes the accumulated output to the job row. Unless
// force is set it skips the write when nothing new arrived, or when less
// than a second has passed and fewer than 1024 new bytes were collected.
// Write failures are logged and leave the bookkeeping untouched so the next
// call retries.
// NOTE(review): this write does not use WithContext(ctx), unlike the other
// queries in this function — confirm whether that is intentional.
persistLiveOutput := func(force bool) {
raw := liveOutput.String()
if !force {
if len(raw) == lastPersistedLen {
return
}
if time.Since(lastPersistedAt) < time.Second && len(raw)-lastPersistedLen < 1024 {
return
}
}
if err := m.db.Model(&model.BuildJob{}).
Where("id = ?", buildID).
Updates(map[string]any{
"log_output": raw,
"log_excerpt": trimLog(raw),
}).Error; err != nil {
log.Printf("persist build log %d: %v", buildID, err)
return
}
lastPersistedAt = time.Now()
lastPersistedLen = len(raw)
}
// NOTE(review): liveOutput and the persist bookkeeping are not synchronized;
// this assumes the executor invokes OnOutput from a single goroutine — confirm.
result, output, err := m.buildExecutor.Build(ctx, orchestrator.BuildContext{
ServiceName: buildJob.ServiceName,
ReleaseID: buildJob.ReleaseID,
Branch: buildJob.Branch,
OnOutput: func(chunk string) {
if chunk == "" {
return
}
liveOutput.WriteString(chunk)
persistLiveOutput(false)
},
})
// Prefer the output returned by the executor; fall back to what was streamed.
rawOutput := output
if rawOutput == "" {
rawOutput = liveOutput.String()
}
// If nothing was streamed, seed liveOutput so the forced persist below stores it.
if rawOutput != "" && liveOutput.Len() == 0 {
liveOutput.WriteString(rawOutput)
}
persistLiveOutput(true)
finishedAt := time.Now()
if err != nil {
// Build failed: record the failure. The returned error is the update's
// error, so a recorded build failure yields a nil return.
return m.db.WithContext(ctx).Model(&model.BuildJob{}).
Where("id = ?", buildID).
Updates(map[string]any{
"status": model.BuildFailed,
"log_output": rawOutput,
"log_excerpt": trimLog(rawOutput),
"error_message": summarizeBuildError(err, rawOutput),
"finished_at": &finishedAt,
}).Error
}
release := &model.Release{}
// Store the build result and create or update the Release row atomically.
txErr := m.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Model(&model.BuildJob{}).
Where("id = ?", buildID).
Updates(map[string]any{
"status": model.BuildSuccess,
"repo_url": result.RepoURL,
"repo_path": result.RepoPath,
"commit_sha": result.CommitSHA,
"build_host": result.BuildHost,
"cos_key": result.COSKey,
"artifact_url": result.ArtifactURL,
"sha256": result.SHA256,
"log_output": rawOutput,
"log_excerpt": trimLog(rawOutput),
"finished_at": &finishedAt,
}).Error; err != nil {
return err
}
// Look the release up by (service_name, release_id); Assign applies the
// artifact fields whether the row is found or newly created.
err := tx.Where("service_name = ? AND release_id = ?", result.ServiceName, result.ReleaseID).
Assign(model.Release{
ServiceName: result.ServiceName,
ReleaseID: result.ReleaseID,
GitSHA: result.CommitSHA,
COSKey: result.COSKey,
SHA256: result.SHA256,
ArtifactURL: result.ArtifactURL,
BuildHost: result.BuildHost,
}).
FirstOrCreate(release).Error
return err
})
if txErr != nil {
return txErr
}
return nil
}
// CreateReleaseRun queues a release run for req.ReleaseID.
//
// Every service in the configured release order must already have a release
// with that id. The run and one step per service (sequence starting at 1) are
// inserted in a single transaction, then the run is published to the job
// queue and the stored record is returned.
func (m *Manager) CreateReleaseRun(ctx context.Context, req CreateReleaseRunRequest) (*model.ReleaseRun, error) {
	if req.ReleaseID == "" {
		return nil, errors.New("release_id is required")
	}

	services := m.releaseOrder()
	if len(services) == 0 {
		return nil, errors.New("release order is not configured")
	}

	// Refuse to start unless each service has the requested release.
	for i := range services {
		var matches int64
		lookup := m.db.WithContext(ctx).
			Model(&model.Release{}).
			Where("service_name = ? AND release_id = ?", services[i], req.ReleaseID).
			Count(&matches)
		if lookup.Error != nil {
			return nil, lookup.Error
		}
		if matches == 0 {
			return nil, fmt.Errorf("release %s not found for service %s", req.ReleaseID, services[i])
		}
	}

	createdAt := time.Now()
	run := &model.ReleaseRun{
		ReleaseID:     req.ReleaseID,
		Status:        model.DeploymentQueued,
		TriggerSource: "ui",
		Operator:      req.Operator,
		StartedAt:     &createdAt,
	}

	persist := func(tx *gorm.DB) error {
		if err := tx.Create(run).Error; err != nil {
			return err
		}
		for position, name := range services {
			step := model.ReleaseRunStep{
				ReleaseRunID: run.ID,
				Sequence:     position + 1,
				ServiceName:  name,
				Status:       model.DeploymentQueued,
			}
			if err := tx.Create(&step).Error; err != nil {
				return err
			}
		}
		return nil
	}
	if err := m.db.WithContext(ctx).Transaction(persist); err != nil {
		return nil, err
	}

	job := queue.Job{Kind: jobKindReleaseRun, ReleaseRunID: run.ID}
	if err := m.queue.Publish(ctx, job); err != nil {
		return nil, err
	}
	return m.GetReleaseRun(ctx, run.ID)
}
// ProcessReleaseRun executes the steps of a release run in sequence.
//
// For each step it records the service's current baseline release, creates an
// unpublished deployment, runs it inline and stores the outcome on the step.
// If a deployment fails, the steps that already succeeded are rolled back in
// reverse order and the rollback summary is appended to the run's error.
// Failures while resolving the baseline or creating the deployment mark both
// the step and the run failed without triggering rollbacks.
func (m *Manager) ProcessReleaseRun(ctx context.Context, runID uint) error {
	run, err := m.GetReleaseRun(ctx, runID)
	if err != nil {
		return err
	}

	// updateRun applies fields to this run's row.
	updateRun := func(fields map[string]any) error {
		return m.db.WithContext(ctx).Model(&model.ReleaseRun{}).
			Where("id = ?", runID).
			Updates(fields).Error
	}

	runStartedAt := time.Now()
	if err := updateRun(map[string]any{
		"status":     model.DeploymentRunning,
		"started_at": &runStartedAt,
	}); err != nil {
		return err
	}

	succeeded := make([]model.ReleaseRunStep, 0, len(run.Steps))
	for _, step := range run.Steps {
		// updateStep applies fields to the current step's row.
		updateStep := func(fields map[string]any) error {
			return m.db.WithContext(ctx).Model(&model.ReleaseRunStep{}).
				Where("id = ?", step.ID).
				Updates(fields).Error
		}
		// abort marks the step and then the run failed with the same message
		// and timestamp (best effort) and hands the cause back to the caller.
		abort := func(cause error) error {
			failedAt := time.Now()
			_ = updateStep(map[string]any{
				"status":        model.DeploymentFailed,
				"error_message": cause.Error(),
				"finished_at":   &failedAt,
			})
			_ = updateRun(map[string]any{
				"status":        model.DeploymentFailed,
				"error_message": cause.Error(),
				"finished_at":   &failedAt,
			})
			return cause
		}

		stepStartedAt := time.Now()
		baseline, baselineErr := m.resolveServiceBaselineRelease(ctx, step.ServiceName)
		if baselineErr != nil {
			return abort(baselineErr)
		}
		if err := updateStep(map[string]any{
			"status":              model.DeploymentRunning,
			"previous_release_id": baseline,
			"started_at":          &stepStartedAt,
		}); err != nil {
			return err
		}
		// Keep the baseline on the local copy so a later rollback can use it.
		step.PreviousReleaseID = baseline

		deployment, createErr := m.createDeploymentRecord(ctx, CreateDeploymentRequest{
			ServiceName: step.ServiceName,
			ReleaseID:   run.ReleaseID,
			Operation:   model.OperationDeploy,
			Operator:    run.Operator,
		}, "release_run", false)
		if createErr != nil {
			return abort(createErr)
		}
		if err := m.db.WithContext(ctx).Model(&model.ReleaseRunStep{}).
			Where("id = ?", step.ID).
			Update("deployment_id", deployment.ID).Error; err != nil {
			return err
		}

		if deployErr := m.ProcessDeployment(ctx, deployment.ID); deployErr != nil {
			excerpt := ""
			if failed, getErr := m.GetDeployment(ctx, deployment.ID); getErr == nil {
				excerpt = deploymentLogExcerpt(failed)
			}
			stepFinishedAt := time.Now()
			_ = updateStep(map[string]any{
				"status":        model.DeploymentFailed,
				"error_message": deployErr.Error(),
				"log_excerpt":   trimLog(excerpt),
				"finished_at":   &stepFinishedAt,
			})

			// Undo the steps that already went out, newest first.
			rollbackMessage := m.rollbackCompletedReleaseRunSteps(ctx, runID, run.ReleaseID, run.Operator, succeeded)
			runError := deployErr.Error()
			if rollbackMessage != "" {
				runError = runError + "\n" + rollbackMessage
			}
			_ = updateRun(map[string]any{
				"status":        model.DeploymentFailed,
				"error_message": runError,
				"finished_at":   &stepFinishedAt,
			})
			if rollbackMessage == "" {
				return deployErr
			}
			return fmt.Errorf("%w; %s", deployErr, rollbackMessage)
		}

		finished, getErr := m.GetDeployment(ctx, deployment.ID)
		if getErr != nil {
			return getErr
		}
		stepFinishedAt := time.Now()
		if err := updateStep(map[string]any{
			"status":      model.DeploymentSuccess,
			"log_excerpt": trimLog(deploymentLogExcerpt(finished)),
			"finished_at": &stepFinishedAt,
		}); err != nil {
			return err
		}

		step.Status = model.DeploymentSuccess
		step.DeploymentID = &deployment.ID
		succeeded = append(succeeded, step)
	}

	runFinishedAt := time.Now()
	return updateRun(map[string]any{
		"status":      model.DeploymentSuccess,
		"finished_at": &runFinishedAt,
	})
}
// CreateDeployment creates a deployment on behalf of the UI and publishes it
// to the job queue.
func (m *Manager) CreateDeployment(ctx context.Context, req CreateDeploymentRequest) (*model.Deployment, error) {
	const (
		source  = "ui"
		enqueue = true
	)
	return m.createDeploymentRecord(ctx, req, source, enqueue)
}
// createDeploymentRecord stores a queued deployment with one target per
// matching service instance and returns the reloaded record.
//
// An empty operation defaults to deploy. Unless the operation is a restart,
// the requested release must exist for the service. Instances are selected by
// service name, optionally narrowed to req.HostIDs, in host id order. The job
// is published to the queue only when publish is true; triggerSource is
// recorded on the deployment.
func (m *Manager) createDeploymentRecord(ctx context.Context, req CreateDeploymentRequest, triggerSource string, publish bool) (*model.Deployment, error) {
	if req.ServiceName == "" {
		return nil, errors.New("service_name is required")
	}
	if req.Operation == "" {
		req.Operation = model.OperationDeploy
	}

	// Restarts reuse whatever is installed, so only other operations need a release.
	needsRelease := req.Operation != model.OperationRestart
	if needsRelease {
		var existing model.Release
		lookup := m.db.WithContext(ctx).
			Where("service_name = ? AND release_id = ?", req.ServiceName, req.ReleaseID).
			Order("id desc").
			First(&existing)
		if lookup.Error != nil {
			return nil, fmt.Errorf("release not found: %w", lookup.Error)
		}
	}

	scope := m.db.WithContext(ctx).Preload("Host").Where("service_name = ?", req.ServiceName)
	if len(req.HostIDs) != 0 {
		scope = scope.Where("host_id IN ?", req.HostIDs)
	}
	var instances []model.ServiceInstance
	if err := scope.Order("host_id asc").Find(&instances).Error; err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, errors.New("no service instances matched the request")
	}

	createdAt := time.Now()
	deployment := &model.Deployment{
		ServiceName:   req.ServiceName,
		ReleaseID:     req.ReleaseID,
		Operation:     req.Operation,
		Status:        model.DeploymentQueued,
		TriggerSource: triggerSource,
		Operator:      req.Operator,
		StartedAt:     &createdAt,
	}

	persist := func(tx *gorm.DB) error {
		if err := tx.Create(deployment).Error; err != nil {
			return err
		}
		for i := range instances {
			target := model.DeploymentTarget{
				DeploymentID:      deployment.ID,
				ServiceInstanceID: instances[i].ID,
				HostID:            instances[i].HostID,
				Status:            model.DeploymentQueued,
				Step:              "queued",
			}
			if err := tx.Create(&target).Error; err != nil {
				return err
			}
		}
		return nil
	}
	if err := m.db.WithContext(ctx).Transaction(persist); err != nil {
		return nil, err
	}

	if publish {
		job := queue.Job{Kind: jobKindDeployment, DeploymentID: deployment.ID}
		if err := m.queue.Publish(ctx, job); err != nil {
			return nil, err
		}
	}
	return m.GetDeployment(ctx, deployment.ID)
}
// CreateRestartForInstance queues a restart of the given service instance,
// targeting only the host that instance runs on.
func (m *Manager) CreateRestartForInstance(ctx context.Context, instanceID uint, operator string) (*model.Deployment, error) {
	instance := new(model.ServiceInstance)
	lookup := m.db.WithContext(ctx).First(instance, instanceID)
	if lookup.Error != nil {
		return nil, lookup.Error
	}

	restart := CreateDeploymentRequest{
		ServiceName: instance.ServiceName,
		Operation:   model.OperationRestart,
		Operator:    operator,
		HostIDs:     []uint{instance.HostID},
	}
	return m.CreateDeployment(ctx, restart)
}
// CreateRollback queues a rollback for the service of an existing deployment.
// When releaseID is empty the release id of the source deployment is used.
func (m *Manager) CreateRollback(ctx context.Context, deploymentID uint, releaseID, operator string) (*model.Deployment, error) {
	source := new(model.Deployment)
	lookup := m.db.WithContext(ctx).First(source, deploymentID)
	if lookup.Error != nil {
		return nil, lookup.Error
	}

	target := releaseID
	if target == "" {
		target = source.ReleaseID
	}

	rollback := CreateDeploymentRequest{
		ServiceName: source.ServiceName,
		ReleaseID:   target,
		Operation:   model.OperationRollback,
		Operator:    operator,
	}
	return m.CreateDeployment(ctx, rollback)
}
// ProcessDeployment executes a deployment against each of its targets in turn.
//
// The deployment is marked running, the release is loaded (not needed for
// restarts), and every target is handed to the deploy executor according to
// the operation. The first failing target marks both the target and the
// deployment failed and stops processing; remaining targets are left untouched.
// After each successful target that involved a release, the instance's
// current_release_id is updated. When all targets succeed the deployment is
// marked successful.
//
// If the release cannot be loaded the deployment is marked failed before
// returning, so it does not stay in the running state indefinitely.
func (m *Manager) ProcessDeployment(ctx context.Context, deploymentID uint) error {
	deployment, err := m.GetDeployment(ctx, deploymentID)
	if err != nil {
		return err
	}

	now := time.Now()
	if err := m.db.WithContext(ctx).Model(&model.Deployment{}).
		Where("id = ?", deploymentID).
		Updates(map[string]any{
			"status":     model.DeploymentRunning,
			"started_at": &now,
		}).Error; err != nil {
		return err
	}

	var release *model.Release
	if deployment.Operation != model.OperationRestart {
		var loaded model.Release
		if err := m.db.WithContext(ctx).
			Where("service_name = ? AND release_id = ?", deployment.ServiceName, deployment.ReleaseID).
			Order("id desc").
			First(&loaded).Error; err != nil {
			lookupErr := fmt.Errorf("load release %s for service %s: %w", deployment.ReleaseID, deployment.ServiceName, err)
			// The deployment was already flagged running above; give it a
			// terminal state so it is not reported as running forever.
			failedAt := time.Now()
			if updateErr := m.db.WithContext(ctx).Model(&model.Deployment{}).
				Where("id = ?", deploymentID).
				Updates(map[string]any{
					"status":        model.DeploymentFailed,
					"error_message": lookupErr.Error(),
					"finished_at":   &failedAt,
				}).Error; updateErr != nil {
				log.Printf("mark deployment %d failed: %v", deploymentID, updateErr)
			}
			return lookupErr
		}
		release = &loaded
	}

	for _, target := range deployment.Targets {
		targetStartedAt := time.Now()
		if err := m.db.WithContext(ctx).Model(&model.DeploymentTarget{}).
			Where("id = ?", target.ID).
			Updates(map[string]any{
				"status":     model.DeploymentRunning,
				"step":       "executing",
				"started_at": &targetStartedAt,
			}).Error; err != nil {
			return err
		}

		executionTarget := orchestrator.Target{
			Host:     target.Host,
			Instance: target.ServiceInstance,
		}
		executionContext := orchestrator.Context{
			Deployment: *deployment,
			Release:    release,
		}

		var output string
		switch deployment.Operation {
		case model.OperationRestart:
			output, err = m.deployExecutor.Restart(ctx, executionTarget, executionContext)
		case model.OperationRollback:
			output, err = m.deployExecutor.Rollback(ctx, executionTarget, executionContext)
		default:
			output, err = m.deployExecutor.Deploy(ctx, executionTarget, executionContext)
		}

		targetFinishedAt := time.Now()
		if err != nil {
			errorMessage := summarizeDeploymentError(err, output)
			// Best effort: the executor failure is what gets reported, so
			// errors from these bookkeeping updates are deliberately dropped.
			_ = m.db.WithContext(ctx).Model(&model.DeploymentTarget{}).
				Where("id = ?", target.ID).
				Updates(map[string]any{
					"status":      model.DeploymentFailed,
					"step":        "failed",
					"log_excerpt": trimLog(output),
					"finished_at": &targetFinishedAt,
				}).Error
			_ = m.db.WithContext(ctx).Model(&model.Deployment{}).
				Where("id = ?", deploymentID).
				Updates(map[string]any{
					"status":        model.DeploymentFailed,
					"error_message": errorMessage,
					"finished_at":   &targetFinishedAt,
				}).Error
			return errors.New(errorMessage)
		}

		updates := map[string]any{
			"status":      model.DeploymentSuccess,
			"step":        "completed",
			"log_excerpt": trimLog(output),
			"finished_at": &targetFinishedAt,
		}
		if err := m.db.WithContext(ctx).Model(&model.DeploymentTarget{}).
			Where("id = ?", target.ID).
			Updates(updates).Error; err != nil {
			return err
		}

		// Restarts carry no release, so the installed release id stays as is.
		if release != nil {
			if err := m.db.WithContext(ctx).Model(&model.ServiceInstance{}).
				Where("id = ?", target.ServiceInstanceID).
				Update("current_release_id", release.ReleaseID).Error; err != nil {
				return err
			}
		}
	}

	finishedAt := time.Now()
	return m.db.WithContext(ctx).Model(&model.Deployment{}).
		Where("id = ?", deploymentID).
		Updates(map[string]any{
			"status":      model.DeploymentSuccess,
			"finished_at": &finishedAt,
		}).Error
}
// GetOverview collects the dashboard summary: table counts, deployment and
// build counts by status, all hosts and service instances, and the ten most
// recent deployments and builds. The first failing query aborts the call.
func (m *Manager) GetOverview(ctx context.Context) (*Overview, error) {
	overview := new(Overview)

	// Each entry is one COUNT query; a nil status counts the whole table.
	counters := []struct {
		table  any
		status any
		dest   *int64
	}{
		{&model.Host{}, nil, &overview.HostCount},
		{&model.ServiceInstance{}, nil, &overview.ServiceCount},
		{&model.Release{}, nil, &overview.ReleaseCount},
		{&model.BuildJob{}, nil, &overview.BuildCount},
		{&model.Deployment{}, model.DeploymentQueued, &overview.QueuedCount},
		{&model.Deployment{}, model.DeploymentRunning, &overview.RunningCount},
		{&model.Deployment{}, model.DeploymentFailed, &overview.FailedCount},
		{&model.Deployment{}, model.DeploymentSuccess, &overview.SuccessfulCount},
		{&model.BuildJob{}, model.BuildQueued, &overview.BuildQueuedCount},
		{&model.BuildJob{}, model.BuildRunning, &overview.BuildRunningCount},
		{&model.BuildJob{}, model.BuildFailed, &overview.BuildFailedCount},
	}
	for _, counter := range counters {
		query := m.db.WithContext(ctx).Model(counter.table)
		if counter.status != nil {
			query = query.Where("status = ?", counter.status)
		}
		if err := query.Count(counter.dest).Error; err != nil {
			return nil, err
		}
	}

	hosts := m.db.WithContext(ctx).Order("name asc").Find(&overview.Hosts)
	if hosts.Error != nil {
		return nil, hosts.Error
	}
	instances := m.db.WithContext(ctx).Preload("Host").Order("service_name asc, host_id asc").Find(&overview.ServiceInstances)
	if instances.Error != nil {
		return nil, instances.Error
	}
	deployments := m.db.WithContext(ctx).Preload("Targets").Order("created_at desc").Limit(10).Find(&overview.RecentDeployments)
	if deployments.Error != nil {
		return nil, deployments.Error
	}
	builds := m.db.WithContext(ctx).Order("created_at desc").Limit(10).Find(&overview.RecentBuilds)
	if builds.Error != nil {
		return nil, builds.Error
	}
	return overview, nil
}
// ListServices returns the configured services sorted by name. The default
// branch falls back to "main" when the service does not configure one.
func (m *Manager) ListServices(_ context.Context) []CatalogService {
	names := make([]string, 0, len(m.cfg.Services))
	for name := range m.cfg.Services {
		names = append(names, name)
	}
	// Map iteration order is random; sort so the catalog is stable.
	sort.Strings(names)

	catalog := make([]CatalogService, 0, len(names))
	for _, name := range names {
		serviceCfg := m.cfg.Services[name]
		entry := CatalogService{
			Name:          name,
			Repo:          serviceCfg.Repo,
			DefaultBranch: firstNonEmpty(serviceCfg.Build.DefaultBranch, "main"),
			PackageName:   serviceCfg.PackageName,
		}
		catalog = append(catalog, entry)
	}
	return catalog
}
// ListHosts returns every host ordered by id.
func (m *Manager) ListHosts(ctx context.Context) ([]model.Host, error) {
	var hosts []model.Host
	found := m.db.WithContext(ctx).Order("id asc").Find(&hosts)
	return hosts, found.Error
}
// SaveHost persists the host through GORM's Save.
func (m *Manager) SaveHost(ctx context.Context, host *model.Host) error {
	saved := m.db.WithContext(ctx).Save(host)
	return saved.Error
}
// DeleteHost deletes the host with the given primary key.
func (m *Manager) DeleteHost(ctx context.Context, id uint) error {
	deleted := m.db.WithContext(ctx).Delete(&model.Host{}, id)
	return deleted.Error
}
// ListInstances returns every service instance with its host preloaded,
// ordered by service name and then host id.
func (m *Manager) ListInstances(ctx context.Context) ([]model.ServiceInstance, error) {
	var instances []model.ServiceInstance
	found := m.db.WithContext(ctx).
		Preload("Host").
		Order("service_name asc, host_id asc").
		Find(&instances)
	return instances, found.Error
}
// SaveInstance persists the service instance through GORM's Save.
func (m *Manager) SaveInstance(ctx context.Context, instance *model.ServiceInstance) error {
	saved := m.db.WithContext(ctx).Save(instance)
	return saved.Error
}
// DeleteInstance deletes the service instance with the given primary key.
func (m *Manager) DeleteInstance(ctx context.Context, id uint) error {
	deleted := m.db.WithContext(ctx).Delete(&model.ServiceInstance{}, id)
	return deleted.Error
}
// ListReleases returns releases newest first. A non-empty serviceName limits
// the result to that service.
func (m *Manager) ListReleases(ctx context.Context, serviceName string) ([]model.Release, error) {
	scope := m.db.WithContext(ctx).Order("created_at desc")
	if filterByService := serviceName != ""; filterByService {
		scope = scope.Where("service_name = ?", serviceName)
	}

	var releases []model.Release
	found := scope.Find(&releases)
	return releases, found.Error
}
// SaveRelease persists the release through GORM's Save.
func (m *Manager) SaveRelease(ctx context.Context, release *model.Release) error {
	saved := m.db.WithContext(ctx).Save(release)
	return saved.Error
}
// ListBuilds returns every build job, newest first.
func (m *Manager) ListBuilds(ctx context.Context) ([]model.BuildJob, error) {
	var builds []model.BuildJob
	found := m.db.WithContext(ctx).Order("created_at desc").Find(&builds)
	return builds, found.Error
}
// GetBuild loads one build job by primary key. The lookup error is returned
// unchanged when the job cannot be loaded.
func (m *Manager) GetBuild(ctx context.Context, buildID uint) (*model.BuildJob, error) {
	build := new(model.BuildJob)
	if lookup := m.db.WithContext(ctx).First(build, buildID); lookup.Error != nil {
		return nil, lookup.Error
	}
	return build, nil
}
// ListReleaseRuns returns every release run, newest first, with steps in
// sequence order and each step's deployment and rollback deployment preloaded.
func (m *Manager) ListReleaseRuns(ctx context.Context) ([]model.ReleaseRun, error) {
	bySequence := func(db *gorm.DB) *gorm.DB {
		return db.Order("sequence asc")
	}

	var runs []model.ReleaseRun
	found := m.db.WithContext(ctx).
		Preload("Steps", bySequence).
		Preload("Steps.Deployment").
		Preload("Steps.RollbackDeployment").
		Order("created_at desc").
		Find(&runs)
	return runs, found.Error
}
// GetReleaseRun loads one release run by primary key with its full tree:
// steps in sequence order, and for both the deployment and the rollback
// deployment of each step, the targets in id order with host and instance.
func (m *Manager) GetReleaseRun(ctx context.Context, runID uint) (*model.ReleaseRun, error) {
	bySequence := func(db *gorm.DB) *gorm.DB {
		return db.Order("sequence asc")
	}
	byID := func(db *gorm.DB) *gorm.DB {
		return db.Order("id asc")
	}

	run := new(model.ReleaseRun)
	lookup := m.db.WithContext(ctx).
		Preload("Steps", bySequence).
		Preload("Steps.Deployment").
		Preload("Steps.Deployment.Targets", byID).
		Preload("Steps.Deployment.Targets.Host").
		Preload("Steps.Deployment.Targets.ServiceInstance").
		Preload("Steps.RollbackDeployment").
		Preload("Steps.RollbackDeployment.Targets", byID).
		Preload("Steps.RollbackDeployment.Targets.Host").
		Preload("Steps.RollbackDeployment.Targets.ServiceInstance").
		First(run, runID)
	if lookup.Error != nil {
		return nil, lookup.Error
	}
	return run, nil
}
// ListDeployments returns every deployment with its targets, newest first.
func (m *Manager) ListDeployments(ctx context.Context) ([]model.Deployment, error) {
	var deployments []model.Deployment
	found := m.db.WithContext(ctx).
		Preload("Targets").
		Order("created_at desc").
		Find(&deployments)
	return deployments, found.Error
}
// GetDeployment loads one deployment by primary key with its targets and each
// target's host and service instance.
//
// Targets are returned in ascending id order. The preload query carries no
// ORDER BY, so without the sort the order would be whatever the database
// happens to return; callers that walk the targets (rollout order, combined
// log excerpt) get the same deterministic order that GetReleaseRun requests
// for the same association.
func (m *Manager) GetDeployment(ctx context.Context, deploymentID uint) (*model.Deployment, error) {
	var deployment model.Deployment
	err := m.db.WithContext(ctx).
		Preload("Targets.Host").
		Preload("Targets.ServiceInstance").
		First(&deployment, deploymentID).Error
	if err != nil {
		return nil, err
	}

	targets := deployment.Targets
	sort.SliceStable(targets, func(i, j int) bool {
		return targets[i].ID < targets[j].ID
	})
	return &deployment, nil
}
// firstNonEmpty returns the first argument that is not the empty string, or
// "" when there is none.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if values[i] == "" {
			continue
		}
		return values[i]
	}
	return ""
}
// releaseOrder returns the configured release order restricted to services
// that exist in the service configuration, keeping the configured sequence.
func (m *Manager) releaseOrder() []string {
	configured := m.cfg.Release.Order
	known := make([]string, 0, len(configured))
	for i := range configured {
		if _, exists := m.cfg.Services[configured[i]]; !exists {
			continue
		}
		known = append(known, configured[i])
	}
	return known
}
// trimLog returns the tail of raw, limited to at most 12000 bytes. Shorter
// input is returned unchanged.
//
// The cut is moved forward to the next UTF-8 rune boundary so the result
// never starts in the middle of a multi-byte character; the result can
// therefore be up to three bytes shorter than the limit. Cutting at a raw byte
// offset would otherwise produce invalid UTF-8 for logs containing non-ASCII
// text.
func trimLog(raw string) string {
	const maxLen = 12000
	if len(raw) <= maxLen {
		return raw
	}
	start := len(raw) - maxLen
	// Skip continuation bytes left over from a rune that was split by the cut.
	for start < len(raw) && !utf8.RuneStart(raw[start]) {
		start++
	}
	return raw[start:]
}
// summarizeBuildError picks a one-line summary for a failed build: the last
// line of output that is not blank, is not a BUILD_RESULT_JSON= line and
// differs from err's own message. When no such line exists, err's message is
// returned.
func summarizeBuildError(err error, output string) string {
	fallback := err.Error()
	remaining := strings.Split(output, "\n")
	for len(remaining) > 0 {
		last := len(remaining) - 1
		candidate := strings.TrimSpace(remaining[last])
		remaining = remaining[:last]

		switch {
		case candidate == "":
			continue
		case strings.HasPrefix(candidate, "BUILD_RESULT_JSON="):
			continue
		case candidate == fallback:
			continue
		}
		return candidate
	}
	return fallback
}
// summarizeDeploymentError picks a one-line summary for a failed deployment
// by scanning output from the last line upwards. Blank lines, lines equal to
// err's message, the Python traceback header and "File ..." frame lines are
// skipped. A leading "RuntimeError:" is stripped, and the line is skipped if
// nothing remains. When no line qualifies, err's message is returned.
func summarizeDeploymentError(err error, output string) string {
	const (
		tracebackHeader = "Traceback (most recent call last):"
		runtimePrefix   = "RuntimeError:"
	)

	rows := strings.Split(output, "\n")
	for remaining := len(rows); remaining > 0; remaining-- {
		candidate := strings.TrimSpace(rows[remaining-1])
		switch {
		case candidate == "", candidate == err.Error():
			continue
		case candidate == tracebackHeader, strings.HasPrefix(candidate, "File "):
			continue
		}
		if strings.HasPrefix(candidate, runtimePrefix) {
			candidate = strings.TrimSpace(strings.TrimPrefix(candidate, runtimePrefix))
		}
		if candidate == "" {
			continue
		}
		return candidate
	}
	return err.Error()
}
// deploymentLogExcerpt joins the non-empty log excerpts of a deployment's
// targets, one per line and each prefixed with "[host name] ", and trims the
// result with trimLog. A nil deployment yields "".
func deploymentLogExcerpt(deployment *model.Deployment) string {
	if deployment == nil {
		return ""
	}

	var combined strings.Builder
	for _, target := range deployment.Targets {
		if target.LogExcerpt == "" {
			continue
		}
		// Every written entry is non-empty, so a non-zero length means this
		// is not the first entry and needs a separator.
		if combined.Len() > 0 {
			combined.WriteString("\n")
		}
		combined.WriteString(fmt.Sprintf("[%s] %s", target.Host.Name, target.LogExcerpt))
	}
	return trimLog(combined.String())
}
// resolveServiceBaselineRelease returns the release id currently recorded on
// the instances of a service (whitespace-trimmed, possibly empty).
//
// It fails when the service has no instances or when its instances do not all
// report the same current release id; the error lists the distinct ids in
// host id order of first appearance.
func (m *Manager) resolveServiceBaselineRelease(ctx context.Context, serviceName string) (string, error) {
	var instances []model.ServiceInstance
	found := m.db.WithContext(ctx).
		Where("service_name = ?", serviceName).
		Order("host_id asc").
		Find(&instances)
	if found.Error != nil {
		return "", found.Error
	}
	if len(instances) == 0 {
		return "", fmt.Errorf("no service instances found for %s", serviceName)
	}

	known := map[string]struct{}{}
	releases := make([]string, 0, 2)
	for i := range instances {
		current := strings.TrimSpace(instances[i].CurrentReleaseID)
		if _, dup := known[current]; !dup {
			known[current] = struct{}{}
			releases = append(releases, current)
		}
	}

	if len(releases) <= 1 {
		return releases[0], nil
	}
	return "", fmt.Errorf("service %s instances have inconsistent current_release_id: %s", serviceName, strings.Join(releases, ", "))
}
// rollbackCompletedReleaseRunSteps rolls back the given steps in reverse
// order and returns the per-step outcome messages joined by newlines. It
// returns "" when there is nothing to roll back.
func (m *Manager) rollbackCompletedReleaseRunSteps(
	ctx context.Context,
	runID uint,
	currentReleaseID string,
	operator string,
	completedSteps []model.ReleaseRunStep,
) string {
	var outcomes []string
	// Walk from the most recently completed step back to the first one.
	for remaining := len(completedSteps); remaining > 0; remaining-- {
		outcome := m.rollbackReleaseRunStep(ctx, runID, currentReleaseID, operator, completedSteps[remaining-1])
		if outcome == "" {
			continue
		}
		outcomes = append(outcomes, outcome)
	}
	return strings.Join(outcomes, "\n")
}
// rollbackReleaseRunStep rolls one completed step back to the release that
// was installed before the run touched it, and returns a human-readable
// outcome message.
//
// The rollback is skipped (and recorded as skipped) when the step has no
// previous release id or when that id equals currentReleaseID. Otherwise an
// unpublished rollback deployment is created, linked to the step and run
// inline; its status, error and log excerpt are stored in the step's
// rollback_* columns. Bookkeeping updates after the start marker are best
// effort.
func (m *Manager) rollbackReleaseRunStep(
	ctx context.Context,
	runID uint,
	currentReleaseID string,
	operator string,
	step model.ReleaseRunStep,
) string {
	// saveStep applies fields to this step's row.
	saveStep := func(fields map[string]any) error {
		return m.db.WithContext(ctx).Model(&model.ReleaseRunStep{}).
			Where("id = ?", step.ID).
			Updates(fields).Error
	}

	startedAt := time.Now()
	// recordSkip stores a skipped rollback that starts and finishes at once.
	recordSkip := func(reason string) {
		_ = saveStep(map[string]any{
			"rollback_status":        model.StatusSkipped,
			"rollback_error_message": reason,
			"rollback_started_at":    &startedAt,
			"rollback_finished_at":   &startedAt,
		})
	}

	switch {
	case strings.TrimSpace(step.PreviousReleaseID) == "":
		recordSkip("no previous release id recorded, rollback skipped")
		return fmt.Sprintf("rollback skipped for %s: no previous release id recorded", step.ServiceName)
	case step.PreviousReleaseID == currentReleaseID:
		recordSkip("previous release matches current release, rollback skipped")
		return fmt.Sprintf("rollback skipped for %s: previous release matches current release", step.ServiceName)
	}

	if err := saveStep(map[string]any{
		"rollback_status":     model.DeploymentRunning,
		"rollback_started_at": &startedAt,
	}); err != nil {
		return fmt.Sprintf("rollback start update failed for %s: %v", step.ServiceName, err)
	}

	deployment, createErr := m.createDeploymentRecord(ctx, CreateDeploymentRequest{
		ServiceName: step.ServiceName,
		ReleaseID:   step.PreviousReleaseID,
		Operation:   model.OperationRollback,
		Operator:    operator,
	}, "release_run_rollback", false)
	if createErr != nil {
		finishedAt := time.Now()
		_ = saveStep(map[string]any{
			"rollback_status":        model.DeploymentFailed,
			"rollback_error_message": createErr.Error(),
			"rollback_finished_at":   &finishedAt,
		})
		return fmt.Sprintf("rollback create failed for %s -> %s: %v", step.ServiceName, step.PreviousReleaseID, createErr)
	}

	if err := m.db.WithContext(ctx).Model(&model.ReleaseRunStep{}).
		Where("id = ?", step.ID).
		Update("rollback_deployment_id", deployment.ID).Error; err != nil {
		return fmt.Sprintf("rollback deployment link failed for %s: %v", step.ServiceName, err)
	}

	// collectLog reloads the rollback deployment and returns its combined
	// excerpt, or "" when the reload fails.
	collectLog := func() string {
		latest, getErr := m.GetDeployment(ctx, deployment.ID)
		if getErr != nil {
			return ""
		}
		return deploymentLogExcerpt(latest)
	}

	if runErr := m.ProcessDeployment(ctx, deployment.ID); runErr != nil {
		finishedAt := time.Now()
		excerpt := collectLog()
		_ = saveStep(map[string]any{
			"rollback_status":        model.DeploymentFailed,
			"rollback_error_message": runErr.Error(),
			"rollback_log_excerpt":   trimLog(excerpt),
			"rollback_finished_at":   &finishedAt,
		})
		return fmt.Sprintf("rollback failed for %s -> %s: %v", step.ServiceName, step.PreviousReleaseID, runErr)
	}

	excerpt := collectLog()
	finishedAt := time.Now()
	_ = saveStep(map[string]any{
		"rollback_status":      model.DeploymentSuccess,
		"rollback_log_excerpt": trimLog(excerpt),
		"rollback_finished_at": &finishedAt,
	})
	return fmt.Sprintf("rollback success for %s -> %s", step.ServiceName, step.PreviousReleaseID)
}