bygdata/service/account/devtodev.go

354 lines
11 KiB
Go

package account
import (
"bygdata/global"
"bygdata/model/awssqs"
"context"
"encoding/json"
"fmt"
"runtime/debug"
"sync"
"time"
devtodev "devtodev-sdk"
"github.com/shopspring/decimal"
"go.uber.org/zap"
)
// Package-level devtodev state.
var (
// devToDevOnce guards the one-time initialisation performed in getDevToDevClient.
devToDevOnce sync.Once
// devToDevClient is assigned inside that initialisation; it stays nil when the
// configured app id is empty.
devToDevClient *devtodev.Client
// devToDevSourceTypeSet maps a wallet source type to the wallet event kind it
// is reported as. It is created and filled during the same initialisation.
devToDevSourceTypeSet map[int32]string
// currentBalanceLastAt stores, per userno, the Unix-second timestamp written by
// shouldReportCurrentBalance the last time it returned true.
currentBalanceLastAt sync.Map
)
// Event names, wallet event kinds and fallback values used by the devtodev reporters.
const (
// Custom event names sent for user registration and wallet withdrawals.
devToDevEventRegister = "register"
devToDevEventWithdraw = "withdraw"
// Wallet event kinds: the values stored in devToDevSourceTypeSet and switched on
// in ReportWalletBalanceChangeEvent. devToDevWalletEventSpent is also used as the
// name of the custom event sent for that kind.
devToDevWalletEventDeposit = "deposit"
devToDevWalletEventWithdraw = "withdraw"
devToDevWalletEventAccrual = "currency_accrual"
devToDevWalletEventSpent = "currency_spent"
devToDevWalletEventVirtualPayment = "virtual_currency_payment"
// Fallbacks applied when the corresponding configuration value is empty.
devToDevDefaultCurrency = "USD"
devToDevDefaultPlatform = "web"
// Fallback minimum gap, in seconds, between two current-balance reports for the
// same user (86400 s = 24 h); used when the configured interval is <= 0.
devToDevDefaultBalanceMinIntervalS = int64(86400)
)
// ReportUserBehaviorEvent forwards a user-behavior SQS message to devtodev.
//
// A register action sends a device-info event followed by a "register" custom
// event (carrying the phone number when one is present); a login action sends
// a session-start event. For any other action a package is built but nothing
// is reported. Failures are logged rather than returned, and a panic raised
// anywhere in the call is recovered and logged.
func ReportUserBehaviorEvent(ctx context.Context, action awssqs.SqsAction, req awssqs.SqsActionUserBehaviorContent) {
	defer recoverDevToDevPanic("user_behavior", req.Userno)

	rep := newDevToDevReporter(req.Userno, req.GetDeviceID())
	if rep == nil {
		logUserBehaviorReporterSkip(req)
		return
	}

	ts := time.Now().UnixMilli()
	bundle := baseDevtodevPackage(rep, req.Ip)

	// flush sends everything appended so far and logs a failure under label.
	flush := func(label string) {
		_, err := bundle.Report(ctx)
		if err != nil {
			logDevToDevError(label, req.Userno, err)
		}
	}

	if action == awssqs.SqsActionUserBehaviorRegister {
		bundle.Append(devtodev.NewDeviceInfoEvent(ts, buildDeviceInfoFields(req.Header)))

		// The phone number is optional; the parameter map stays empty without it.
		params := map[string]interface{}{}
		if req.Phone != "" {
			params["phone"] = req.Phone
		}
		bundle.Append(devtodev.NewCustomEvent(ts, 1, devToDevEventRegister, params, buildCommonFields(action, "", "")))

		flush("register_bundle")
		return
	}

	if action == awssqs.SqsActionUserBehaviorLogin {
		bundle.Append(devtodev.NewSessionStartEvent(ts, 1, buildCommonFields(action, "", "")))
		flush("login_bundle")
	}
}
// ReportWalletBalanceChangeEvent turns a wallet balance change SQS message
// into devtodev events and sends them as a single package.
//
// At most two events are bundled: a current-balance snapshot (only when
// shouldReportCurrentBalance allows it for this user) and one event chosen by
// the kind that req.SourceType maps to — deposit, withdraw, currency accrual,
// virtual currency payment or currency spent. Nothing is returned: skips and
// failures are logged, and a panic raised anywhere in the call is recovered
// and logged.
func ReportWalletBalanceChangeEvent(ctx context.Context, req awssqs.SqsActionWalletBalanceChangeContent) {
	defer recoverDevToDevPanic("wallet_balance_change", req.Userno)

	rep := newDevToDevReporter(req.Userno, req.GetDeviceID())
	if rep == nil {
		logDevToDevSkip("wallet_balance_change", req.Userno, "reporter unavailable: missing app-id/userno/deviceId")
		return
	}

	ts := time.Now().UnixMilli()
	delta, parseErr := decimal.NewFromString(req.Amount)
	if parseErr != nil {
		logDevToDevError("parse_amount", req.Userno, parseErr)
		return
	}

	bundle := baseDevtodevPackage(rep, "")

	// commonFields builds a fresh field map per event so no map is shared
	// between two events.
	commonFields := func() map[string]interface{} {
		return buildCommonFields(awssqs.SqsActionWalletBalanceChange, req.SourceId, req.RecordNo)
	}

	// absAmount yields the absolute amount and logs a skip under scene when the
	// conversion is rejected.
	absAmount := func(scene string) (float64, bool) {
		magnitude, ok := decimalAbsFloat(delta)
		if !ok {
			logDevToDevSkip(scene, req.Userno, fmt.Sprintf("invalid amount: %q", req.Amount))
		}
		return magnitude, ok
	}

	if shouldReportCurrentBalance(req.Userno) {
		balance, ok := parseDecimalFloat(req.AfterBalance)
		if ok {
			bundle.Append(devtodev.NewCurrentBalanceEvent(ts, 1, map[string]float64{defaultCurrency(): balance}, commonFields()))
		} else {
			logDevToDevSkip("current_balance", req.Userno, fmt.Sprintf("invalid afterBalance: %q", req.AfterBalance))
		}
	}

	// NOTE(review): every early return inside the switch also discards a
	// current-balance event appended above — confirm that is intended.
	switch walletSourceTypeKind(req.SourceType) {
	case devToDevWalletEventDeposit:
		price, ok := absAmount("real_currency_payment")
		if !ok {
			return
		}
		bundle.Append(devtodev.NewRealPaymentEvent(ts, 1, resolveProductID(req), req.SourceId, price, defaultCurrency(), commonFields()))

	case devToDevWalletEventWithdraw:
		withdrawn, ok := absAmount("withdraw")
		if !ok {
			return
		}
		params := map[string]interface{}{
			"amount":   withdrawn,
			"sourceId": req.SourceId,
			"recordNo": req.RecordNo,
		}
		bundle.Append(devtodev.NewCustomEvent(ts, 1, devToDevEventWithdraw, params, commonFields()))

	case devToDevWalletEventAccrual:
		accrued, ok := absAmount("currency_accrual")
		if !ok {
			return
		}
		bought := map[string]map[string]float64{
			"default": {defaultCurrency(): accrued},
		}
		bundle.Append(devtodev.NewCurrencyAccrualEvent(ts, 1, nil, bought, commonFields()))

	case devToDevWalletEventVirtualPayment:
		paid, ok := absAmount("virtual_currency_payment")
		if !ok {
			return
		}
		bundle.Append(devtodev.NewVirtualCurrencyPaymentEvent(ts, 1, 1, map[string]float64{defaultCurrency(): paid}, "wallet_source_type", fmt.Sprintf("%d", req.SourceType), commonFields()))

	case devToDevWalletEventSpent:
		spent, ok := absAmount("currency_spent")
		if !ok {
			return
		}
		params := map[string]interface{}{
			"amount":     spent,
			"sourceId":   req.SourceId,
			"recordNo":   req.RecordNo,
			"gameId":     req.GameId,
			"sourceType": req.SourceType,
		}
		bundle.Append(devtodev.NewCustomEvent(ts, 1, devToDevWalletEventSpent, params, commonFields()))

	default:
		// No configured kind for this source type; a balance event may still be
		// pending, so fall through to the send step.
		logDevToDevSkip("wallet_balance_change", req.Userno, fmt.Sprintf("no sourceType mapping for sourceType=%d", req.SourceType))
	}

	if len(bundle.Events()) == 0 {
		logDevToDevSkip("wallet_balance_change", req.Userno, "no valid events to report")
		return
	}

	_, reportErr := bundle.Report(ctx)
	if reportErr != nil {
		logDevToDevError("wallet_bundle", req.Userno, reportErr)
	}
}
// newDevToDevReporter returns a reporter bound to deviceID with its UserID set
// to userno. It returns nil when no client is available or when either
// identifier is empty.
func newDevToDevReporter(userno, deviceID string) *devtodev.Reporter {
	// The client is fetched first so the one-time initialisation runs even when
	// the identifiers turn out to be unusable.
	client := getDevToDevClient()

	switch {
	case client == nil, userno == "", deviceID == "":
		return nil
	}

	rep := devtodev.NewReporter(client, deviceID)
	rep.UserID = userno
	return rep
}
// baseDevtodevPackage creates a new package from reporter and applies the
// platform (configured or default), the given ip, and the configured app
// version and bundle.
func baseDevtodevPackage(reporter *devtodev.Reporter, ip string) *devtodev.DevtodevPackage {
	conf := global.GVA_CONFIG.DevToDev
	platform := defaultPlatform()

	return reporter.NewPackage().
		WithPlatform(platform).
		WithIP(ip).
		WithAppVersion(conf.AppVersion).
		WithBundle(conf.Bundle)
}
// getDevToDevClient returns the shared devtodev client, initialising it on the
// first call.
//
// Initialisation runs once: if the configured app id is empty at that moment,
// nothing is created and every call returns nil. Otherwise the client is built
// and the source-type lookup table is filled from the configured source-type
// lists.
func getDevToDevClient() *devtodev.Client {
	devToDevOnce.Do(func() {
		conf := global.GVA_CONFIG.DevToDev
		if conf.AppID != "" {
			devToDevClient = devtodev.NewClient(conf.AppID)
			devToDevSourceTypeSet = make(map[int32]string)

			// Registration order matters: a source type listed under several
			// kinds ends up with the kind registered last.
			registrations := []struct {
				kind        string
				sourceTypes []int32
			}{
				{devToDevWalletEventDeposit, conf.DepositSourceTypes},
				{devToDevWalletEventWithdraw, conf.WithdrawSourceTypes},
				{devToDevWalletEventAccrual, conf.CurrencyAccrualSourceTypes},
				{devToDevWalletEventSpent, conf.VirtualCurrencySpentSourceTypes},
				{devToDevWalletEventVirtualPayment, conf.VirtualCurrencyPaymentSourceTypes},
			}
			for _, reg := range registrations {
				registerSourceTypes(reg.kind, reg.sourceTypes)
			}
		}
	})

	return devToDevClient
}
// registerSourceTypes records kind for every source type in values inside
// devToDevSourceTypeSet, overwriting any kind stored earlier for the same
// source type. The map must already exist when values is non-empty.
func registerSourceTypes(kind string, values []int32) {
	for i := 0; i < len(values); i++ {
		devToDevSourceTypeSet[values[i]] = kind
	}
}
// walletSourceTypeKind returns the wallet event kind registered for
// sourceType, or "" when the table is empty or has no entry for it.
func walletSourceTypeKind(sourceType int32) string {
	// Indexing a nil or empty map is safe and simply reports "not found".
	kind, found := devToDevSourceTypeSet[sourceType]
	if !found {
		return ""
	}
	return kind
}
// shouldReportCurrentBalance reports whether a current-balance event may be
// sent for userno now, and records the current time when it returns true.
//
// It returns false while fewer seconds than the minimum interval have passed
// since the recorded time. The interval comes from configuration and falls
// back to devToDevDefaultBalanceMinIntervalS when the configured value is <= 0.
//
// NOTE(review): the Load and the Store are separate calls, so two concurrent
// callers for the same user can both get true. The timestamp is also recorded
// before the caller has sent anything, and entries are never removed here.
func shouldReportCurrentBalance(userno string) bool {
	minGap := global.GVA_CONFIG.DevToDev.CurrentBalanceMinIntervalSeconds
	if minGap <= 0 {
		minGap = devToDevDefaultBalanceMinIntervalS
	}

	nowSec := time.Now().Unix()

	// A missing entry yields a nil value, which fails the assertion exactly like
	// a stored value of an unexpected type does.
	stored, _ := currentBalanceLastAt.Load(userno)
	previous, seen := stored.(int64)
	if seen && nowSec-previous < minGap {
		return false
	}

	currentBalanceLastAt.Store(userno, nowSec)
	return true
}
// buildDeviceInfoFields wraps the raw header string for a device-info event.
//
// It returns nil for an empty header. When header is a non-empty JSON object
// the decoded object is returned under "header"; in every other case (invalid
// JSON, a non-object value, or an empty object) the original string is
// returned under "header_raw".
func buildDeviceInfoFields(header string) map[string]interface{} {
	if header == "" {
		return nil
	}

	var decoded map[string]interface{}
	err := json.Unmarshal([]byte(header), &decoded)
	if err != nil || len(decoded) == 0 {
		return map[string]interface{}{"header_raw": header}
	}

	return map[string]interface{}{"header": decoded}
}
// buildCommonFields returns the field map attached to every reported event:
// "sqsAction" holds the action's name, and "sourceId" / "recordNo" are added
// only when the corresponding argument is non-empty.
func buildCommonFields(action awssqs.SqsAction, sourceID, recordNo string) map[string]interface{} {
	common := make(map[string]interface{}, 3)
	common["sqsAction"] = action.GetName()

	optional := [...]struct{ key, value string }{
		{"sourceId", sourceID},
		{"recordNo", recordNo},
	}
	for _, field := range optional {
		if field.value != "" {
			common[field.key] = field.value
		}
	}

	return common
}
// resolveProductID picks the product id for a payment event: the game id when
// set, otherwise the source id when set, otherwise a synthetic
// "wallet_source_type_<sourceType>" label.
func resolveProductID(req awssqs.SqsActionWalletBalanceChangeContent) string {
	switch {
	case req.GameId != "":
		return req.GameId
	case req.SourceId != "":
		return req.SourceId
	default:
		return fmt.Sprintf("wallet_source_type_%d", req.SourceType)
	}
}
// defaultCurrency returns the configured default currency, falling back to
// devToDevDefaultCurrency when none is configured.
func defaultCurrency() string {
	if configured := global.GVA_CONFIG.DevToDev.DefaultCurrency; configured != "" {
		return configured
	}
	return devToDevDefaultCurrency
}
// defaultPlatform returns the configured platform, falling back to
// devToDevDefaultPlatform when none is configured.
func defaultPlatform() string {
	if configured := global.GVA_CONFIG.DevToDev.Platform; configured != "" {
		return configured
	}
	return devToDevDefaultPlatform
}
// parseDecimalFloat parses value as a decimal and converts it to float64.
//
// The second result is false when value is not a valid decimal or when the
// parsed number lies outside the finite float64 range (the conversion then
// yields ±Inf, which must not be placed in an event payload). Loss of
// precision alone is not treated as a failure.
func parseDecimalFloat(value string) (float64, bool) {
	// Largest finite float64; same value as math.MaxFloat64.
	const maxFinite = 1.7976931348623157e308

	parsed, err := decimal.NewFromString(value)
	if err != nil {
		return 0, false
	}

	// The exactness flag is deliberately ignored: most decimal fractions have no
	// exact binary representation.
	floatValue, _ := parsed.Float64()
	if floatValue > maxFinite || floatValue < -maxFinite {
		return 0, false
	}
	return floatValue, true
}
// decimalAbsFloat returns the absolute value of value as a float64.
//
// The second result is false when the magnitude is too large for a finite
// float64 (the conversion then yields +Inf); callers treat that as an invalid
// amount. Loss of precision alone is not treated as a failure.
func decimalAbsFloat(value decimal.Decimal) (float64, bool) {
	// Largest finite float64; same value as math.MaxFloat64.
	const maxFinite = 1.7976931348623157e308

	// The exactness flag is deliberately ignored: most decimal fractions have no
	// exact binary representation.
	floatValue, _ := value.Abs().Float64()
	if floatValue > maxFinite {
		return 0, false
	}
	return floatValue, true
}
func logDevToDevError(eventName, userno string, err error) {
global.GVA_LOG.Warn("devtodev report failed", zap.String("event", eventName), zap.String("userno", userno), zap.Error(err))
}
func logDevToDevSkip(scene, userno, reason string) {
global.GVA_LOG.Warn("devtodev report skipped", zap.String("scene", scene), zap.String("userno", userno), zap.String("reason", reason))
}
// logUserBehaviorReporterSkip logs why no reporter could be created for a
// user-behavior message. It checks, in order: missing app id, missing userno,
// empty header, and finally a header from which no device id was obtained.
// The last two cases also log the candidate device-id keys, and the last one
// adds a preview of the header truncated to 512 bytes.
func logUserBehaviorReporterSkip(req awssqs.SqsActionUserBehaviorContent) {
	if global.GVA_CONFIG.DevToDev.AppID == "" {
		logDevToDevSkip("user_behavior", req.Userno, "reporter unavailable: missing app-id")
		return
	}
	if req.Userno == "" {
		logDevToDevSkip("user_behavior", req.Userno, "reporter unavailable: missing userno")
		return
	}

	// Both remaining cases are device-id problems and share the leading fields.
	fields := []zap.Field{
		zap.String("scene", "user_behavior"),
		zap.String("userno", req.Userno),
	}
	if req.Header == "" {
		fields = append(fields,
			zap.String("reason", "reporter unavailable: missing deviceId, header is empty"),
			zap.Strings("deviceIdCandidateKeys", deviceIDCandidateKeys()),
		)
	} else {
		fields = append(fields,
			zap.String("reason", "reporter unavailable: missing deviceId in header"),
			zap.Strings("deviceIdCandidateKeys", deviceIDCandidateKeys()),
			zap.String("headerPreview", truncateForLog(req.Header, 512)),
		)
	}

	global.GVA_LOG.Warn("devtodev report skipped", fields...)
}
// deviceIDCandidateKeys returns a fresh slice of the device-id key spellings
// that are included in skip logs.
func deviceIDCandidateKeys() []string {
	keys := make([]string, 0, 7)
	keys = append(keys,
		"deviceId",
		"deviceID",
		"device_id",
		"device-id",
		"x-device-id",
		"X-Device-Id",
		"X-DEVICE-ID",
	)
	return keys
}
// truncateForLog shortens value to at most max bytes for logging and appends
// "...(truncated)" when anything was cut off. value is returned unchanged when
// max <= 0 or when it already fits.
//
// The cut never lands inside a multi-byte UTF-8 sequence: if byte max is a
// continuation byte the cut moves back to the start of that rune, so the
// result may be up to three bytes shorter than max.
func truncateForLog(value string, max int) string {
	if max <= 0 || len(value) <= max {
		return value
	}

	cut := max
	// A rune is at most four bytes long, so at most three continuation bytes
	// (bit pattern 10xxxxxx) need to be skipped. The bound also stops malformed
	// input from shrinking the preview to nothing.
	for back := 0; back < 3 && cut > 0 && value[cut]&0xC0 == 0x80; back++ {
		cut--
	}

	return value[:cut] + "...(truncated)"
}
// recoverDevToDevPanic recovers a panic and logs it, with a stack trace, at
// error level. It does nothing when there is no panic.
//
// It must be used directly as the deferred call (defer recoverDevToDevPanic(...)):
// recover only has an effect when called by the deferred function itself.
func recoverDevToDevPanic(scene, userno string) {
	cause := recover()
	if cause == nil {
		return
	}

	global.GVA_LOG.Error(
		"devtodev report panic recovered",
		zap.String("scene", scene),
		zap.String("userno", userno),
		zap.Any("panic", cause),
		zap.ByteString("stack", debug.Stack()),
	)
}