// Source: bygdata/pkg/devtodev/devtodev.go (Go, 339 lines, 7.4 KiB)

package devtodev
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/klauspost/compress/zstd"
)
// Request defaults and limits.
const (
// defaultEndpoint is the report URL used when Client.Endpoint is empty.
defaultEndpoint = "https://api.devtodev.com/v2/analytics/report"
// defaultUserAgent is the User-Agent value NewClient assigns to new clients.
defaultUserAgent = "devtodev-go-sdk/0.1"
// maxPayloadBytes caps the JSON-encoded payload at 2 MiB; the size is
// checked before any compression is applied.
maxPayloadBytes = 2 * 1024 * 1024
)
// Compression selects how the JSON request body is encoded before it is sent.
// An empty value is treated the same as CompressionNone; any other
// unrecognized value makes SendWithResponse fail with an error.
type Compression string
const (
// CompressionNone sends the JSON body unchanged with Content-Type
// "application/json".
CompressionNone Compression = "none"
// CompressionGzip gzip-compresses the body and sends it with Content-Type
// "application/gzip".
CompressionGzip Compression = "gzip"
// CompressionZstd zstd-compresses the body and sends it with Content-Type
// "application/zstd".
CompressionZstd Compression = "zstd"
)
// Client sends events to devtodev Data API 2.0.
//
// NewClient returns a Client with every field set to its default value.
type Client struct {
// AppID is required. It is added to the request URL as the "appId" query
// parameter unless the endpoint already carries a non-empty one.
AppID string
// Endpoint is the report URL. When empty, defaultEndpoint is used.
Endpoint string
// HTTP performs the requests. When nil, a client with a 10-second timeout
// is created for the call.
HTTP *http.Client
// Retry controls how failed attempts are repeated.
Retry RetryConfig
// UserAgent is sent as the User-Agent header when it is non-empty.
UserAgent string
// Compression selects the request body encoding; empty means uncompressed.
Compression Compression
}
// Response captures HTTP response details for a send.
type Response struct {
// StatusCode is the HTTP status code of the response.
StatusCode int
// Headers is a copy of the response headers.
Headers http.Header
// Body is the complete response body as read from the server.
Body []byte
// JSON holds the body decoded as a JSON object. It stays nil unless the
// Content-Type indicates JSON and the non-empty body decodes successfully.
JSON map[string]interface{}
}
// APIError describes a request that finished without a 2xx HTTP status.
type APIError struct {
	// StatusCode is the HTTP status code, or 0 when there was no response.
	StatusCode int
	// Message is the error text extracted from the response, if any.
	Message string
	// Body is the raw response body.
	Body []byte
}

// Error implements the error interface. The status code is always part of the
// text; the message is appended only when one is present.
func (e *APIError) Error() string {
	if e.Message == "" {
		return fmt.Sprintf("request failed with status %d", e.StatusCode)
	}
	return fmt.Sprintf("request failed with status %d: %s", e.StatusCode, e.Message)
}
// RetryConfig configures retry behavior for transient failures.
type RetryConfig struct {
// MaxAttempts is the total number of tries, including the first one.
// Values below 1 are treated as 1.
MaxAttempts int
// BaseDelay is the wait before the second attempt; it doubles for every
// further attempt. A value <= 0 disables waiting between attempts.
BaseDelay time.Duration
// MaxDelay caps the doubled delay when it is greater than zero.
MaxDelay time.Duration
// Jitter, when true, replaces each delay with a random duration in the
// upper half of the computed delay, i.e. [delay/2, delay).
Jitter bool
// RetryStatuses lists HTTP status codes that trigger a retry. Statuses
// >= 500 are retried even when they are not listed here.
RetryStatuses map[int]struct{}
}
// NewClient returns a Client for appID configured with the package defaults:
// the default endpoint and user agent, no compression, an HTTP client with a
// 10-second timeout, and up to three attempts with jittered exponential
// backoff between 500ms and 5s.
func NewClient(appID string) *Client {
	// Status codes retried by default, stored as a set.
	retryable := make(map[int]struct{}, 7)
	for _, code := range []int{408, 425, 429, 500, 502, 503, 504} {
		retryable[code] = struct{}{}
	}

	client := new(Client)
	client.AppID = appID
	client.Endpoint = defaultEndpoint
	client.HTTP = &http.Client{Timeout: 10 * time.Second}
	client.Retry = RetryConfig{
		MaxAttempts:   3,
		BaseDelay:     500 * time.Millisecond,
		MaxDelay:      5 * time.Second,
		Jitter:        true,
		RetryStatuses: retryable,
	}
	client.UserAgent = defaultUserAgent
	client.Compression = CompressionNone
	return client
}
// Send delivers payload to devtodev and reports only whether that succeeded.
// It is SendWithResponse with the response details discarded.
func (c *Client) Send(ctx context.Context, payload Payload) error {
	if _, err := c.SendWithResponse(ctx, payload); err != nil {
		return err
	}
	return nil
}
// SendWithResponse validates and JSON-encodes payload, optionally compresses
// it, and POSTs it to the configured endpoint, repeating failed attempts
// according to c.Retry.
//
// On a 2xx status it returns the parsed response and a nil error. When the
// final attempt ends with a non-2xx status it returns the parsed response
// together with an *APIError. Transport errors are returned wrapped with
// "request failed". Once ctx is cancelled or past its deadline no further
// attempt is made and ctx.Err() is returned.
//
// Validation, encoding, oversize (more than maxPayloadBytes of JSON),
// unsupported-compression and URL errors are returned before any request is
// sent.
func (c *Client) SendWithResponse(ctx context.Context, payload Payload) (*Response, error) {
	if c == nil {
		return nil, errors.New("client is nil")
	}
	if c.AppID == "" {
		return nil, errors.New("AppID is required")
	}
	if err := ValidatePayload(payload); err != nil {
		return nil, err
	}
	endpoint := c.Endpoint
	if endpoint == "" {
		endpoint = defaultEndpoint
	}
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	// The size limit applies to the uncompressed JSON.
	if len(body) > maxPayloadBytes {
		return nil, fmt.Errorf("payload too large: %d bytes exceeds %d bytes", len(body), maxPayloadBytes)
	}
	contentType := "application/json"
	payloadBytes := body
	switch c.Compression {
	case "", CompressionNone:
		// no-op
	case CompressionGzip:
		contentType = "application/gzip"
		payloadBytes, err = gzipCompress(body)
		if err != nil {
			return nil, fmt.Errorf("gzip compress: %w", err)
		}
	case CompressionZstd:
		contentType = "application/zstd"
		payloadBytes, err = zstdCompress(body)
		if err != nil {
			return nil, fmt.Errorf("zstd compress: %w", err)
		}
	default:
		return nil, fmt.Errorf("unsupported compression: %s", c.Compression)
	}
	reqURL, err := withAppID(endpoint, c.AppID)
	if err != nil {
		return nil, fmt.Errorf("build url: %w", err)
	}
	httpClient := c.HTTP
	if httpClient == nil {
		httpClient = &http.Client{Timeout: 10 * time.Second}
	}
	// Always make at least one attempt, even with a zero-value RetryConfig.
	attempts := max(1, c.Retry.MaxAttempts)
	for i := 1; i <= attempts; i++ {
		// A fresh request (and body reader) is needed for every attempt.
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL, bytes.NewReader(payloadBytes))
		if err != nil {
			return nil, fmt.Errorf("new request: %w", err)
		}
		req.Header.Set("Content-Type", contentType)
		if c.UserAgent != "" {
			req.Header.Set("User-Agent", c.UserAgent)
		}
		resp, err := httpClient.Do(req)
		if err != nil && ctx.Err() != nil {
			// The caller's context has ended, so every further attempt would
			// fail the same way. Stop now and report the context error, as
			// the backoff wait below does.
			return nil, ctx.Err()
		}
		var parsed *Response
		if err == nil && resp != nil {
			// parseResponse reads and closes the body.
			parsed, err = parseResponse(resp)
			if err != nil {
				return nil, fmt.Errorf("read response: %w", err)
			}
		}
		if err == nil && resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 300 {
			return parsed, nil
		}
		retry := shouldRetry(err, resp, c.Retry.RetryStatuses)
		if !retry || i == attempts {
			if err != nil {
				return parsed, fmt.Errorf("request failed: %w", err)
			}
			return parsed, apiError(parsed)
		}
		wait := backoff(c.Retry, i)
		if wait > 0 {
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}
		}
	}
	// Unreachable: the last iteration always returns.
	return nil, errors.New("unexpected retry loop exit")
}
// withAppID returns endpoint with its query string re-encoded and an "appId"
// parameter set to appID. An "appId" already present in endpoint with a
// non-empty value is kept as is. It fails when endpoint cannot be parsed as a
// URL.
func withAppID(endpoint, appID string) (string, error) {
	parsed, err := url.Parse(endpoint)
	if err != nil {
		return "", err
	}

	params := parsed.Query()
	if existing := params.Get("appId"); existing == "" {
		params.Set("appId", appID)
	}
	// Encode sorts parameters by key, so the original order is not preserved.
	parsed.RawQuery = params.Encode()

	return parsed.String(), nil
}
// shouldRetry reports whether a failed attempt is worth repeating.
//
// With a non-nil err the decision is based on the error alone:
//   - a cancelled context is never retried, because the caller has given up;
//   - errors that satisfy net.Error are retried (errors returned by
//     http.Client.Do are *url.Error values, which satisfy it);
//   - any other error is retried only if its text mentions "timeout".
//
// With a nil err, a missing response is retried, as is any status listed in
// retryStatuses and any status >= 500. All other statuses are final.
func shouldRetry(err error, resp *http.Response, retryStatuses map[int]struct{}) bool {
	if err != nil {
		if errors.Is(err, context.Canceled) {
			return false
		}
		var netErr net.Error
		if errors.As(err, &netErr) {
			return true
		}
		// Fallback for timeouts reported by errors that do not implement
		// net.Error.
		return strings.Contains(err.Error(), "timeout")
	}
	if resp == nil {
		return true
	}
	if _, listed := retryStatuses[resp.StatusCode]; listed {
		return true
	}
	return resp.StatusCode >= 500
}
// backoff returns how long to wait after the given 1-based attempt has failed.
//
// The delay is cfg.BaseDelay doubled for every attempt after the first and
// capped at cfg.MaxDelay when that is positive. With cfg.Jitter the result is
// a random duration in [delay/2, delay). A non-positive cfg.BaseDelay disables
// waiting and yields 0.
func backoff(cfg RetryConfig, attempt int) time.Duration {
	if cfg.BaseDelay <= 0 {
		return 0
	}
	// Clamp in floating point before converting. For large attempts the
	// scaled value exceeds the int64 range, and converting it directly gives
	// an implementation-defined (possibly negative) duration that would skip
	// the cap and the wait.
	ceiling := time.Duration(math.MaxInt64)
	if cfg.MaxDelay > 0 {
		ceiling = cfg.MaxDelay
	}
	delay := ceiling
	if scaled := float64(cfg.BaseDelay) * math.Pow(2, float64(attempt-1)); scaled < float64(ceiling) {
		delay = time.Duration(scaled)
	}
	if cfg.Jitter {
		// Keep the lower half fixed and randomize the upper half.
		if half := int64(delay / 2); half > 0 {
			delay = delay/2 + time.Duration(rand.Int63n(half))
		}
	}
	return delay
}
// max returns the larger of a and b. It is declared at package level, so it
// takes precedence over the Go 1.21 builtin of the same name inside this
// package.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
// gzipCompress returns data as a complete gzip stream, written with the gzip
// package's default settings.
func gzipCompress(data []byte) ([]byte, error) {
	out := new(bytes.Buffer)
	w := gzip.NewWriter(out)

	_, err := w.Write(data)
	if err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = w.Close()
		return nil, err
	}
	// Close flushes the remaining data and writes the gzip footer.
	err = w.Close()
	if err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// zstdCompress returns data as a zstd stream produced by an encoder created
// with the zstd package's default options.
func zstdCompress(data []byte) ([]byte, error) {
	out := new(bytes.Buffer)

	enc, err := zstd.NewWriter(out)
	if err != nil {
		return nil, err
	}

	_, err = enc.Write(data)
	if err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = enc.Close()
		return nil, err
	}
	// Close finishes the stream before the buffer is read.
	err = enc.Close()
	if err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// parseResponse reads resp fully, closes its body, and copies the status,
// headers and body into a Response. When the Content-Type indicates JSON and
// the body is non-empty it also tries to decode the body as a JSON object; a
// decoding failure is ignored and leaves Response.JSON nil.
//
// A nil resp yields (nil, nil). An error while reading the body is returned
// as is.
func parseResponse(resp *http.Response) (*Response, error) {
	if resp == nil {
		return nil, nil
	}
	defer resp.Body.Close()

	raw, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return nil, readErr
	}

	parsed := new(Response)
	parsed.StatusCode = resp.StatusCode
	parsed.Headers = resp.Header.Clone()
	parsed.Body = raw

	if len(raw) == 0 || !isJSONResponse(resp.Header.Get("Content-Type")) {
		return parsed, nil
	}
	var decoded map[string]interface{}
	if json.Unmarshal(raw, &decoded) == nil {
		parsed.JSON = decoded
	}
	return parsed, nil
}
// apiError builds the *APIError reported for an unsuccessful response.
//
// A nil resp produces status 0 with the message "no response". Otherwise the
// status and body are copied from resp, and the message is taken from the
// decoded JSON: the "message" field if it holds a string, else the "error"
// field if it holds a string, else empty.
func apiError(resp *Response) error {
	if resp == nil {
		return &APIError{StatusCode: 0, Message: "no response"}
	}

	failure := &APIError{
		StatusCode: resp.StatusCode,
		Body:       resp.Body,
	}
	if resp.JSON == nil {
		return failure
	}
	// The first field holding a string wins, even when that string is empty.
	for _, field := range []string{"message", "error"} {
		if text, isString := resp.JSON[field].(string); isString {
			failure.Message = text
			break
		}
	}
	return failure
}
// isJSONResponse reports whether a Content-Type header value denotes JSON:
// either "application/json" or a structured-syntax type ending in "+json"
// (for example "application/problem+json"). Media types are case-insensitive,
// so the value is lower-cased before matching. Parameters such as
// "; charset=utf-8" do not affect the result.
func isJSONResponse(contentType string) bool {
	ct := strings.ToLower(contentType)
	return strings.Contains(ct, "application/json") || strings.Contains(ct, "+json")
}