bygdata/api/v1/sqs/aws-sqs.go
2026-02-12 00:16:08 +00:00

340 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package sqs
import (
"bygdata/global"
"bygdata/model/awssqs"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"go.uber.org/zap"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
func ProcessSqsMessage() {
sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl
global.GVA_LOG.Info("[SQS初始化] AwsSqsUrl: %s", zap.String("AwsSqsUrl", sqsUrl), zap.String("AwsSqsAccessKey", global.GVA_CONFIG.AWS.AwsSqsAccessKey))
if sqsUrl == "" {
global.GVA_LOG.Warn("Not found sqs url config. cannot start aws sqs monitor process.")
return
}
var err error
// 创建 AWS 会话
sess, err := NewAwsSqsSession()
if err != nil {
global.GVA_LOG.Warn("Failed to create AWS session:%s", zap.Error(err))
os.Exit(1)
}
// 创建 SQS 服务客户端
svc := sqs.New(sess)
global.GVA_LOG.Info("Aws sqs monitor process start.")
// 创建 context 用于优雅退出
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建一个通道来接收中断信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// 创建有缓冲的消息通道,避免阻塞
// 缓冲区大小设置为 100可根据实际情况调整
msgsCh := make(chan *sqs.Message, 100)
// 创建 WaitGroup 以等待所有处理 goroutine 完成
var wg sync.WaitGroup
// 限制并发处理的 goroutine 数量,避免资源耗尽
// 使用带缓冲的 channel 作为 semaphore
maxWorkers := 50 // 最大并发处理数,可根据实际情况调整
semaphore := make(chan struct{}, maxWorkers)
// 启动一个 goroutine 来接收消息
wg.Add(1)
go receiveMessages(ctx, svc, sqsUrl, msgsCh, &wg)
// 启动一个 goroutine 来处理消息
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
// context 被取消,退出消息处理循环
return
case msg, ok := <-msgsCh:
if !ok {
// 通道已关闭
return
}
// 获取 semaphore限制并发数
semaphore <- struct{}{}
wg.Add(1)
go func(m *sqs.Message) {
defer func() {
<-semaphore // 释放 semaphore
wg.Done()
}()
processMessage(svc, sqsUrl, m)
}(msg)
}
}
}()
// 等待中断信号
<-sigCh
global.GVA_LOG.Info("Received interrupt signal. Shutting down gracefully...")
// 取消 context通知所有 goroutine 退出
cancel()
// 关闭消息通道,停止接收新消息
close(msgsCh)
// 等待所有 goroutine 完成
global.GVA_LOG.Info("Waiting for all goroutines to finish...")
wg.Wait()
global.GVA_LOG.Info("All goroutines finished. Exiting...")
}
// 接收 SQS 消息并将其发送到通道
func receiveMessages(ctx context.Context, svc *sqs.SQS, queueURL string, msgsCh chan<- *sqs.Message, wg *sync.WaitGroup) {
defer wg.Done() // 减少 WaitGroup 的计数
for {
// 检查 context 是否已取消
select {
case <-ctx.Done():
global.GVA_LOG.Info("receiveMessages: context cancelled, exiting...")
return
default:
}
// 接收消息
// VisibilityTimeout: 消息可见性超时(秒),在这期间消息对其他消费者不可见
// 如果在这期间没有删除消息,消息会自动重新变为可见,实现重复消费
// MessageAttributeNames: 获取消息属性,包括 ApproximateReceiveCount接收次数
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: aws.Int64(10), // 每次最多接收 10 条消息
WaitTimeSeconds: aws.Int64(20), // 等待时间最多为 20 秒
VisibilityTimeout: aws.Int64(300), // 可见性超时 300 秒5分钟可根据实际处理时间调整
})
if err != nil {
global.GVA_LOG.Error("Error receiving messages", zap.Error(err))
// 如果 context 已取消,直接退出
select {
case <-ctx.Done():
return
default:
// 等待一段时间后重试,避免快速重试导致资源浪费
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 5):
continue
}
}
}
// 将收到的消息发送到通道
for _, msg := range result.Messages {
select {
case <-ctx.Done():
// context 已取消,不再发送新消息
return
case msgsCh <- msg:
// 消息已成功发送
}
}
}
}
// 处理消息
func processMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) {
msgId := *msg.MessageId
global.GVA_LOG.Info("Received message:msgId:%s, body:%s", zap.String("msgId", msgId), zap.String("body", *msg.Body))
// 获取消息的重试次数(接收次数)
receiveCount := getReceiveCount(msg)
maxRetries := int64(3) // 最大重试次数,可根据实际情况调整
// 如果超过最大重试次数,记录错误并删除消息
if receiveCount > maxRetries {
global.GVA_LOG.Error("Message exceeded max retries, deleting message",
zap.String("msgId", msgId),
zap.Int64("receiveCount", receiveCount),
zap.Int64("maxRetries", maxRetries),
zap.String("body", *msg.Body))
deleteMessage(svc, queueURL, msg)
return
}
// 如果这是重试,记录日志
if receiveCount > 1 {
global.GVA_LOG.Info("Retrying message",
zap.String("msgId", msgId),
zap.Int64("retryCount", receiveCount-1))
}
// 如果处理时间可能较长,延长可见性超时
// 这里可以根据实际处理逻辑动态调整
extendVisibilityTimeout(svc, queueURL, msg, 600) // 延长到 10 分钟
// 在这里处理接收到的消息
var sqsMessage awssqs.SqsMessage
err := json.Unmarshal([]byte(*msg.Body), &sqsMessage)
if err != nil {
global.GVA_LOG.Warn("Sqs message unmarshal error, will retry",
zap.Error(err),
zap.String("msgId", msgId),
zap.Int64("receiveCount", receiveCount))
// 解析失败时不删除消息,让它自动重新变为可见,实现重复消费
// 通过将可见性超时设为 0立即让消息重新变为可见
changeMessageVisibility(svc, queueURL, msg, 0)
return
}
// 处理不同类型的消息
var processErr error
if sqsMessage.Action == awssqs.SqsActionCreateUser {
// TODO: 实现创建用户的逻辑
global.GVA_LOG.Info("Processing create user action", zap.String("msgId", msgId))
// 这里应该调用实际的处理逻辑
// processErr = handleCreateUser(sqsMessage)
} else {
global.GVA_LOG.Warn("Unknown action",
zap.String("action", string(sqsMessage.Action)),
zap.String("msgId", msgId))
// 未知操作,记录错误但不重试(直接删除)
deleteMessage(svc, queueURL, msg)
return
}
// 根据处理结果决定是否删除消息
if processErr != nil {
global.GVA_LOG.Warn("Message processing failed, will retry",
zap.Error(processErr),
zap.String("msgId", msgId),
zap.Int64("receiveCount", receiveCount))
// 处理失败时不删除消息,让它自动重新变为可见,实现重复消费
// 通过将可见性超时设为 0立即让消息重新变为可见
changeMessageVisibility(svc, queueURL, msg, 0)
} else {
// 处理成功,删除消息
global.GVA_LOG.Info("Message processed successfully, deleting message", zap.String("msgId", msgId))
deleteMessage(svc, queueURL, msg)
}
}
// 获取消息的接收次数(重试次数)
func getReceiveCount(msg *sqs.Message) int64 {
if msg.Attributes == nil {
return 1
}
if countStr, ok := msg.Attributes["ApproximateReceiveCount"]; ok && countStr != nil {
// 尝试解析为整数
var count int64
if _, err := fmt.Sscanf(*countStr, "%d", &count); err == nil {
return count
}
}
return 1
}
// 延长消息的可见性超时(用于处理时间较长的消息)
func extendVisibilityTimeout(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) {
_, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msg.ReceiptHandle,
VisibilityTimeout: aws.Int64(timeoutSeconds),
})
if err != nil {
global.GVA_LOG.Warn("Failed to extend message visibility timeout",
zap.Error(err),
zap.String("msgId", *msg.MessageId))
} else {
global.GVA_LOG.Debug("Extended message visibility timeout",
zap.String("msgId", *msg.MessageId),
zap.Int64("timeoutSeconds", timeoutSeconds))
}
}
// 改变消息的可见性(用于立即让消息重新变为可见,实现重复消费)
// timeoutSeconds 为 0 时,消息立即重新变为可见
func changeMessageVisibility(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) {
_, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msg.ReceiptHandle,
VisibilityTimeout: aws.Int64(timeoutSeconds),
})
if err != nil {
global.GVA_LOG.Error("Failed to change message visibility",
zap.Error(err),
zap.String("msgId", *msg.MessageId))
} else {
if timeoutSeconds == 0 {
global.GVA_LOG.Info("Message visibility set to 0, message will be immediately available for retry",
zap.String("msgId", *msg.MessageId))
} else {
global.GVA_LOG.Debug("Changed message visibility",
zap.String("msgId", *msg.MessageId),
zap.Int64("timeoutSeconds", timeoutSeconds))
}
}
}
// 删除已处理的消息
func deleteMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) {
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
global.GVA_LOG.Error("Failed to delete message", zap.Error(err), zap.String("msgId", *msg.MessageId))
} else {
global.GVA_LOG.Debug("Message deleted successfully", zap.String("msgId", *msg.MessageId))
}
}
func NewAwsSqsSession() (*session.Session, error) {
// 从 SQS URL 中提取区域,如果 SQS URL 存在
sqsRegion := global.GVA_CONFIG.AWS.SqsRegion // 默认使用 S3Region
sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl
if sqsUrl != "" {
// 从 URL 中提取区域,例如: https://sqs.sa-east-1.amazonaws.com/... -> sa-east-1
// 格式: https://sqs.{region}.amazonaws.com/...
if strings.HasPrefix(sqsUrl, "https://sqs.") {
parts := strings.Split(sqsUrl, ".")
if len(parts) >= 2 {
// parts[1] 应该是区域名,例如 "sa-east-1"
sqsRegion = parts[1]
}
}
}
// 获取凭证(优先从环境变量读取,否则从配置文件读取)
accessKey := global.GVA_CONFIG.AWS.AwsSqsAccessKey
secretKey := global.GVA_CONFIG.AWS.AwsSqsSecretKey
if accessKey == "" || secretKey == "" {
return nil, fmt.Errorf("AWS SQS credentials not configured. Please set AWS_SQS_ACCESS_KEY and AWS_SQS_SECRET_KEY environment variables or configure in config file")
}
// 创建一个AWS会话使用提供的凭证
return session.NewSession(&aws.Config{
Region: aws.String(sqsRegion),
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
})
}