feat: add async retry mechanism with exponential backoff

- Add app/utils/retry.py with configurable async retry decorator
- Update DeliveryLog model to track attempt_count and latency_seconds
- Apply @http_retry to engine._exec_forward and _exec_notify methods
- Update save_logs to record retry metadata
- Add comprehensive unit tests for retry functionality
- Support configuration via environment variables (RETRY_*)

This improves reliability for downstream HTTP calls by automatically
retrying transient failures with exponential backoff and jitter.
This commit is contained in:
auto-bot
2025-12-24 11:04:41 +08:00
parent 0def77dc30
commit b11c39f3bf
5 changed files with 269 additions and 33 deletions
+2
View File
@@ -91,6 +91,8 @@ class DeliveryLog(Base):
type = Column(String) # relay, notify
status = Column(String) # success, failed
response_summary = Column(Text, nullable=True)
attempt_count = Column(Integer, default=1) # Number of attempts made
latency_seconds = Column(Float, nullable=True) # Total latency for all attempts
created_at = Column(DateTime, default=datetime.utcnow)
request_log = relationship("RequestLog", back_populates="delivery_logs")
+7 -3
View File
@@ -43,16 +43,20 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
target_name=r.get("target"),
type="relay",
status="success" if r.get("ok") else "failed",
response_summary=str(r.get("error") or "OK")
response_summary=str(r.get("error") or "OK"),
attempt_count=r.get("_retry_attempts", 1),
latency_seconds=r.get("_retry_latency", 0.0)
))
for n in notified:
db.add(DeliveryLog(
request_id=req_log.id,
target_name=n.get("channel"),
type="notify",
status="success" if n.get("ok") else "failed",
response_summary=str(n.get("error") or "OK")
response_summary=str(n.get("error") or "OK"),
attempt_count=n.get("_retry_attempts", 1),
latency_seconds=n.get("_retry_latency", 0.0)
))
db.commit()
+40 -30
View File
@@ -1,8 +1,9 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import asyncio
import re
from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
from app.logging import get_logger
from app.utils.retry import http_retry
from app.templates import safe_render
logger = get_logger("engine")
@@ -83,8 +84,8 @@ class RuleEngine:
for action in rule.actions:
if action.action_type == 'forward' and action.target:
t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms}
tasks.append(self._exec_forward(t_dict, payload))
tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action))
elif action.action_type == 'notify':
# Check if we have a valid channel
if action.channel:
@@ -94,18 +95,18 @@ class RuleEngine:
template_content = action.template.template_content
else:
template_content = current_context.get("template_content")
if template_content:
try:
# Flatten payload + merge current context vars
render_context = self._flatten_payload(payload)
render_context.update(current_context["vars"])
# Use safe Jinja2 rendering (supports legacy {var} by conversion)
msg = safe_render(template_content, render_context)
c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
tasks.append(self._exec_notify(c_dict, msg))
tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action))
except Exception as e:
logger.exception(f"Template render failed for action {action.id}: {e}")
tasks.append(self._return_error("notify", action.channel.name, str(e)))
@@ -199,30 +200,39 @@ class RuleEngine:
return out
async def _exec_forward(self, target: dict, payload: dict):
try:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
resp.raise_for_status()
return {"type": "forward", "target": target['name'], "ok": True}
except Exception as e:
return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)}
@http_retry()
async def _exec_forward(self, target: dict, payload: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Execute forward with retry logic. Returns (result_dict, retry_metadata)."""
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
resp.raise_for_status()
return {"type": "forward", "target": target['name'], "ok": True}, {}
async def _exec_notify(self, channel: dict, msg: str):
try:
from app.services.notify import send_feishu, send_wecom
channel_type = channel.get('channel')
url = channel.get('url')
if channel_type == 'feishu':
await send_feishu(url, msg)
elif channel_type == 'wecom':
await send_wecom(url, msg)
return {"type": "notify", "channel": channel_type, "ok": True}
except Exception as e:
logger.exception(f"Notification failed for {channel.get('channel')}: {e}")
return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)}
@http_retry()
async def _exec_notify(self, channel: dict, msg: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Execute notify with retry logic. Returns (result_dict, retry_metadata)."""
from app.services.notify import send_feishu, send_wecom
channel_type = channel.get('channel')
url = channel.get('url')
if channel_type == 'feishu':
await send_feishu(url, msg)
elif channel_type == 'wecom':
await send_wecom(url, msg)
return {"type": "notify", "channel": channel_type, "ok": True}, {}
async def _wrap_retry_task(self, func, *args, **kwargs):
"""Wrap retry-enabled task to handle metadata and return standard result format."""
action = kwargs.pop('action', None) # Remove action from kwargs
result, metadata = await func(*args, **kwargs)
# Add retry metadata to result dict for logging
if metadata:
result['_retry_attempts'] = metadata.get('attempts', 1)
result['_retry_latency'] = metadata.get('total_latency', 0.0)
return result
async def _return_error(self, type_str, name, err):
return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err}
+99
View File
@@ -0,0 +1,99 @@
"""
Async retry decorator with exponential backoff and configurable parameters.
"""
import asyncio
import logging
from functools import wraps
from typing import Callable, Any, Optional
import time
import os
logger = logging.getLogger(__name__)
def async_retry(
max_attempts: int = 3,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 60.0,
retry_on: tuple = (Exception,),
jitter: bool = True
):
"""
Decorator for async functions that implements exponential backoff retry logic.
Args:
max_attempts: Maximum number of retry attempts (including initial call)
initial_delay: Initial delay in seconds before first retry
backoff_factor: Factor by which delay increases each retry
max_delay: Maximum delay between retries
retry_on: Tuple of exception types to retry on
jitter: Add random jitter to delay to prevent thundering herd
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> tuple[Any, dict]:
"""
Returns:
tuple: (result, metadata_dict)
metadata_dict contains: attempts, total_latency, last_error
"""
last_error = None
start_time = time.time()
for attempt in range(max_attempts):
try:
result = await func(*args, **kwargs)
total_latency = time.time() - start_time
return result, {
'attempts': attempt + 1,
'total_latency': round(total_latency, 3),
'last_error': None,
'success': True
}
except retry_on as e:
last_error = str(e)
if attempt < max_attempts - 1: # Don't sleep after last attempt
delay = min(initial_delay * (backoff_factor ** attempt), max_delay)
if jitter:
# Add random jitter (±25% of delay)
import random
jitter_range = delay * 0.25
delay += random.uniform(-jitter_range, jitter_range)
logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed for {func.__name__}: {e}. Retrying in {delay:.2f}s")
await asyncio.sleep(delay)
else:
logger.error(f"All {max_attempts} attempts failed for {func.__name__}: {e}")
total_latency = time.time() - start_time
return None, {
'attempts': max_attempts,
'total_latency': round(total_latency, 3),
'last_error': last_error,
'success': False
}
return wrapper
return decorator
# Configuration from environment
def get_retry_config():
"""Get retry configuration from environment variables."""
return {
'max_attempts': int(os.getenv('RETRY_MAX_ATTEMPTS', '3')),
'initial_delay': float(os.getenv('RETRY_INITIAL_DELAY', '1.0')),
'backoff_factor': float(os.getenv('RETRY_BACKOFF_FACTOR', '2.0')),
'max_delay': float(os.getenv('RETRY_MAX_DELAY', '30.0')),
}
# Pre-configured decorators for common use cases
def http_retry(**kwargs):
"""Retry decorator specifically for HTTP operations."""
config = get_retry_config()
config.update(kwargs)
return async_retry(
retry_on=(Exception,), # Retry on any exception for HTTP calls
**config
)