package account import ( "bygdata/global" "bygdata/model/awssqs" "context" "encoding/json" "fmt" "runtime/debug" "sync" "time" devtodev "devtodev-sdk" "github.com/shopspring/decimal" "go.uber.org/zap" ) var ( devToDevOnce sync.Once devToDevClient *devtodev.Client devToDevSourceTypeSet map[int32]string currentBalanceLastAt sync.Map ) const ( devToDevEventRegister = "register" devToDevEventWithdraw = "withdraw" devToDevWalletEventDeposit = "deposit" devToDevWalletEventWithdraw = "withdraw" devToDevWalletEventAccrual = "currency_accrual" devToDevWalletEventSpent = "currency_spent" devToDevWalletEventVirtualPayment = "virtual_currency_payment" devToDevDefaultCurrency = "USD" devToDevDefaultPlatform = "web" devToDevDefaultBalanceMinIntervalS = int64(86400) ) func ReportUserBehaviorEvent(ctx context.Context, action awssqs.SqsAction, req awssqs.SqsActionUserBehaviorContent) { defer recoverDevToDevPanic("user_behavior", req.Userno) reporter := newDevToDevReporter(req.Userno, req.GetDeviceID()) if reporter == nil { logUserBehaviorReporterSkip(req) return } now := time.Now().UnixMilli() pkg := baseDevtodevPackage(reporter, req.Ip) switch action { case awssqs.SqsActionUserBehaviorRegister: deviceInfoEvent := devtodev.NewDeviceInfoEvent(now, buildDeviceInfoFields(req.Header)) pkg.Append(deviceInfoEvent) parameters := map[string]interface{}{} if req.Phone != "" { parameters["phone"] = req.Phone } registerEvent := devtodev.NewCustomEvent(now, 1, devToDevEventRegister, parameters, buildCommonFields(action, "", "")) pkg.Append(registerEvent) if _, err := pkg.Report(ctx); err != nil { logDevToDevError("register_bundle", req.Userno, err) } case awssqs.SqsActionUserBehaviorLogin: sessionStartEvent := devtodev.NewSessionStartEvent(now, 1, buildCommonFields(action, "", "")) pkg.Append(sessionStartEvent) if _, err := pkg.Report(ctx); err != nil { logDevToDevError("login_bundle", req.Userno, err) } } } func ReportWalletBalanceChangeEvent(ctx context.Context, req awssqs.SqsActionWalletBalanceChangeContent) { defer recoverDevToDevPanic("wallet_balance_change", req.Userno) reporter := newDevToDevReporter(req.Userno, req.GetDeviceID()) if reporter == nil { logDevToDevSkip("wallet_balance_change", req.Userno, "reporter unavailable: missing app-id/userno/deviceId") return } now := time.Now().UnixMilli() amount, err := decimal.NewFromString(req.Amount) if err != nil { logDevToDevError("parse_amount", req.Userno, err) return } pkg := baseDevtodevPackage(reporter, "") if shouldReportCurrentBalance(req.Userno) { if balance, ok := parseDecimalFloat(req.AfterBalance); ok { balanceEvent := devtodev.NewCurrentBalanceEvent(now, 1, map[string]float64{defaultCurrency(): balance}, buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(balanceEvent) } else { logDevToDevSkip("current_balance", req.Userno, fmt.Sprintf("invalid afterBalance: %q", req.AfterBalance)) } } matchedEvent := false switch walletSourceTypeKind(req.SourceType) { case devToDevWalletEventDeposit: matchedEvent = true price, ok := decimalAbsFloat(amount) if !ok { logDevToDevSkip("real_currency_payment", req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount)) return } paymentEvent := devtodev.NewRealPaymentEvent(now, 1, resolveProductID(req), req.SourceId, price, defaultCurrency(), buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(paymentEvent) case devToDevWalletEventWithdraw: matchedEvent = true value, ok := decimalAbsFloat(amount) if !ok { logDevToDevSkip("withdraw", req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount)) return } withdrawEvent := devtodev.NewCustomEvent(now, 1, devToDevEventWithdraw, map[string]interface{}{ "amount": value, "sourceId": req.SourceId, "recordNo": req.RecordNo, }, buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(withdrawEvent) case devToDevWalletEventAccrual: matchedEvent = true value, ok := decimalAbsFloat(amount) if !ok { logDevToDevSkip("currency_accrual", req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount)) return } accrualEvent := devtodev.NewCurrencyAccrualEvent(now, 1, nil, map[string]map[string]float64{ "default": {defaultCurrency(): value}, }, buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(accrualEvent) case devToDevWalletEventVirtualPayment: matchedEvent = true value, ok := decimalAbsFloat(amount) if !ok { logDevToDevSkip("virtual_currency_payment", req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount)) return } virtualPaymentEvent := devtodev.NewVirtualCurrencyPaymentEvent(now, 1, 1, map[string]float64{defaultCurrency(): value}, "wallet_source_type", fmt.Sprintf("%d", req.SourceType), buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(virtualPaymentEvent) case devToDevWalletEventSpent: matchedEvent = true value, ok := decimalAbsFloat(amount) if !ok { logDevToDevSkip("currency_spent", req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount)) return } spentEvent := devtodev.NewCustomEvent(now, 1, devToDevWalletEventSpent, map[string]interface{}{ "amount": value, "sourceId": req.SourceId, "recordNo": req.RecordNo, "gameId": req.GameId, "sourceType": req.SourceType, }, buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)) pkg.Append(spentEvent) } if !matchedEvent { logDevToDevSkip("wallet_balance_change", req.Userno, fmt.Sprintf("no sourceType mapping for sourceType=%d", req.SourceType)) } if len(pkg.Events()) == 0 { logDevToDevSkip("wallet_balance_change", req.Userno, "no valid events to report") return } if _, err := pkg.Report(ctx); err != nil { logDevToDevError("wallet_bundle", req.Userno, err) } } func newDevToDevReporter(userno, deviceID string) *devtodev.Reporter { client := getDevToDevClient() if client == nil || userno == "" || deviceID == "" { return nil } reporter := devtodev.NewReporter(client, deviceID) reporter.UserID = userno return reporter } func baseDevtodevPackage(reporter *devtodev.Reporter, ip string) *devtodev.DevtodevPackage { return reporter.NewPackage(). WithPlatform(defaultPlatform()). WithIP(ip). WithAppVersion(global.GVA_CONFIG.DevToDev.AppVersion). WithBundle(global.GVA_CONFIG.DevToDev.Bundle) } func getDevToDevClient() *devtodev.Client { devToDevOnce.Do(func() { cfg := global.GVA_CONFIG.DevToDev if cfg.AppID == "" { return } client := devtodev.NewClient(cfg.AppID) devToDevClient = client devToDevSourceTypeSet = make(map[int32]string) registerSourceTypes(devToDevWalletEventDeposit, cfg.DepositSourceTypes) registerSourceTypes(devToDevWalletEventWithdraw, cfg.WithdrawSourceTypes) registerSourceTypes(devToDevWalletEventAccrual, cfg.CurrencyAccrualSourceTypes) registerSourceTypes(devToDevWalletEventSpent, cfg.VirtualCurrencySpentSourceTypes) registerSourceTypes(devToDevWalletEventVirtualPayment, cfg.VirtualCurrencyPaymentSourceTypes) }) return devToDevClient } func registerSourceTypes(kind string, values []int32) { for _, value := range values { devToDevSourceTypeSet[value] = kind } } func walletSourceTypeKind(sourceType int32) string { if len(devToDevSourceTypeSet) == 0 { return "" } return devToDevSourceTypeSet[sourceType] } func shouldReportCurrentBalance(userno string) bool { interval := global.GVA_CONFIG.DevToDev.CurrentBalanceMinIntervalSeconds if interval <= 0 { interval = devToDevDefaultBalanceMinIntervalS } now := time.Now().Unix() if value, ok := currentBalanceLastAt.Load(userno); ok { lastAt, ok := value.(int64) if ok && now-lastAt < interval { return false } } currentBalanceLastAt.Store(userno, now) return true } func buildDeviceInfoFields(header string) map[string]interface{} { if header == "" { return nil } var values map[string]interface{} if err := json.Unmarshal([]byte(header), &values); err == nil && len(values) > 0 { return map[string]interface{}{"header": values} } return map[string]interface{}{"header_raw": header} } func buildCommonFields(action awssqs.SqsAction, sourceID, recordNo string) map[string]interface{} { fields := map[string]interface{}{ "sqsAction": action.GetName(), } if sourceID != "" { fields["sourceId"] = sourceID } if recordNo != "" { fields["recordNo"] = recordNo } return fields } func resolveProductID(req awssqs.SqsActionWalletBalanceChangeContent) string { if req.GameId != "" { return req.GameId } if req.SourceId != "" { return req.SourceId } return fmt.Sprintf("wallet_source_type_%d", req.SourceType) } func defaultCurrency() string { if global.GVA_CONFIG.DevToDev.DefaultCurrency != "" { return global.GVA_CONFIG.DevToDev.DefaultCurrency } return devToDevDefaultCurrency } func defaultPlatform() string { if global.GVA_CONFIG.DevToDev.Platform != "" { return global.GVA_CONFIG.DevToDev.Platform } return devToDevDefaultPlatform } func parseDecimalFloat(value string) (float64, bool) { parsed, err := decimal.NewFromString(value) if err != nil { return 0, false } floatValue, _ := parsed.Float64() return floatValue, true } func decimalAbsFloat(value decimal.Decimal) (float64, bool) { floatValue, _ := value.Abs().Float64() return floatValue, true } func logDevToDevError(eventName, userno string, err error) { global.GVA_LOG.Warn("devtodev report failed", zap.String("event", eventName), zap.String("userno", userno), zap.Error(err)) } func logDevToDevSkip(scene, userno, reason string) { global.GVA_LOG.Warn("devtodev report skipped", zap.String("scene", scene), zap.String("userno", userno), zap.String("reason", reason)) } func logUserBehaviorReporterSkip(req awssqs.SqsActionUserBehaviorContent) { switch { case global.GVA_CONFIG.DevToDev.AppID == "": logDevToDevSkip("user_behavior", req.Userno, "reporter unavailable: missing app-id") case req.Userno == "": logDevToDevSkip("user_behavior", req.Userno, "reporter unavailable: missing userno") case req.Header == "": global.GVA_LOG.Warn( "devtodev report skipped", zap.String("scene", "user_behavior"), zap.String("userno", req.Userno), zap.String("reason", "reporter unavailable: missing deviceId, header is empty"), zap.Strings("deviceIdCandidateKeys", deviceIDCandidateKeys()), ) default: global.GVA_LOG.Warn( "devtodev report skipped", zap.String("scene", "user_behavior"), zap.String("userno", req.Userno), zap.String("reason", "reporter unavailable: missing deviceId in header"), zap.Strings("deviceIdCandidateKeys", deviceIDCandidateKeys()), zap.String("headerPreview", truncateForLog(req.Header, 512)), ) } } func deviceIDCandidateKeys() []string { return []string{"deviceId", "deviceID", "device_id", "device-id", "x-device-id", "X-Device-Id", "X-DEVICE-ID"} } func truncateForLog(value string, max int) string { if max <= 0 || len(value) <= max { return value } return value[:max] + "...(truncated)" } func recoverDevToDevPanic(scene, userno string) { if r := recover(); r != nil { global.GVA_LOG.Error( "devtodev report panic recovered", zap.String("scene", scene), zap.String("userno", userno), zap.Any("panic", r), zap.ByteString("stack", debug.Stack()), ) } }