package queue import ( "context" "encoding/json" rocketmq "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "app-deploy-platform/backend/internal/config" ) type RocketMQQueue struct { topic string producer rocketmq.Producer consumer rocketmq.PushConsumer } func NewRocketMQQueue(cfg config.Config) (*RocketMQQueue, error) { prod, err := rocketmq.NewProducer( producer.WithNameServer([]string{cfg.Queue.RocketMQNameserver}), producer.WithRetry(2), ) if err != nil { return nil, err } if err := prod.Start(); err != nil { return nil, err } pushConsumer, err := rocketmq.NewPushConsumer( consumer.WithNameServer([]string{cfg.Queue.RocketMQNameserver}), consumer.WithGroupName(cfg.Queue.Group), ) if err != nil { _ = prod.Shutdown() return nil, err } return &RocketMQQueue{ topic: cfg.Queue.Topic, producer: prod, consumer: pushConsumer, }, nil } func (q *RocketMQQueue) Publish(ctx context.Context, job Job) error { payload, err := json.Marshal(job) if err != nil { return err } _, err = q.producer.SendSync(ctx, primitive.NewMessage(q.topic, payload)) return err } func (q *RocketMQQueue) StartConsumer(_ context.Context, handler Handler) error { if err := q.consumer.Subscribe(q.topic, consumer.MessageSelector{}, func(ctx context.Context, messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, message := range messages { var job Job if err := json.Unmarshal(message.Body, &job); err != nil { return consumer.ConsumeRetryLater, err } if err := handler(ctx, job); err != nil { return consumer.ConsumeRetryLater, err } } return consumer.ConsumeSuccess, nil }); err != nil { return err } return q.consumer.Start() } func (q *RocketMQQueue) Close() error { if q.consumer != nil { _ = q.consumer.Shutdown() } if q.producer != nil { _ = q.producer.Shutdown() } return nil }