package sqs import ( "bygdata/global" "bygdata/model/awssqs" "bygdata/service/account" "context" "encoding/json" "fmt" "os" "os/signal" "strings" "sync" "syscall" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "go.uber.org/zap" ) func ProcessSqsMessage() { sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl global.GVA_LOG.Info("[SQS初始化] AwsSqsUrl: %s", zap.String("AwsSqsUrl", sqsUrl), zap.String("AwsSqsAccessKey", global.GVA_CONFIG.AWS.AwsSqsAccessKey)) if sqsUrl == "" { global.GVA_LOG.Warn("Not found sqs url config. cannot start aws sqs monitor process.") return } var err error // 创建 AWS 会话 sess, err := NewAwsSqsSession() if err != nil { global.GVA_LOG.Warn("Failed to create AWS session:%s", zap.Error(err)) os.Exit(1) } // 创建 SQS 服务客户端 svc := sqs.New(sess) global.GVA_LOG.Info("Aws sqs monitor process start.") // 创建 context 用于优雅退出 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 创建一个通道来接收中断信号 sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) // 创建有缓冲的消息通道,避免阻塞 // 缓冲区大小设置为 100,可根据实际情况调整 msgsCh := make(chan *sqs.Message, 100) // 创建 WaitGroup 以等待所有处理 goroutine 完成 var wg sync.WaitGroup // 限制并发处理的 goroutine 数量,避免资源耗尽 // 使用带缓冲的 channel 作为 semaphore maxWorkers := 50 // 最大并发处理数,可根据实际情况调整 semaphore := make(chan struct{}, maxWorkers) // 启动一个 goroutine 来接收消息 wg.Add(1) go receiveMessages(ctx, svc, sqsUrl, msgsCh, &wg) // 启动一个 goroutine 来处理消息 wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): // context 被取消,退出消息处理循环 return case msg, ok := <-msgsCh: if !ok { // 通道已关闭 return } // 获取 semaphore,限制并发数 semaphore <- struct{}{} wg.Add(1) go func(m *sqs.Message) { defer func() { <-semaphore // 释放 semaphore wg.Done() }() processMessage(svc, sqsUrl, m) }(msg) } } }() // 等待中断信号 <-sigCh global.GVA_LOG.Info("Received interrupt signal. Shutting down gracefully...") // 取消 context,通知所有 goroutine 退出 cancel() // 关闭消息通道,停止接收新消息 close(msgsCh) // 等待所有 goroutine 完成 global.GVA_LOG.Info("Waiting for all goroutines to finish...") wg.Wait() global.GVA_LOG.Info("All goroutines finished. Exiting...") } // 接收 SQS 消息并将其发送到通道 func receiveMessages(ctx context.Context, svc *sqs.SQS, queueURL string, msgsCh chan<- *sqs.Message, wg *sync.WaitGroup) { defer wg.Done() // 减少 WaitGroup 的计数 for { // 检查 context 是否已取消 select { case <-ctx.Done(): global.GVA_LOG.Info("receiveMessages: context cancelled, exiting...") return default: } // 接收消息 // VisibilityTimeout: 消息可见性超时(秒),在这期间消息对其他消费者不可见 // 如果在这期间没有删除消息,消息会自动重新变为可见,实现重复消费 // MessageAttributeNames: 获取消息属性,包括 ApproximateReceiveCount(接收次数) result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueURL), MaxNumberOfMessages: aws.Int64(10), // 每次最多接收 10 条消息 WaitTimeSeconds: aws.Int64(20), // 等待时间最多为 20 秒 VisibilityTimeout: aws.Int64(300), // 可见性超时 300 秒(5分钟),可根据实际处理时间调整 }) if err != nil { global.GVA_LOG.Error("Error receiving messages", zap.Error(err)) // 如果 context 已取消,直接退出 select { case <-ctx.Done(): return default: // 等待一段时间后重试,避免快速重试导致资源浪费 select { case <-ctx.Done(): return case <-time.After(time.Second * 5): continue } } } // 将收到的消息发送到通道 for _, msg := range result.Messages { select { case <-ctx.Done(): // context 已取消,不再发送新消息 return case msgsCh <- msg: // 消息已成功发送 } } } } // 处理消息 func processMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) { msgId := *msg.MessageId global.GVA_LOG.Info("Received message:msgId:%s, body:%s", zap.String("msgId", msgId), zap.String("body", *msg.Body)) // 获取消息的重试次数(接收次数) receiveCount := getReceiveCount(msg) maxRetries := int64(3) // 最大重试次数,可根据实际情况调整 // 如果超过最大重试次数,记录错误并删除消息 if receiveCount > maxRetries { global.GVA_LOG.Error("Message exceeded max retries, deleting message", zap.String("msgId", msgId), zap.Int64("receiveCount", receiveCount), zap.Int64("maxRetries", maxRetries), zap.String("body", *msg.Body)) deleteMessage(svc, queueURL, msg) return } // 如果这是重试,记录日志 if receiveCount > 1 { global.GVA_LOG.Info("Retrying message", zap.String("msgId", msgId), zap.Int64("retryCount", receiveCount-1)) } // 如果处理时间可能较长,延长可见性超时 // 这里可以根据实际处理逻辑动态调整 //extendVisibilityTimeout(svc, queueURL, msg, 600) // 延长到 10 分钟 // 在这里处理接收到的消息 var sqsMessage awssqs.SqsMessage err := json.Unmarshal([]byte(*msg.Body), &sqsMessage) if err != nil { global.GVA_LOG.Warn("Sqs message unmarshal error, will retry", zap.Error(err), zap.String("msgId", msgId), zap.Int64("receiveCount", receiveCount)) // 解析失败时不删除消息,让它自动重新变为可见,实现重复消费 // 通过将可见性超时设为 0,立即让消息重新变为可见 // changeMessageVisibility(svc, queueURL, msg, 0) deleteMessage(svc, queueURL, msg) return } // 处理不同类型的消息 var processErr error global.GVA_LOG.Info("Processing create user action", zap.String("msgId", msgId), zap.Int32("msg_action", int32(sqsMessage.Action)), zap.String("msg_content", sqsMessage.Content)) if sqsMessage.Action == awssqs.SqsActionCreateUser { global.GVA_LOG.Info("Processing SqsActionCreateUser", zap.String("msgId", msgId)) account.CreateUser(sqsMessage.Content) } else if sqsMessage.Action == awssqs.SqsActionIncreaseBetTotal { global.GVA_LOG.Info("Processing SqsActionIncreaseBetTotal", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionDecreaseBetTotal { global.GVA_LOG.Info("Processing SqsActionDecreaseBetTotal", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionDeductWithdrawLimitAfterTransfer { // 处理"回收后才结算"的情况:创建账变记录扣除稽核流水(余额不变) global.GVA_LOG.Info("Processing SqsActionDeductWithdrawLimitAfterTransfer", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionInviteCratesCheck { global.GVA_LOG.Info("Processing SqsActionInviteCratesCheck", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionInviteBonusCheck { global.GVA_LOG.Info("Processing SqsActionInviteBonusCheck", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionLoginIpCheck { global.GVA_LOG.Info("Processing SqsActionLoginIpCheck", zap.String("msgId", msgId)) account.CreateUserCheck(sqsMessage.Content) } else if sqsMessage.Action == awssqs.SqsActionExportData { global.GVA_LOG.Info("Processing SqsActionExportData", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionSendEmailVerifyCode { global.GVA_LOG.Info("Processing SqsActionSendEmailVerifyCode", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionDailyQuestProcess { global.GVA_LOG.Info("Processing SqsActionDailyQuestProcess", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionDepositSuccess { var req awssqs.SqsActionDepositSuccessContent err = json.Unmarshal([]byte(sqsMessage.Content), &req) if err != nil { return } fmt.Fprintf(os.Stderr, "[SQS处理] 解析充值成功消息成功: userno=%s, orderno=%s, depositTimes=%d, depositAmount=%s\n", req.Userno, req.Orderno, req.DepositTimes, req.DepositAmount.String()) } else if sqsMessage.Action == awssqs.SqsActionVisit { global.GVA_LOG.Info("Processing SqsActionVisit", zap.String("msgId", msgId)) account.UpdateUser(sqsMessage.Content) } else if sqsMessage.Action == awssqs.SqsActionSaveBonus { global.GVA_LOG.Info("Processing SqsActionSaveBonus", zap.String("msgId", msgId)) } else if sqsMessage.Action == awssqs.SqsActionGenDepositDailyClaim { global.GVA_LOG.Info("Processing SqsActionGenDepositDailyClaim", zap.String("msgId", msgId)) } else { global.GVA_LOG.Warn("Unknown action", zap.String("action", string(sqsMessage.Action)), zap.String("msgId", msgId)) // 未知操作,记录错误但不重试(直接删除) deleteMessage(svc, queueURL, msg) return } // 根据处理结果决定是否删除消息 if processErr != nil { global.GVA_LOG.Warn("Message processing failed, will retry", zap.Error(processErr), zap.String("msgId", msgId), zap.Int64("receiveCount", receiveCount)) // 处理失败时不删除消息,让它自动重新变为可见,实现重复消费 // 通过将可见性超时设为 0,立即让消息重新变为可见 changeMessageVisibility(svc, queueURL, msg, 0) } else { // 处理成功,删除消息 global.GVA_LOG.Info("Message processed successfully, deleting message", zap.String("msgId", msgId)) deleteMessage(svc, queueURL, msg) } } // 获取消息的接收次数(重试次数) func getReceiveCount(msg *sqs.Message) int64 { if msg.Attributes == nil { return 1 } if countStr, ok := msg.Attributes["ApproximateReceiveCount"]; ok && countStr != nil { // 尝试解析为整数 var count int64 if _, err := fmt.Sscanf(*countStr, "%d", &count); err == nil { return count } } return 1 } // 延长消息的可见性超时(用于处理时间较长的消息) func extendVisibilityTimeout(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) { _, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ QueueUrl: aws.String(queueURL), ReceiptHandle: msg.ReceiptHandle, VisibilityTimeout: aws.Int64(timeoutSeconds), }) if err != nil { global.GVA_LOG.Warn("Failed to extend message visibility timeout", zap.Error(err), zap.String("msgId", *msg.MessageId)) } else { global.GVA_LOG.Debug("Extended message visibility timeout", zap.String("msgId", *msg.MessageId), zap.Int64("timeoutSeconds", timeoutSeconds)) } } // 改变消息的可见性(用于立即让消息重新变为可见,实现重复消费) // timeoutSeconds 为 0 时,消息立即重新变为可见 func changeMessageVisibility(svc *sqs.SQS, queueURL string, msg *sqs.Message, timeoutSeconds int64) { _, err := svc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{ QueueUrl: aws.String(queueURL), ReceiptHandle: msg.ReceiptHandle, VisibilityTimeout: aws.Int64(timeoutSeconds), }) if err != nil { global.GVA_LOG.Error("Failed to change message visibility", zap.Error(err), zap.String("msgId", *msg.MessageId)) } else { if timeoutSeconds == 0 { global.GVA_LOG.Info("Message visibility set to 0, message will be immediately available for retry", zap.String("msgId", *msg.MessageId)) } else { global.GVA_LOG.Debug("Changed message visibility", zap.String("msgId", *msg.MessageId), zap.Int64("timeoutSeconds", timeoutSeconds)) } } } // 删除已处理的消息 func deleteMessage(svc *sqs.SQS, queueURL string, msg *sqs.Message) { _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{ QueueUrl: aws.String(queueURL), ReceiptHandle: msg.ReceiptHandle, }) if err != nil { global.GVA_LOG.Error("Failed to delete message", zap.Error(err), zap.String("msgId", *msg.MessageId)) } else { global.GVA_LOG.Debug("Message deleted successfully", zap.String("msgId", *msg.MessageId)) } } func NewAwsSqsSession() (*session.Session, error) { // 从 SQS URL 中提取区域,如果 SQS URL 存在 sqsRegion := global.GVA_CONFIG.AWS.SqsRegion // 默认使用 S3Region sqsUrl := global.GVA_CONFIG.AWS.AwsSqsUrl if sqsUrl != "" { // 从 URL 中提取区域,例如: https://sqs.sa-east-1.amazonaws.com/... -> sa-east-1 // 格式: https://sqs.{region}.amazonaws.com/... if strings.HasPrefix(sqsUrl, "https://sqs.") { parts := strings.Split(sqsUrl, ".") if len(parts) >= 2 { // parts[1] 应该是区域名,例如 "sa-east-1" sqsRegion = parts[1] } } } // 获取凭证(优先从环境变量读取,否则从配置文件读取) accessKey := global.GVA_CONFIG.AWS.AwsSqsAccessKey secretKey := global.GVA_CONFIG.AWS.AwsSqsSecretKey if accessKey == "" || secretKey == "" { return nil, fmt.Errorf("AWS SQS credentials not configured. Please set AWS_SQS_ACCESS_KEY and AWS_SQS_SECRET_KEY environment variables or configure in config file") } // 创建一个AWS会话,使用提供的凭证 return session.NewSession(&aws.Config{ Region: aws.String(sqsRegion), Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), }) }