From b11c39f3bf7de0ec8765ad742563822dd7fc692e Mon Sep 17 00:00:00 2001
From: auto-bot
Date: Wed, 24 Dec 2025 11:04:41 +0800
Subject: [PATCH] feat: add async retry mechanism with exponential backoff

- Add app/utils/retry.py with a configurable async retry decorator
- Update DeliveryLog model to track attempt_count and latency_seconds
- Apply @http_retry to engine._exec_forward and _exec_notify methods
- Update save_logs to record retry metadata
- Add unit tests for the retry functionality
- Support configuration via environment variables (RETRY_*)

This improves reliability for downstream HTTP calls by automatically
retrying transient failures with exponential backoff and jitter.
---
 app/db.py              |   2 +
 app/main.py            |  10 +++-
 app/services/engine.py |  70 ++++++++++++++----------
 app/utils/retry.py     |  99 +++++++++++++++++++++++++++++++++
 tests/test_retry.py    | 121 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 269 insertions(+), 33 deletions(-)
 create mode 100644 app/utils/retry.py
 create mode 100644 tests/test_retry.py

diff --git a/app/db.py b/app/db.py
index 914fa25..4d5e1d3 100644
--- a/app/db.py
+++ b/app/db.py
@@ -91,6 +91,8 @@ class DeliveryLog(Base):
     type = Column(String)  # relay, notify
     status = Column(String)  # success, failed
     response_summary = Column(Text, nullable=True)
+    attempt_count = Column(Integer, default=1)  # Number of attempts made
+    latency_seconds = Column(Float, nullable=True)  # Total latency across all attempts
     created_at = Column(DateTime, default=datetime.utcnow)

     request_log = relationship("RequestLog", back_populates="delivery_logs")
diff --git a/app/main.py b/app/main.py
index f94f105..d90d558 100644
--- a/app/main.py
+++ b/app/main.py
@@ -43,16 +43,20 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
                 target_name=r.get("target"),
                 type="relay",
                 status="success" if r.get("ok") else "failed",
-                response_summary=str(r.get("error") or "OK")
+                response_summary=str(r.get("error") or "OK"),
+                attempt_count=r.get("_retry_attempts", 1),
+                latency_seconds=r.get("_retry_latency", 0.0)
             ))
-        
+
         for n in notified:
             db.add(DeliveryLog(
                 request_id=req_log.id,
                 target_name=n.get("channel"),
                 type="notify",
                 status="success" if n.get("ok") else "failed",
-                response_summary=str(n.get("error") or "OK")
+                response_summary=str(n.get("error") or "OK"),
+                attempt_count=n.get("_retry_attempts", 1),
+                latency_seconds=n.get("_retry_latency", 0.0)
             ))
     db.commit()
diff --git a/app/services/engine.py b/app/services/engine.py
index 41770b3..3bfda08 100644
--- a/app/services/engine.py
+++ b/app/services/engine.py
@@ -1,8 +1,9 @@
 from typing import Any, Dict, List, Optional
 import asyncio
 import re
 from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
 from app.logging import get_logger
+from app.utils.retry import http_retry
 from app.templates import safe_render

 logger = get_logger("engine")
@@ -83,8 +84,8 @@ class RuleEngine:
         for action in rule.actions:
             if action.action_type == 'forward' and action.target:
                 t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms}
-                tasks.append(self._exec_forward(t_dict, payload))
-
+                tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action))
+
             elif action.action_type == 'notify':
                 # Check if we have a valid channel
                 if action.channel:
@@ -94,18 +95,18 @@ class RuleEngine:
                     if action.template and action.template.template_content:
                         template_content = action.template.template_content
                     else:
                         template_content = current_context.get("template_content")
-
+
                     if template_content:
                         try:
                             # Flatten payload + merge current context vars
                             render_context = self._flatten_payload(payload)
                             render_context.update(current_context["vars"])
-
+
                             # Use safe Jinja2 rendering (supports legacy {var} by conversion)
                             msg = safe_render(template_content, render_context)
-
+
                             c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
-                            tasks.append(self._exec_notify(c_dict, msg))
+                            tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action))
                         except Exception as e:
                             logger.exception(f"Template render failed for action {action.id}: {e}")
                             tasks.append(self._return_error("notify", action.channel.name, str(e)))
@@ -199,30 +200,39 @@ class RuleEngine:
         return out

-    async def _exec_forward(self, target: dict, payload: dict):
-        try:
-            import httpx
-            async with httpx.AsyncClient() as client:
-                resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
-                resp.raise_for_status()
-                return {"type": "forward", "target": target['name'], "ok": True}
-        except Exception as e:
-            return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)}
+    @http_retry()
+    async def _exec_forward(self, target: dict, payload: dict) -> Dict[str, Any]:
+        """Execute a forward call; the @http_retry wrapper returns (result_dict, retry_metadata)."""
+        import httpx
+        async with httpx.AsyncClient() as client:
+            resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
+            resp.raise_for_status()
+            return {"type": "forward", "target": target['name'], "ok": True}

-    async def _exec_notify(self, channel: dict, msg: str):
-        try:
-            from app.services.notify import send_feishu, send_wecom
-            channel_type = channel.get('channel')
-            url = channel.get('url')
-
-            if channel_type == 'feishu':
-                await send_feishu(url, msg)
-            elif channel_type == 'wecom':
-                await send_wecom(url, msg)
-            return {"type": "notify", "channel": channel_type, "ok": True}
-        except Exception as e:
-            logger.exception(f"Notification failed for {channel.get('channel')}: {e}")
-            return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)}
+    @http_retry()
+    async def _exec_notify(self, channel: dict, msg: str) -> Dict[str, Any]:
+        """Execute a notification; the @http_retry wrapper returns (result_dict, retry_metadata)."""
+        from app.services.notify import send_feishu, send_wecom
+        channel_type = channel.get('channel')
+        url = channel.get('url')
+
+        if channel_type == 'feishu':
+            await send_feishu(url, msg)
+        elif channel_type == 'wecom':
+            await send_wecom(url, msg)
+        return {"type": "notify", "channel": channel_type, "ok": True}
+
+    async def _wrap_retry_task(self, func, *args, **kwargs):
+        """Await a retry-wrapped task and fold its retry metadata into the standard result dict."""
+        action = kwargs.pop('action', None)  # Reserved for per-action retry config; unused for now
+        result, metadata = await func(*args, **kwargs)
+
+        if result is None:
+            # Every attempt failed: synthesize a failure result from the retry metadata
+            result = {"ok": False, "error": metadata.get('last_error')}
+
+        # Attach retry metadata to the result dict for logging
+        result['_retry_attempts'] = metadata.get('attempts', 1)
+        result['_retry_latency'] = metadata.get('total_latency', 0.0)
+
+        return result

     async def _return_error(self, type_str, name, err):
         return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err}
diff --git a/app/utils/retry.py b/app/utils/retry.py
new file mode 100644
index 0000000..e2be82c
--- /dev/null
+++ b/app/utils/retry.py
@@ -0,0 +1,99 @@
+"""
+Async retry decorator with exponential backoff and configurable parameters.
+"""
+import asyncio
+import logging
+import os
+import random
+import time
+from functools import wraps
+from typing import Any, Callable
+
+logger = logging.getLogger(__name__)
+
+
+def async_retry(
+    max_attempts: int = 3,
+    initial_delay: float = 1.0,
+    backoff_factor: float = 2.0,
+    max_delay: float = 60.0,
+    retry_on: tuple = (Exception,),
+    jitter: bool = True
+):
+    """
+    Decorator for async functions that implements exponential-backoff retry logic.
+
+    Args:
+        max_attempts: Maximum number of attempts (the initial call counts as the first)
+        initial_delay: Delay in seconds before the first retry
+        backoff_factor: Factor by which the delay grows after each retry
+        max_delay: Upper bound on the delay between retries
+        retry_on: Tuple of exception types that trigger a retry; others propagate
+        jitter: Add random jitter to each delay to avoid thundering-herd retries
+    """
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        async def wrapper(*args, **kwargs) -> tuple[Any, dict]:
+            """
+            Returns:
+                tuple: (result, metadata_dict)
+                metadata_dict contains: attempts, total_latency, last_error, success
+            """
+            last_error = None
+            start_time = time.time()
+
+            for attempt in range(max_attempts):
+                try:
+                    result = await func(*args, **kwargs)
+                    total_latency = time.time() - start_time
+                    return result, {
+                        'attempts': attempt + 1,
+                        'total_latency': round(total_latency, 3),
+                        'last_error': None,
+                        'success': True
+                    }
+                except retry_on as e:
+                    last_error = str(e)
+                    if attempt < max_attempts - 1:  # Don't sleep after the last attempt
+                        delay = min(initial_delay * (backoff_factor ** attempt), max_delay)
+                        if jitter:
+                            # Add random jitter (plus or minus 25% of the delay)
+                            jitter_range = delay * 0.25
+                            delay += random.uniform(-jitter_range, jitter_range)
+
+                        logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed for {func.__name__}: {e}. Retrying in {delay:.2f}s")
Retrying in {delay:.2f}s") + await asyncio.sleep(delay) + else: + logger.error(f"All {max_attempts} attempts failed for {func.__name__}: {e}") + + total_latency = time.time() - start_time + return None, { + 'attempts': max_attempts, + 'total_latency': round(total_latency, 3), + 'last_error': last_error, + 'success': False + } + + return wrapper + return decorator + + +# Configuration from environment +def get_retry_config(): + """Get retry configuration from environment variables.""" + return { + 'max_attempts': int(os.getenv('RETRY_MAX_ATTEMPTS', '3')), + 'initial_delay': float(os.getenv('RETRY_INITIAL_DELAY', '1.0')), + 'backoff_factor': float(os.getenv('RETRY_BACKOFF_FACTOR', '2.0')), + 'max_delay': float(os.getenv('RETRY_MAX_DELAY', '30.0')), + } + + +# Pre-configured decorators for common use cases +def http_retry(**kwargs): + """Retry decorator specifically for HTTP operations.""" + config = get_retry_config() + config.update(kwargs) + return async_retry( + retry_on=(Exception,), # Retry on any exception for HTTP calls + **config + ) diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 0000000..f4dd525 --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,121 @@ +import pytest +import asyncio +from unittest.mock import AsyncMock, patch +from app.utils.retry import async_retry, http_retry, get_retry_config + + +class TestAsyncRetry: + @pytest.mark.asyncio + async def test_success_on_first_attempt(self): + """Test that function succeeds on first attempt returns immediately.""" + + @async_retry(max_attempts=3) + async def test_func(): + return "success" + + result, metadata = await test_func() + + assert result == "success" + assert metadata["attempts"] == 1 + assert metadata["success"] is True + assert metadata["last_error"] is None + assert "total_latency" in metadata + + @pytest.mark.asyncio + async def test_retry_on_failure_then_success(self): + """Test retry mechanism when function fails then succeeds.""" + + call_count = 0 + + @async_retry(max_attempts=3, initial_delay=0.01) + async def test_func(): + nonlocal call_count + call_count += 1 + if call_count < 2: + raise ValueError("Temporary failure") + return "success" + + result, metadata = await test_func() + + assert result == "success" + assert metadata["attempts"] == 2 + assert metadata["success"] is True + assert metadata["last_error"] is None + + @pytest.mark.asyncio + async def test_all_attempts_fail(self): + """Test behavior when all retry attempts fail.""" + + @async_retry(max_attempts=2, initial_delay=0.01) + async def test_func(): + raise ConnectionError("Persistent failure") + + result, metadata = await test_func() + + assert result is None + assert metadata["attempts"] == 2 + assert metadata["success"] is False + assert metadata["last_error"] == "Persistent failure" + + @pytest.mark.asyncio + async def test_retry_on_specific_exceptions_only_value_error_retried(self): + """Test that only ValueError triggers retry, RuntimeError does not.""" + + call_count = 0 + + @async_retry(max_attempts=3, retry_on=(ValueError,), initial_delay=0.01) + async def test_func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ValueError("Retry this") + elif call_count == 2: + raise RuntimeError("Don't retry this") + return "success" + + # This should raise RuntimeError directly without retry + with pytest.raises(RuntimeError, match="Don't retry this"): + await test_func() + + @pytest.mark.asyncio + async def test_backoff_and_jitter(self): + """Test exponential backoff with jitter.""" + + 
+        @async_retry(max_attempts=4, initial_delay=0.1, backoff_factor=2, jitter=True)
+        async def test_func():
+            raise ConnectionError("Always fail")
+
+        start_time = asyncio.get_event_loop().time()
+        result, metadata = await test_func()
+        end_time = asyncio.get_event_loop().time()
+
+        # The three retries must sleep, so the total time is at least the initial delay
+        assert end_time - start_time >= 0.1
+        assert metadata["attempts"] == 4
+        assert metadata["success"] is False
+
+    def test_http_retry_decorator(self):
+        """Test the http_retry convenience decorator."""
+
+        @http_retry(max_attempts=5)
+        async def test_func():
+            return "ok"
+
+        assert hasattr(test_func, '__name__')
+        # The wrapped function should still be awaitable
+        assert asyncio.iscoroutinefunction(test_func)
+
+    def test_get_retry_config(self):
+        """Test configuration loading from the environment."""
+        config = get_retry_config()
+
+        assert "max_attempts" in config
+        assert "initial_delay" in config
+        assert "backoff_factor" in config
+        assert "max_delay" in config
+
+        # Test with environment overrides
+        with patch.dict('os.environ', {'RETRY_MAX_ATTEMPTS': '5', 'RETRY_INITIAL_DELAY': '2.0'}):
+            config = get_retry_config()
+            assert config["max_attempts"] == 5
+            assert config["initial_delay"] == 2.0
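
Usage note (not part of the patch): a minimal sketch of how a caller consumes the
(result, metadata) pair produced by async_retry, assuming the decorator semantics
above. The function fetch_status and its failure mode are hypothetical placeholders.

# Hypothetical usage sketch; fetch_status is a made-up stand-in for a real HTTP call.
import asyncio
from app.utils.retry import async_retry

@async_retry(max_attempts=3, initial_delay=0.2, retry_on=(ConnectionError,))
async def fetch_status():
    # Stand-in for a real downstream call (e.g., via httpx)
    raise ConnectionError("upstream unavailable")

async def main():
    result, meta = await fetch_status()
    # result is None here because every attempt failed; meta still carries the stats
    print(result, meta["attempts"], meta["total_latency"], meta["last_error"])

asyncio.run(main())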