2026-04-07 11:48:19 +08:00

87 lines
2.0 KiB
Go

package queue
import (
"context"
"encoding/json"
rocketmq "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"app-deploy-platform/backend/internal/config"
)
// RocketMQQueue is a job queue backed by Apache RocketMQ. It pairs the
// producer used by Publish with the push consumer used by StartConsumer,
// both operating on a single topic.
type RocketMQQueue struct {
	// topic is the RocketMQ topic jobs are published to and subscribed on.
	topic string

	// producer sends serialized jobs. NewRocketMQQueue starts it before
	// handing the queue back to the caller.
	producer rocketmq.Producer

	// consumer receives jobs. It is created by NewRocketMQQueue but only
	// started by StartConsumer.
	consumer rocketmq.PushConsumer
}
// NewRocketMQQueue builds a RocketMQQueue from cfg.
//
// It creates a producer pointed at cfg.Queue.RocketMQNameserver (configured
// with a retry count of 2) and starts it, then creates a push consumer for the
// same name server in consumer group cfg.Queue.Group. The consumer is only
// created here; it is started later by StartConsumer.
//
// Any error from creating or starting the producer, or from creating the
// consumer, is returned unchanged together with a nil queue. When consumer
// creation fails, the already-started producer is shut down first and the
// result of that shutdown is discarded.
func NewRocketMQQueue(cfg config.Config) (*RocketMQQueue, error) {
	nameServer := cfg.Queue.RocketMQNameserver

	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{nameServer}),
		producer.WithRetry(2),
	)
	if err != nil {
		return nil, err
	}

	if startErr := p.Start(); startErr != nil {
		return nil, startErr
	}

	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{nameServer}),
		consumer.WithGroupName(cfg.Queue.Group),
	)
	if err != nil {
		// Best-effort cleanup: the creation error is what the caller needs to
		// see, so the producer's shutdown result is intentionally dropped.
		_ = p.Shutdown()
		return nil, err
	}

	q := &RocketMQQueue{
		topic:    cfg.Queue.Topic,
		producer: p,
		consumer: c,
	}
	return q, nil
}
// Publish encodes job as JSON and sends it synchronously to the queue's topic
// using ctx.
//
// It returns the JSON marshalling error if encoding fails, otherwise whatever
// error the producer's SendSync call reports. The send result itself is
// discarded.
func (q *RocketMQQueue) Publish(ctx context.Context, job Job) error {
	body, marshalErr := json.Marshal(job)
	if marshalErr != nil {
		return marshalErr
	}

	msg := primitive.NewMessage(q.topic, body)
	if _, sendErr := q.producer.SendSync(ctx, msg); sendErr != nil {
		return sendErr
	}
	return nil
}
// StartConsumer subscribes the push consumer to the queue's topic with an
// empty message selector and then starts the consumer.
//
// For every delivered batch, each message body is decoded from JSON into a Job
// and passed to handler along with the context supplied to the callback. The
// first decode or handler error stops processing of that batch and is returned
// to the client together with consumer.ConsumeRetryLater; a batch that is
// fully handled reports consumer.ConsumeSuccess.
//
// The context argument of StartConsumer itself is not used. The returned error
// is the Subscribe error, if any, otherwise the result of starting the
// consumer.
func (q *RocketMQQueue) StartConsumer(_ context.Context, handler Handler) error {
	onMessages := func(ctx context.Context, batch ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range batch {
			var job Job
			if decodeErr := json.Unmarshal(batch[i].Body, &job); decodeErr != nil {
				return consumer.ConsumeRetryLater, decodeErr
			}
			if handleErr := handler(ctx, job); handleErr != nil {
				return consumer.ConsumeRetryLater, handleErr
			}
		}
		return consumer.ConsumeSuccess, nil
	}

	subscribeErr := q.consumer.Subscribe(q.topic, consumer.MessageSelector{}, onMessages)
	if subscribeErr != nil {
		return subscribeErr
	}
	return q.consumer.Start()
}
// Close shuts down the consumer and then the producer, skipping whichever of
// the two is nil.
//
// Both shutdowns are always attempted, so a failure on the consumer never
// prevents the producer from being released. The first non-nil shutdown error
// is returned: the consumer's error takes precedence, and the producer's error
// is reported only when the consumer shut down cleanly (or was nil). Close
// returns nil when nothing failed.
func (q *RocketMQQueue) Close() error {
	var firstErr error

	if q.consumer != nil {
		firstErr = q.consumer.Shutdown()
	}

	if q.producer != nil {
		// Keep the earlier consumer error if there was one; otherwise surface
		// the producer's failure instead of silently dropping it.
		if err := q.producer.Shutdown(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}