87 lines
2.0 KiB
Go
87 lines
2.0 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
rocketmq "github.com/apache/rocketmq-client-go/v2"
|
|
"github.com/apache/rocketmq-client-go/v2/consumer"
|
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
|
"github.com/apache/rocketmq-client-go/v2/producer"
|
|
|
|
"app-deploy-platform/backend/internal/config"
|
|
)
|
|
|
|
type RocketMQQueue struct {
|
|
topic string
|
|
producer rocketmq.Producer
|
|
consumer rocketmq.PushConsumer
|
|
}
|
|
|
|
func NewRocketMQQueue(cfg config.Config) (*RocketMQQueue, error) {
|
|
prod, err := rocketmq.NewProducer(
|
|
producer.WithNameServer([]string{cfg.Queue.RocketMQNameserver}),
|
|
producer.WithRetry(2),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := prod.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pushConsumer, err := rocketmq.NewPushConsumer(
|
|
consumer.WithNameServer([]string{cfg.Queue.RocketMQNameserver}),
|
|
consumer.WithGroupName(cfg.Queue.Group),
|
|
)
|
|
if err != nil {
|
|
_ = prod.Shutdown()
|
|
return nil, err
|
|
}
|
|
|
|
return &RocketMQQueue{
|
|
topic: cfg.Queue.Topic,
|
|
producer: prod,
|
|
consumer: pushConsumer,
|
|
}, nil
|
|
}
|
|
|
|
func (q *RocketMQQueue) Publish(ctx context.Context, job Job) error {
|
|
payload, err := json.Marshal(job)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = q.producer.SendSync(ctx, primitive.NewMessage(q.topic, payload))
|
|
return err
|
|
}
|
|
|
|
func (q *RocketMQQueue) StartConsumer(_ context.Context, handler Handler) error {
|
|
if err := q.consumer.Subscribe(q.topic, consumer.MessageSelector{}, func(ctx context.Context, messages ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
|
|
for _, message := range messages {
|
|
var job Job
|
|
if err := json.Unmarshal(message.Body, &job); err != nil {
|
|
return consumer.ConsumeRetryLater, err
|
|
}
|
|
if err := handler(ctx, job); err != nil {
|
|
return consumer.ConsumeRetryLater, err
|
|
}
|
|
}
|
|
return consumer.ConsumeSuccess, nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return q.consumer.Start()
|
|
}
|
|
|
|
func (q *RocketMQQueue) Close() error {
|
|
if q.consumer != nil {
|
|
_ = q.consumer.Shutdown()
|
|
}
|
|
if q.producer != nil {
|
|
_ = q.producer.Shutdown()
|
|
}
|
|
return nil
|
|
}
|