Merge pull request 'feat/retry-mechanism' (#1) from feat/retry-mechanism into main

Reviewed-on: #1
This commit is contained in:
侯欢 2025-12-24 11:07:22 +08:00
commit d75101bb0f
10 changed files with 429 additions and 41 deletions

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, Request, Form, Depends, HTTPException
from fastapi import APIRouter, Request, Form, Depends, HTTPException, Body
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload, aliased
@ -7,6 +7,7 @@ import json
from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction
from app.services.stats import stats_service
from app.services.engine import engine
from app.templates import safe_render
router = APIRouter(prefix="/admin", tags=["admin"])
templates = Jinja2Templates(directory="templates")
@ -424,6 +425,31 @@ async def delete_template(id: int = Form(...), db: Session = Depends(get_db)):
db.commit()
return RedirectResponse(url="/admin/templates", status_code=303)
@router.post("/templates/preview")
async def preview_template(data: dict = Body(...), db: Session = Depends(get_db)):
    """
    Render a template against an optional sample payload and return the result.

    Request JSON: { "template_content": "...", "sample_payload": {...}, "vars": {...} }

    Responses:
        200 {"rendered": "<text>"} when rendering succeeds.
        400 {"error": "<message>"} when ``template_content`` is missing/empty
            or when building the context or rendering raises.

    The ``db`` dependency is part of the signature but is not used by this handler.
    """
    template_content = data.get("template_content")
    if not template_content:
        return JSONResponse({"error": "template_content is required"}, status_code=400)

    sample_payload = data.get("sample_payload") or {}
    extra_vars = data.get("vars") or {}

    try:
        # Prefer the engine's flatten helper when it exists; otherwise fall
        # back to a shallow copy of the sample payload.
        if hasattr(engine, "_flatten_payload"):
            render_context = engine._flatten_payload(sample_payload)
        else:
            render_context = dict(sample_payload)

        # Explicit template vars are merged last, so they override payload values.
        if isinstance(extra_vars, dict):
            render_context.update(extra_vars)

        return JSONResponse({"rendered": safe_render(template_content, render_context)})
    except Exception as exc:
        # Any failure is reported to the client as a 400 with the error text.
        return JSONResponse({"error": str(exc)}, status_code=400)
# --- Logs ---
@router.get("/logs", response_class=HTMLResponse)
async def list_logs(request: Request, db: Session = Depends(get_db)):

View File

@ -91,6 +91,8 @@ class DeliveryLog(Base):
type = Column(String) # relay, notify
status = Column(String) # success, failed
response_summary = Column(Text, nullable=True)
attempt_count = Column(Integer, default=1) # Number of attempts made
latency_seconds = Column(Float, nullable=True) # Total latency for all attempts
created_at = Column(DateTime, default=datetime.utcnow)
request_log = relationship("RequestLog", back_populates="delivery_logs")

View File

@ -1,10 +1,11 @@
import asyncio
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi import FastAPI, Request, BackgroundTasks, Body
from fastapi.responses import JSONResponse
from app.logging import get_logger
from app.admin import router as admin_router
from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db
from app.services.engine import engine
from app.models import IncomingPayload
from contextlib import asynccontextmanager
logger = get_logger("app")
@ -42,7 +43,9 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
target_name=r.get("target"),
type="relay",
status="success" if r.get("ok") else "failed",
response_summary=str(r.get("error") or "OK")
response_summary=str(r.get("error") or "OK"),
attempt_count=r.get("_retry_attempts", 1),
latency_seconds=r.get("_retry_latency", 0.0)
))
for n in notified:
@ -51,7 +54,9 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
target_name=n.get("channel"),
type="notify",
status="success" if n.get("ok") else "failed",
response_summary=str(n.get("error") or "OK")
response_summary=str(n.get("error") or "OK"),
attempt_count=n.get("_retry_attempts", 1),
latency_seconds=n.get("_retry_latency", 0.0)
))
db.commit()
@ -64,7 +69,7 @@ async def health():
return {"status": "ok"}
@app.post("/webhook/{namespace}")
async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks):
async def webhook(namespace: str, payload: IncomingPayload = Body(...), background_tasks: BackgroundTasks = BackgroundTasks()):
db = SessionLocal()
endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first()
@ -82,16 +87,17 @@ async def webhook(namespace: str, request: Request, background_tasks: Background
endpoint_id = endpoint.id
db.close()
# payload is validated by Pydantic; convert to plain dict for engine
try:
body = await request.json()
body_dict = payload.model_dump()
except Exception:
return JSONResponse({"error": "Invalid JSON"}, status_code=400)
return JSONResponse({"error": "Invalid payload"}, status_code=400)
# Use new engine
routed, notified = await engine.process(endpoint_id, body)
routed, notified = await engine.process(endpoint_id, body_dict)
# Async save logs
background_tasks.add_task(save_logs, namespace, body, routed, notified)
background_tasks.add_task(save_logs, namespace, body_dict, routed, notified)
result = {
"namespace": namespace,

View File

@ -1,8 +1,10 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import asyncio
import re
from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
from app.logging import get_logger
from app.utils.retry import http_retry
from app.templates import safe_render
logger = get_logger("engine")
@ -82,7 +84,7 @@ class RuleEngine:
for action in rule.actions:
if action.action_type == 'forward' and action.target:
t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms}
tasks.append(self._exec_forward(t_dict, payload))
tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action))
elif action.action_type == 'notify':
# Check if we have a valid channel
@ -100,10 +102,11 @@ class RuleEngine:
render_context = self._flatten_payload(payload)
render_context.update(current_context["vars"])
msg = template_content.format(**render_context)
# Use safe Jinja2 rendering (supports legacy {var} by conversion)
msg = safe_render(template_content, render_context)
c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
tasks.append(self._exec_notify(c_dict, msg))
tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action))
except Exception as e:
logger.exception(f"Template render failed for action {action.id}: {e}")
tasks.append(self._return_error("notify", action.channel.name, str(e)))
@ -197,30 +200,39 @@ class RuleEngine:
return out
async def _exec_forward(self, target: dict, payload: dict):
try:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
resp.raise_for_status()
return {"type": "forward", "target": target['name'], "ok": True}
except Exception as e:
return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)}
@http_retry()
async def _exec_forward(self, target: dict, payload: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    POST ``payload`` as JSON to the forward target.

    Args:
        target: dict with ``url`` and ``name`` keys and an optional
            ``timeout_ms`` (defaults to 5000 ms).
        payload: JSON-serialisable body to send.

    Returns:
        ``(result_dict, {})`` on success, where ``result_dict`` is
        ``{"type": "forward", "target": <name>, "ok": True}``. The second
        element is always an empty dict here.

    Any exception (connection error, non-2xx status via ``raise_for_status``)
    propagates out of this function body to the ``http_retry`` decorator.
    """
    import httpx

    timeout_seconds = target.get('timeout_ms', 5000) / 1000
    async with httpx.AsyncClient() as http:
        response = await http.post(target['url'], json=payload, timeout=timeout_seconds)
        response.raise_for_status()

    success = {"type": "forward", "target": target['name'], "ok": True}
    return success, {}
async def _exec_notify(self, channel: dict, msg: str):
try:
from app.services.notify import send_feishu, send_wecom
channel_type = channel.get('channel')
url = channel.get('url')
@http_retry()
async def _exec_notify(self, channel: dict, msg: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Execute notify with retry logic. Returns (result_dict, retry_metadata)."""
from app.services.notify import send_feishu, send_wecom
channel_type = channel.get('channel')
url = channel.get('url')
if channel_type == 'feishu':
await send_feishu(url, msg)
elif channel_type == 'wecom':
await send_wecom(url, msg)
return {"type": "notify", "channel": channel_type, "ok": True}
except Exception as e:
logger.exception(f"Notification failed for {channel.get('channel')}: {e}")
return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)}
if channel_type == 'feishu':
await send_feishu(url, msg)
elif channel_type == 'wecom':
await send_wecom(url, msg)
return {"type": "notify", "channel": channel_type, "ok": True}, {}
async def _wrap_retry_task(self, func, *args, **kwargs):
    """
    Await a retry-decorated executor and normalise its outcome to one result dict.

    Args:
        func: awaitable callable returning ``(outcome, retry_metadata)``.
        *args: positional arguments for ``func``; the first one is expected to
            be the target/channel descriptor dict.
        **kwargs: keyword arguments for ``func``. ``action`` is accepted for
            the caller's convenience and is not forwarded.

    Returns:
        A result dict (``type``, ``target``/``channel``, ``ok`` and, on
        failure, ``error``). When retry metadata is present, ``_retry_attempts``
        and ``_retry_latency`` are added for logging.
    """
    kwargs.pop('action', None)  # Not an executor argument; drop before the call
    outcome, metadata = await func(*args, **kwargs)
    metadata = metadata or {}

    # The executors themselves return ``(result_dict, {})`` and the retry
    # decorator wraps that value once more, so unwrap to reach the dict.
    if isinstance(outcome, tuple):
        outcome = outcome[0] if outcome else None

    if not isinstance(outcome, dict):
        # Every attempt failed: the retry decorator hands back ``None``.
        # Build the failure dict from the descriptor so the error is
        # reported instead of crashing on ``None[...]``.
        descriptor = args[0] if args and isinstance(args[0], dict) else {}
        error = metadata.get('last_error') or "unknown error"
        if 'name' in descriptor:
            # Forward descriptors carry the target under "name".
            outcome = {"type": "forward", "target": descriptor.get('name'), "ok": False, "error": error}
        else:
            # Notify descriptors carry the channel type under "channel".
            outcome = {"type": "notify", "channel": descriptor.get('channel'), "ok": False, "error": error}

    # Add retry metadata to the result dict for logging
    if metadata:
        outcome['_retry_attempts'] = metadata.get('attempts', 1)
        outcome['_retry_latency'] = metadata.get('total_latency', 0.0)

    return outcome
async def _return_error(self, type_str, name, err):
    """
    Build a failed-result dict without performing any work.

    The name is stored under ``"target"`` when ``type_str`` is ``'forward'``
    and under ``"channel"`` for every other type.
    """
    name_key = "target" if type_str == 'forward' else "channel"
    failure = {"type": type_str}
    failure[name_key] = name
    failure["ok"] = False
    failure["error"] = err
    return failure

43
app/templates.py Normal file
View File

@ -0,0 +1,43 @@
import re
from typing import Any, Dict
import jinja2
# Create a minimal Jinja2 environment for text templates.
# Template text is user-authored (admin UI, preview endpoint), so it is
# rendered in Jinja's sandbox, which rejects access to unsafe attributes and
# methods instead of evaluating them.
# Use StrictUndefined so missing variables raise exceptions during rendering.
from jinja2.sandbox import SandboxedEnvironment

ENV = SandboxedEnvironment(
    undefined=jinja2.StrictUndefined,
    autoescape=False,
)
def _convert_braced_format_to_jinja(template: str) -> str:
    """
    Rewrite legacy ``{var}`` placeholders as Jinja ``{{ var }}`` expressions.

    A template that already contains ``{{`` or ``{%`` anywhere is returned
    unchanged. Otherwise every single-brace group made only of letters,
    digits, underscores and dots (e.g. ``{a_b}``, ``{a.b}``) is converted;
    any other brace usage is left as it is.
    """
    # Skip conversion entirely when the template already uses Jinja markers
    if any(marker in template for marker in ("{{", "{%")):
        return template

    legacy_placeholder = r'(?<!\{)\{([A-Za-z0-9_\.]+)\}(?!\})'
    return re.sub(legacy_placeholder, r'{{ \1 }}', template)
def safe_render(template_str: str, context: Dict[str, Any]) -> str:
    """
    Render ``template_str`` with ``context`` through the module-level Jinja2 ``ENV``.

    - Legacy ``{var}`` placeholders are converted to Jinja style before compiling.
    - ``ENV`` uses StrictUndefined, so a missing variable raises during rendering.

    Raises:
        ValueError: if ``template_str`` is None.
        Jinja2 exceptions raised while compiling or rendering are not caught.
    """
    if template_str is None:
        raise ValueError("template_str is None")

    # Context values should be plain Python types; callers holding Pydantic
    # models are expected to convert them to dicts before calling.
    jinja_source = _convert_braced_format_to_jinja(template_str)
    return ENV.from_string(jinja_source).render(**context)

99
app/utils/retry.py Normal file
View File

@ -0,0 +1,99 @@
"""
Async retry decorator with exponential backoff and configurable parameters.
"""
import asyncio
import logging
from functools import wraps
from typing import Callable, Any, Optional
import time
import os
logger = logging.getLogger(__name__)
def async_retry(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
    retry_on: tuple = (Exception,),
    jitter: bool = True
):
    """
    Decorator for async functions that implements exponential backoff retry logic.

    The decorated coroutine never raises for exceptions listed in ``retry_on``;
    it always returns ``(result, metadata)``. Exceptions outside ``retry_on``
    propagate immediately.

    Args:
        max_attempts: Maximum number of attempts (including the initial call).
            Values below 1 are treated as 1 so the function is always called.
        initial_delay: Delay in seconds before the first retry.
        backoff_factor: Factor by which the delay grows on each retry.
        max_delay: Upper bound for the delay between retries (applied after jitter).
        retry_on: Tuple of exception types to retry on.
        jitter: Add random jitter (±25%) to the delay to prevent thundering herd.

    Returns:
        A decorator. The wrapped coroutine returns ``(result, metadata)`` where
        ``metadata`` has ``attempts``, ``total_latency``, ``last_error`` and
        ``success``; ``result`` is ``None`` when every attempt failed.
    """
    import random  # local import: only the jitter computation needs it

    # Always make at least one call; otherwise the wrapper would fall through
    # the loop and callers could not unpack the (result, metadata) pair.
    attempts_allowed = max(1, int(max_attempts))

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> tuple[Any, dict]:
            """
            Returns:
                tuple: (result, metadata_dict)
                metadata_dict contains: attempts, total_latency, last_error, success
            """
            last_error = None
            # Monotonic clock: elapsed time is unaffected by system clock changes.
            started = time.monotonic()

            for attempt in range(attempts_allowed):
                try:
                    result = await func(*args, **kwargs)
                except retry_on as e:
                    last_error = str(e)
                    if attempt >= attempts_allowed - 1:
                        # Last attempt: no sleep, report the failure below.
                        logger.error(
                            "All %d attempts failed for %s: %s",
                            attempts_allowed, func.__name__, e,
                        )
                        break

                    delay = min(initial_delay * (backoff_factor ** attempt), max_delay)
                    if jitter:
                        # Add random jitter (±25% of delay)
                        jitter_range = delay * 0.25
                        delay += random.uniform(-jitter_range, jitter_range)
                        # Keep the jittered delay inside [0, max_delay].
                        delay = min(max(delay, 0.0), max_delay)

                    logger.warning(
                        "Attempt %d/%d failed for %s: %s. Retrying in %.2fs",
                        attempt + 1, attempts_allowed, func.__name__, e, delay,
                    )
                    await asyncio.sleep(delay)
                else:
                    return result, {
                        'attempts': attempt + 1,
                        'total_latency': round(time.monotonic() - started, 3),
                        'last_error': None,
                        'success': True
                    }

            return None, {
                'attempts': attempts_allowed,
                'total_latency': round(time.monotonic() - started, 3),
                'last_error': last_error,
                'success': False
            }

        return wrapper
    return decorator
# Configuration from environment
def get_retry_config():
    """
    Read retry settings from environment variables.

    Each setting falls back to a built-in default when its variable is unset:
    RETRY_MAX_ATTEMPTS (3), RETRY_INITIAL_DELAY (1.0),
    RETRY_BACKOFF_FACTOR (2.0), RETRY_MAX_DELAY (30.0).
    A value that cannot be parsed as int/float raises ValueError.
    """
    # (config key, environment variable, default text, converter)
    settings = (
        ('max_attempts', 'RETRY_MAX_ATTEMPTS', '3', int),
        ('initial_delay', 'RETRY_INITIAL_DELAY', '1.0', float),
        ('backoff_factor', 'RETRY_BACKOFF_FACTOR', '2.0', float),
        ('max_delay', 'RETRY_MAX_DELAY', '30.0', float),
    )
    return {
        key: convert(os.getenv(env_name, fallback))
        for key, env_name, fallback, convert in settings
    }
# Pre-configured decorators for common use cases
def http_retry(**kwargs):
    """
    Retry decorator specifically for HTTP operations.

    Starts from the environment-driven configuration and applies ``kwargs``
    on top, so any ``async_retry`` parameter can be overridden per call site.
    Unless the caller passes ``retry_on``, every ``Exception`` is retried.
    """
    config = get_retry_config()
    config.update(kwargs)
    # Apply the HTTP default only when the caller did not choose one; passing
    # retry_on both explicitly and through **config raised
    # "got multiple values for keyword argument 'retry_on'".
    config.setdefault('retry_on', (Exception,))
    return async_retry(**config)

View File

@ -67,9 +67,14 @@
<label class="form-label">模板内容</label>
<textarea class="form-control" name="template_content" id="templateContent" rows="3" required placeholder="收到{trans_amt}元"></textarea>
</div>
<div class="mb-3">
<label class="form-label">示例 Payload (JSON, 可选,用于预览)</label>
<textarea class="form-control font-monospace" id="templateSample" rows="4" placeholder='{"trans_amt": 100, "trans_order_info": {"remark": "abc"}}'></textarea>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="button" class="btn btn-outline-secondary" onclick="previewTemplate()">预览</button>
<button type="submit" class="btn btn-primary">保存</button>
</div>
</div>
@ -88,6 +93,7 @@
document.getElementById('templateId').value = "";
document.getElementById('templateName').value = "";
document.getElementById('templateContent').value = "";
document.getElementById('templateSample').value = '';
modal.show();
}
@ -97,7 +103,38 @@
document.getElementById('templateId').value = id;
document.getElementById('templateName').value = name;
document.getElementById('templateContent').value = content;
document.getElementById('templateSample').value = '';
modal.show();
}
/**
 * Preview the template currently in the editor.
 *
 * Reads the template text and the optional sample payload (JSON) from the
 * form, posts them to /admin/templates/preview and shows either the rendered
 * text or the error message in an alert dialog.
 */
async function previewTemplate() {
    const content = document.getElementById('templateContent').value;
    let sample = {};
    try {
        const sampleText = document.getElementById('templateSample').value;
        // An empty or whitespace-only sample means "no payload".
        if (sampleText && sampleText.trim()) {
            sample = JSON.parse(sampleText);
        }
    } catch (e) {
        alert('示例 payload JSON 格式错误: ' + e);
        return;
    }

    try {
        const res = await fetch('/admin/templates/preview', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ template_content: content, sample_payload: sample })
        });
        const data = await res.json();
        if (res.ok) {
            // Real line break: the escaped form '\\n' displayed a literal
            // backslash-n in the dialog instead of starting a new line.
            alert('渲染结果:\n' + data.rendered);
        } else {
            alert('预览失败: ' + (data.error || '未知错误'));
        }
    } catch (e) {
        // Network failures and non-JSON responses both end up here.
        alert('请求错误: ' + e);
    }
}
</script>
{% endblock %}

View File

@ -0,0 +1,23 @@
from fastapi.testclient import TestClient
from app.main import app
def test_preview_endpoint_success():
    """A legacy ``{var}`` template renders against the sample payload with HTTP 200."""
    request_body = {
        "template_content": "ok {trans_amt}",
        "sample_payload": {"trans_amt": 123},
    }

    response = TestClient(app).post("/admin/templates/preview", json=request_body)

    assert response.status_code == 200
    assert response.json().get("rendered") == "ok 123"
def test_preview_endpoint_strict_failure():
    """A template that references an undefined variable is rejected with HTTP 400."""
    # StrictUndefined makes the missing variable raise, which the endpoint maps to 400
    request_body = {"template_content": "{{ undefined_var }}", "sample_payload": {}}

    response = TestClient(app).post("/admin/templates/preview", json=request_body)

    assert response.status_code == 400

121
tests/test_retry.py Normal file
View File

@ -0,0 +1,121 @@
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
from app.utils.retry import async_retry, http_retry, get_retry_config
class TestAsyncRetry:
    """Tests for the ``async_retry`` / ``http_retry`` decorators and ``get_retry_config``."""

    @pytest.mark.asyncio
    async def test_success_on_first_attempt(self):
        """Test that function succeeds on first attempt returns immediately."""
        @async_retry(max_attempts=3)
        async def test_func():
            return "success"

        result, metadata = await test_func()

        assert result == "success"
        assert metadata["attempts"] == 1
        assert metadata["success"] is True
        assert metadata["last_error"] is None
        assert "total_latency" in metadata

    @pytest.mark.asyncio
    async def test_retry_on_failure_then_success(self):
        """Test retry mechanism when function fails then succeeds."""
        call_count = 0

        @async_retry(max_attempts=3, initial_delay=0.01)
        async def test_func():
            nonlocal call_count
            call_count += 1
            if call_count < 2:
                raise ValueError("Temporary failure")
            return "success"

        result, metadata = await test_func()

        assert result == "success"
        assert metadata["attempts"] == 2
        assert metadata["success"] is True
        assert metadata["last_error"] is None

    @pytest.mark.asyncio
    async def test_all_attempts_fail(self):
        """Test behavior when all retry attempts fail."""
        @async_retry(max_attempts=2, initial_delay=0.01)
        async def test_func():
            raise ConnectionError("Persistent failure")

        result, metadata = await test_func()

        assert result is None
        assert metadata["attempts"] == 2
        assert metadata["success"] is False
        assert metadata["last_error"] == "Persistent failure"

    @pytest.mark.asyncio
    async def test_retry_on_specific_exceptions_only_value_error_retried(self):
        """Test that only ValueError triggers retry, RuntimeError does not."""
        call_count = 0

        @async_retry(max_attempts=3, retry_on=(ValueError,), initial_delay=0.01)
        async def test_func():
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise ValueError("Retry this")
            elif call_count == 2:
                raise RuntimeError("Don't retry this")
            return "success"

        # This should raise RuntimeError directly without retry
        with pytest.raises(RuntimeError, match="Don't retry this"):
            await test_func()

        # One retried ValueError plus the RuntimeError that propagated.
        assert call_count == 2

    @pytest.mark.asyncio
    async def test_backoff_and_jitter(self):
        """Test exponential backoff with jitter."""
        @async_retry(max_attempts=4, initial_delay=0.1, backoff_factor=2, jitter=True)
        async def test_func():
            raise ConnectionError("Always fail")

        # Inside a coroutine the running loop is the right clock source;
        # get_event_loop() is discouraged here.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        result, metadata = await test_func()
        end_time = loop.time()

        # Should take at least some time due to retries and delays
        assert end_time - start_time >= 0.1  # At least initial delay
        assert metadata["attempts"] == 4
        assert metadata["success"] is False

    def test_http_retry_decorator(self):
        """Test the http_retry convenience decorator."""
        @http_retry(max_attempts=5)
        async def test_func():
            return "ok"

        assert hasattr(test_func, '__name__')
        # Should be awaitable
        assert asyncio.iscoroutinefunction(test_func)

    def test_get_retry_config(self):
        """Test configuration loading from environment."""
        config = get_retry_config()
        assert "max_attempts" in config
        assert "initial_delay" in config
        assert "backoff_factor" in config
        assert "max_delay" in config

        # Test with environment override
        with patch.dict('os.environ', {'RETRY_MAX_ATTEMPTS': '5', 'RETRY_INITIAL_DELAY': '2.0'}):
            config = get_retry_config()
            assert config["max_attempts"] == 5
            assert config["initial_delay"] == 2.0

19
tests/test_templates.py Normal file
View File

@ -0,0 +1,19 @@
import pytest
from app.templates import safe_render
def test_safe_render_legacy_and_nested():
    """Legacy ``{var}`` and dotted ``{a.b}`` placeholders both resolve from the context."""
    context = {"actual_ref_amt": 88.88, "trans_order_info": {"remark": "order123"}}

    rendered = safe_render("收到{actual_ref_amt}元 by {trans_order_info.remark}", context)

    for expected_fragment in ("88.88", "order123"):
        assert expected_fragment in rendered
def test_safe_render_missing_var_raises():
    """Rendering a placeholder that has no value in the context raises."""
    empty_context = {}
    with pytest.raises(Exception):
        safe_render("Hello {missing_var}", empty_context)