345 lines
11 KiB
Go
345 lines
11 KiB
Go
package sqs
|
||
|
||
import (
|
||
"bygdata/global"
|
||
"bygdata/model/awssqs"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/aws/aws-sdk-go/aws"
|
||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||
"github.com/aws/aws-sdk-go/aws/session"
|
||
"github.com/aws/aws-sdk-go/service/sqs"
|
||
"go.uber.org/zap"
|
||
"os"
|
||
"os/signal"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
)
|
||
|
||
func ProcessSqsMessage() {
|
||
sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl
|
||
global.GVA_LOG.Info("[SQS初始化] AwsSqsUrl: %s", zap.String("AwsSqsUrl", sqsUrl), zap.String("AwsSqsAccessKey", global.GVA_CONFIG.AWS.AwsSqsAccessKey))
|
||
if sqsUrl == "" {
|
||
global.GVA_LOG.Warn("Not found sqs url config. cannot start aws sqs monitor process.")
|
||
return
|
||
}
|
||
|
||
var err error
|
||
// 创建 AWS 会话
|
||
sess, err := NewAwsSqsSession()
|
||
if err != nil {
|
||
global.GVA_LOG.Warn("Failed to create AWS session:%s", zap.Error(err))
|
||
os.Exit(1)
|
||
}
|
||
|
||
// 创建 SQS 服务客户端
|
||
svc := sqs.New(sess)
|
||
global.GVA_LOG.Info("Aws sqs monitor process start.")
|
||
|
||
// 创建 context 用于优雅退出
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
// 创建一个通道来接收中断信号
|
||
sigCh := make(chan os.Signal, 1)
|
||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||
|
||
// 创建有缓冲的消息通道,避免阻塞
|
||
// 缓冲区大小设置为 100,可根据实际情况调整
|
||
msgsCh := make(chan *sqs.Message, 100)
|
||
|
||
// 创建 WaitGroup 以等待所有处理 goroutine 完成
|
||
var wg sync.WaitGroup
|
||
|
||
// 限制并发处理的 goroutine 数量,避免资源耗尽
|
||
// 使用带缓冲的 channel 作为 semaphore
|
||
maxWorkers := 50 // 最大并发处理数,可根据实际情况调整
|
||
semaphore := make(chan struct{}, maxWorkers)
|
||
|
||
// 启动一个 goroutine 来接收消息
|
||
wg.Add(1)
|
||
go receiveMessages(ctx, svc, sqsUrl, msgsCh, &wg)
|
||
|
||
// 启动一个 goroutine 来处理消息
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
// context 被取消,退出消息处理循环
|
||
return
|
||
case msg, ok := <-msgsCh:
|
||
if !ok {
|
||
// 通道已关闭
|
||
return
|
||
}
|
||
// 获取 semaphore,限制并发数
|
||
semaphore <- struct{}{}
|
||
wg.Add(1)
|
||
go func(m *sqs.Message) {
|
||
defer func() {
|
||
<-semaphore // 释放 semaphore
|
||
wg.Done()
|
||
}()
|
||
processMessage(svc, sqsUrl, m)
|
||
}(msg)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 等待中断信号
|
||
<-sigCh
|
||
global.GVA_LOG.Info("Received interrupt signal. Shutting down gracefully...")
|
||
|
||
// 取消 context,通知所有 goroutine 退出
|
||
cancel()
|
||
|
||
// 关闭消息通道,停止接收新消息
|
||
close(msgsCh)
|
||
|
||
// 等待所有 goroutine 完成
|
||
global.GVA_LOG.Info("Waiting for all goroutines to finish...")
|
||
wg.Wait()
|
||
global.GVA_LOG.Info("All goroutines finished. Exiting...")
|
||
}
|
||
|
||
// 接收 SQS 消息并将其发送到通道
|
||
func receiveMessages(ctx context.Context, svc *sqs.SQS, queueURL string, msgsCh chan<- *sqs.Message, wg *sync.WaitGroup) {
|
||
defer wg.Done() // 减少 WaitGroup 的计数
|
||
|
||
for {
|
||
// 检查 context 是否已取消
|
||
select {
|
||
case <-ctx.Done():
|
||
global.GVA_LOG.Info("receiveMessages: context cancelled, exiting...")
|
||
return
|
||
default:
|
||
}
|
||
|
||
// 接收消息
|
||
// VisibilityTimeout: 消息可见性超时(秒),在这期间消息对其他消费者不可见
|
||
// 如果在这期间没有删除消息,消息会自动重新变为可见,实现重复消费
|
||
// MessageAttributeNames: 获取消息属性,包括 ApproximateReceiveCount(接收次数)
|
||
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||
QueueUrl: aws.String(queueURL),
|
||
MaxNumberOfMessages: aws.Int64(10), // 每次最多接收 10 条消息
|
||
WaitTimeSeconds: aws.Int64(20), // 等待时间最多为 20 秒
|
||
VisibilityTimeout: aws.Int64(300), // 可见性超时 300 秒(5分钟),可根据实际处理时间调整
|
||
})
|
||
if err != nil {
|
||
global.GVA_LOG.Error("Error receiving messages", zap.Error(err))
|
||
// 如果 context 已取消,直接退出
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
// 等待一段时间后重试,避免快速重试导致资源浪费
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(time.Second * 5):
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
|
||
// 将收到的消息发送到通道
|
||
for _, msg := range result.Messages {
|
||
select {
|
||
case <-ctx.Done():
|
||
// context 已取消,不再发送新消息
|
||
return
|
||
case msgsCh <- msg:
|
||
// 消息已成功发送
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理消息
|
||
func processMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) {
|
||
msgId := *msg.MessageId
|
||
global.GVA_LOG.Info("Received message:msgId:%s, body:%s", zap.String("msgId", msgId), zap.String("body", *msg.Body))
|
||
|
||
// 获取消息的重试次数(接收次数)
|
||
receiveCount := getReceiveCount(msg)
|
||
maxRetries := int64(3) // 最大重试次数,可根据实际情况调整
|
||
|
||
// 如果超过最大重试次数,记录错误并删除消息
|
||
if receiveCount > maxRetries {
|
||
global.GVA_LOG.Error("Message exceeded max retries, deleting message",
|
||
zap.String("msgId", msgId),
|
||
zap.Int64("receiveCount", receiveCount),
|
||
zap.Int64("maxRetries", maxRetries),
|
||
zap.String("body", *msg.Body))
|
||
deleteMessage(svc, queueURL, msg)
|
||
return
|
||
}
|
||
|
||
// 如果这是重试,记录日志
|
||
if receiveCount > 1 {
|
||
global.GVA_LOG.Info("Retrying message",
|
||
zap.String("msgId", msgId),
|
||
zap.Int64("retryCount", receiveCount-1))
|
||
}
|
||
|
||
// 如果处理时间可能较长,延长可见性超时
|
||
// 这里可以根据实际处理逻辑动态调整
|
||
extendVisibilityTimeout(svc, queueURL, msg, 600) // 延长到 10 分钟
|
||
|
||
// 在这里处理接收到的消息
|
||
var sqsMessage awssqs.SqsMessage
|
||
err := json.Unmarshal([]byte(*msg.Body), &sqsMessage)
|
||
if err != nil {
|
||
global.GVA_LOG.Warn("Sqs message unmarshal error, will retry",
|
||
zap.Error(err),
|
||
zap.String("msgId", msgId),
|
||
zap.Int64("receiveCount", receiveCount))
|
||
// 解析失败时不删除消息,让它自动重新变为可见,实现重复消费
|
||
// 通过将可见性超时设为 0,立即让消息重新变为可见
|
||
changeMessageVisibility(svc, queueURL, msg, 0)
|
||
return
|
||
}
|
||
|
||
// 处理不同类型的消息
|
||
var processErr error
|
||
|
||
global.GVA_LOG.Info("Processing create user action", zap.String("msgId", msgId),
|
||
zap.Int32("msg_action", int32(sqsMessage.Action)), zap.String("msg_content", sqsMessage.Content))
|
||
|
||
|
||
if sqsMessage.Action == awssqs.SqsActionCreateUser {
|
||
// TODO: 实现创建用户的逻辑
|
||
global.GVA_LOG.Info("Processing create user action", zap.String("msgId", msgId))
|
||
// 这里应该调用实际的处理逻辑
|
||
// processErr = handleCreateUser(sqsMessage)
|
||
} else {
|
||
global.GVA_LOG.Warn("Unknown action",
|
||
zap.String("action", string(sqsMessage.Action)),
|
||
zap.String("msgId", msgId))
|
||
// 未知操作,记录错误但不重试(直接删除)
|
||
deleteMessage(svc, queueURL, msg)
|
||
return
|
||
}
|
||
|
||
// 根据处理结果决定是否删除消息
|
||
if processErr != nil {
|
||
global.GVA_LOG.Warn("Message processing failed, will retry",
|
||
zap.Error(processErr),
|
||
zap.String("msgId", msgId),
|
||
zap.Int64("receiveCount", receiveCount))
|
||
// 处理失败时不删除消息,让它自动重新变为可见,实现重复消费
|
||
// 通过将可见性超时设为 0,立即让消息重新变为可见
|
||
changeMessageVisibility(svc, queueURL, msg, 0)
|
||
} else {
|
||
// 处理成功,删除消息
|
||
global.GVA_LOG.Info("Message processed successfully, deleting message", zap.String("msgId", msgId))
|
||
deleteMessage(svc, queueURL, msg)
|
||
}
|
||
}
|
||
|
||
// 获取消息的接收次数(重试次数)
|
||
func getReceiveCount(msg *sqs.Message) int64 {
|
||
if msg.Attributes == nil {
|
||
return 1
|
||
}
|
||
if countStr, ok := msg.Attributes["ApproximateReceiveCount"]; ok && countStr != nil {
|
||
// 尝试解析为整数
|
||
var count int64
|
||
if _, err := fmt.Sscanf(*countStr, "%d", &count); err == nil {
|
||
return count
|
||
}
|
||
}
|
||
return 1
|
||
}
|
||
|
||
// 延长消息的可见性超时(用于处理时间较长的消息)
|
||
func extendVisibilityTimeout(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) {
|
||
_, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
|
||
QueueUrl: aws.String(queueURL),
|
||
ReceiptHandle: msg.ReceiptHandle,
|
||
VisibilityTimeout: aws.Int64(timeoutSeconds),
|
||
})
|
||
if err != nil {
|
||
global.GVA_LOG.Warn("Failed to extend message visibility timeout",
|
||
zap.Error(err),
|
||
zap.String("msgId", *msg.MessageId))
|
||
} else {
|
||
global.GVA_LOG.Debug("Extended message visibility timeout",
|
||
zap.String("msgId", *msg.MessageId),
|
||
zap.Int64("timeoutSeconds", timeoutSeconds))
|
||
}
|
||
}
|
||
|
||
// 改变消息的可见性(用于立即让消息重新变为可见,实现重复消费)
|
||
// timeoutSeconds 为 0 时,消息立即重新变为可见
|
||
func changeMessageVisibility(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) {
|
||
_, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
|
||
QueueUrl: aws.String(queueURL),
|
||
ReceiptHandle: msg.ReceiptHandle,
|
||
VisibilityTimeout: aws.Int64(timeoutSeconds),
|
||
})
|
||
if err != nil {
|
||
global.GVA_LOG.Error("Failed to change message visibility",
|
||
zap.Error(err),
|
||
zap.String("msgId", *msg.MessageId))
|
||
} else {
|
||
if timeoutSeconds == 0 {
|
||
global.GVA_LOG.Info("Message visibility set to 0, message will be immediately available for retry",
|
||
zap.String("msgId", *msg.MessageId))
|
||
} else {
|
||
global.GVA_LOG.Debug("Changed message visibility",
|
||
zap.String("msgId", *msg.MessageId),
|
||
zap.Int64("timeoutSeconds", timeoutSeconds))
|
||
}
|
||
}
|
||
}
|
||
|
||
// 删除已处理的消息
|
||
func deleteMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) {
|
||
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
|
||
QueueUrl: aws.String(queueURL),
|
||
ReceiptHandle: msg.ReceiptHandle,
|
||
})
|
||
if err != nil {
|
||
global.GVA_LOG.Error("Failed to delete message", zap.Error(err), zap.String("msgId", *msg.MessageId))
|
||
} else {
|
||
global.GVA_LOG.Debug("Message deleted successfully", zap.String("msgId", *msg.MessageId))
|
||
}
|
||
}
|
||
|
||
func NewAwsSqsSession() (*session.Session, error) {
|
||
// 从 SQS URL 中提取区域,如果 SQS URL 存在
|
||
sqsRegion := global.GVA_CONFIG.AWS.SqsRegion // 默认使用 S3Region
|
||
sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl
|
||
if sqsUrl != "" {
|
||
// 从 URL 中提取区域,例如: https://sqs.sa-east-1.amazonaws.com/... -> sa-east-1
|
||
// 格式: https://sqs.{region}.amazonaws.com/...
|
||
if strings.HasPrefix(sqsUrl, "https://sqs.") {
|
||
parts := strings.Split(sqsUrl, ".")
|
||
if len(parts) >= 2 {
|
||
// parts[1] 应该是区域名,例如 "sa-east-1"
|
||
sqsRegion = parts[1]
|
||
}
|
||
}
|
||
}
|
||
|
||
// 获取凭证(优先从环境变量读取,否则从配置文件读取)
|
||
accessKey := global.GVA_CONFIG.AWS.AwsSqsAccessKey
|
||
secretKey := global.GVA_CONFIG.AWS.AwsSqsSecretKey
|
||
|
||
if accessKey == "" || secretKey == "" {
|
||
return nil, fmt.Errorf("AWS SQS credentials not configured. Please set AWS_SQS_ACCESS_KEY and AWS_SQS_SECRET_KEY environment variables or configure in config file")
|
||
}
|
||
|
||
// 创建一个AWS会话,使用提供的凭证
|
||
return session.NewSession(&aws.Config{
|
||
Region: aws.String(sqsRegion),
|
||
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
|
||
})
|
||
}
|