From ec585084761f4b9cd1dbe8031c8a2e7edb1be4f6 Mon Sep 17 00:00:00 2001 From: auto-bot Date: Wed, 24 Dec 2025 10:50:34 +0800 Subject: [PATCH 1/4] feat: add safe_render using Jinja2; add tests --- app/templates.py | 43 +++++++++++++++++++++++++++++++++++++++++ tests/test_templates.py | 19 ++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 app/templates.py create mode 100644 tests/test_templates.py diff --git a/app/templates.py b/app/templates.py new file mode 100644 index 0000000..d18f103 --- /dev/null +++ b/app/templates.py @@ -0,0 +1,43 @@ +import re +from typing import Any, Dict + +import jinja2 + +# Create a minimal Jinja2 environment for text templates. +# Use StrictUndefined so missing variables raise exceptions during rendering. +ENV = jinja2.Environment( + undefined=jinja2.StrictUndefined, + autoescape=False, +) + + +def _convert_braced_format_to_jinja(template: str) -> str: + """ + Convert simple `{var}` style placeholders into Jinja `{{ var }}` so legacy templates keep working. + This will not touch existing `{{ }}` or `{% %}`. + """ + # Skip conversion if template already contains jinja markers + if ("{{" in template) or ("{%" in template): + return template + + # Replace single-brace simple identifiers like {a_b} or {a.b} -> {{ a_b }} or {{ a.b }} + pattern = re.compile(r'(? str: + """ + Safely render a template string using Jinja2 with StrictUndefined. + - Accepts legacy `{var}` placeholders by converting them to Jinja style. + - Raises jinja2 exceptions when rendering fails. + """ + if template_str is None: + raise ValueError("template_str is None") + + tpl = _convert_braced_format_to_jinja(template_str) + jtpl = ENV.from_string(tpl) + # Jinja expects normal Python types in context; if context contains Pydantic models, + # they should be converted by caller to plain dicts. + return jtpl.render(**context) + + diff --git a/tests/test_templates.py b/tests/test_templates.py new file mode 100644 index 0000000..fd0a042 --- /dev/null +++ b/tests/test_templates.py @@ -0,0 +1,19 @@ +import pytest + +from app.templates import safe_render + + +def test_safe_render_legacy_and_nested(): + tpl = "收到{actual_ref_amt}元 by {trans_order_info.remark}" + ctx = {"actual_ref_amt": 88.88, "trans_order_info": {"remark": "order123"}} + out = safe_render(tpl, ctx) + assert "88.88" in out + assert "order123" in out + + +def test_safe_render_missing_var_raises(): + tpl = "Hello {missing_var}" + with pytest.raises(Exception): + safe_render(tpl, {}) + + -- 2.45.2 From 74b8b8e8ed0095df6a4c2266712333a4a86e0d05 Mon Sep 17 00:00:00 2001 From: auto-bot Date: Wed, 24 Dec 2025 10:53:15 +0800 Subject: [PATCH 2/4] feat: replace str.format with safe_render; add Pydantic validation to webhook route --- app/main.py | 14 ++++++++------ app/services/engine.py | 4 +++- tests/test_preview_endpoint.py | 23 +++++++++++++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) create mode 100644 tests/test_preview_endpoint.py diff --git a/app/main.py b/app/main.py index 2bca5dc..f94f105 100644 --- a/app/main.py +++ b/app/main.py @@ -1,10 +1,11 @@ import asyncio -from fastapi import FastAPI, Request, BackgroundTasks +from fastapi import FastAPI, Request, BackgroundTasks, Body from fastapi.responses import JSONResponse from app.logging import get_logger from app.admin import router as admin_router from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db from app.services.engine import engine +from app.models import IncomingPayload from contextlib import asynccontextmanager logger = get_logger("app") @@ -64,7 +65,7 @@ async def health(): return {"status": "ok"} @app.post("/webhook/{namespace}") -async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks): +async def webhook(namespace: str, payload: IncomingPayload = Body(...), background_tasks: BackgroundTasks = BackgroundTasks()): db = SessionLocal() endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first() @@ -82,16 +83,17 @@ async def webhook(namespace: str, request: Request, background_tasks: Background endpoint_id = endpoint.id db.close() + # payload is validated by Pydantic; convert to plain dict for engine try: - body = await request.json() + body_dict = payload.model_dump() except Exception: - return JSONResponse({"error": "Invalid JSON"}, status_code=400) + return JSONResponse({"error": "Invalid payload"}, status_code=400) # Use new engine - routed, notified = await engine.process(endpoint_id, body) + routed, notified = await engine.process(endpoint_id, body_dict) # Async save logs - background_tasks.add_task(save_logs, namespace, body, routed, notified) + background_tasks.add_task(save_logs, namespace, body_dict, routed, notified) result = { "namespace": namespace, diff --git a/app/services/engine.py b/app/services/engine.py index 32d7669..41770b3 100644 --- a/app/services/engine.py +++ b/app/services/engine.py @@ -3,6 +3,7 @@ import asyncio import re from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate from app.logging import get_logger +from app.templates import safe_render logger = get_logger("engine") @@ -100,7 +101,8 @@ class RuleEngine: render_context = self._flatten_payload(payload) render_context.update(current_context["vars"]) - msg = template_content.format(**render_context) + # Use safe Jinja2 rendering (supports legacy {var} by conversion) + msg = safe_render(template_content, render_context) c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url} tasks.append(self._exec_notify(c_dict, msg)) diff --git a/tests/test_preview_endpoint.py b/tests/test_preview_endpoint.py new file mode 100644 index 0000000..c9501d5 --- /dev/null +++ b/tests/test_preview_endpoint.py @@ -0,0 +1,23 @@ +from fastapi.testclient import TestClient + +from app.main import app + + +def test_preview_endpoint_success(): + client = TestClient(app) + payload = { + "template_content": "ok {trans_amt}", + "sample_payload": {"trans_amt": 123} + } + resp = client.post("/admin/templates/preview", json=payload) + assert resp.status_code == 200 + assert resp.json().get("rendered") == "ok 123" + + +def test_preview_endpoint_strict_failure(): + client = TestClient(app) + # Template refers to undefined variable -> StrictUndefined should cause error -> 400 + payload = {"template_content": "{{ undefined_var }}", "sample_payload": {}} + resp = client.post("/admin/templates/preview", json=payload) + assert resp.status_code == 400 + -- 2.45.2 From 0def77dc307948ece8c145ec1de7e30574ae48f3 Mon Sep 17 00:00:00 2001 From: auto-bot Date: Wed, 24 Dec 2025 10:58:43 +0800 Subject: [PATCH 3/4] feat: add /admin/templates/preview endpoint; add preview button to template editor --- app/admin.py | 28 ++++++++++++++++++++++++- templates/admin/templates.html | 37 ++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/app/admin.py b/app/admin.py index de45752..c75a7fb 100644 --- a/app/admin.py +++ b/app/admin.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Request, Form, Depends, HTTPException +from fastapi import APIRouter, Request, Form, Depends, HTTPException, Body from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session, joinedload, aliased @@ -7,6 +7,7 @@ import json from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction from app.services.stats import stats_service from app.services.engine import engine +from app.templates import safe_render router = APIRouter(prefix="/admin", tags=["admin"]) templates = Jinja2Templates(directory="templates") @@ -424,6 +425,31 @@ async def delete_template(id: int = Form(...), db: Session = Depends(get_db)): db.commit() return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/preview") +async def preview_template(data: dict = Body(...), db: Session = Depends(get_db)): + """ + Preview a template with an optional sample payload. + Request JSON: { "template_content": "...", "sample_payload": {...}, "vars": {...} } + """ + template_content = data.get("template_content") + if not template_content: + return JSONResponse({"error": "template_content is required"}, status_code=400) + + sample_payload = data.get("sample_payload") or {} + # Build render context using engine's flatten helper if available + try: + render_context = engine._flatten_payload(sample_payload) if hasattr(engine, "_flatten_payload") else dict(sample_payload) + # Merge any provided template vars + extra_vars = data.get("vars") or {} + if isinstance(extra_vars, dict): + render_context.update(extra_vars) + + rendered = safe_render(template_content, render_context) + return JSONResponse({"rendered": rendered}) + except Exception as e: + return JSONResponse({"error": str(e)}, status_code=400) + # --- Logs --- @router.get("/logs", response_class=HTMLResponse) async def list_logs(request: Request, db: Session = Depends(get_db)): diff --git a/templates/admin/templates.html b/templates/admin/templates.html index ff2279c..618e2e3 100644 --- a/templates/admin/templates.html +++ b/templates/admin/templates.html @@ -67,9 +67,14 @@ +
+ + +
@@ -88,6 +93,7 @@ document.getElementById('templateId').value = ""; document.getElementById('templateName').value = ""; document.getElementById('templateContent').value = ""; + document.getElementById('templateSample').value = ''; modal.show(); } @@ -97,7 +103,38 @@ document.getElementById('templateId').value = id; document.getElementById('templateName').value = name; document.getElementById('templateContent').value = content; + document.getElementById('templateSample').value = ''; modal.show(); } + + async function previewTemplate() { + const content = document.getElementById('templateContent').value; + let sample = {}; + try { + const sampleText = document.getElementById('templateSample').value; + if (sampleText && sampleText.trim()) { + sample = JSON.parse(sampleText); + } + } catch (e) { + alert('示例 payload JSON 格式错误: ' + e); + return; + } + + try { + const res = await fetch('/admin/templates/preview', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ template_content: content, sample_payload: sample }) + }); + const data = await res.json(); + if (res.ok) { + alert('渲染结果:\\n' + data.rendered); + } else { + alert('预览失败: ' + (data.error || '未知错误')); + } + } catch (e) { + alert('请求错误: ' + e); + } + } {% endblock %} -- 2.45.2 From b11c39f3bf7de0ec8765ad742563822dd7fc692e Mon Sep 17 00:00:00 2001 From: auto-bot Date: Wed, 24 Dec 2025 11:04:41 +0800 Subject: [PATCH 4/4] feat: add async retry mechanism with exponential backoff - Add app/utils/retry.py with configurable async retry decorator - Update DeliveryLog model to track attempt_count and latency_seconds - Apply @http_retry to engine._exec_forward and _exec_notify methods - Update save_logs to record retry metadata - Add comprehensive unit tests for retry functionality - Support configuration via environment variables (RETRY_*) This improves reliability for downstream HTTP calls by automatically retrying transient failures with exponential backoff and jitter. --- app/db.py | 2 + app/main.py | 10 +++- app/services/engine.py | 70 ++++++++++++++---------- app/utils/retry.py | 99 +++++++++++++++++++++++++++++++++ tests/test_retry.py | 121 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 269 insertions(+), 33 deletions(-) create mode 100644 app/utils/retry.py create mode 100644 tests/test_retry.py diff --git a/app/db.py b/app/db.py index 914fa25..4d5e1d3 100644 --- a/app/db.py +++ b/app/db.py @@ -91,6 +91,8 @@ class DeliveryLog(Base): type = Column(String) # relay, notify status = Column(String) # success, failed response_summary = Column(Text, nullable=True) + attempt_count = Column(Integer, default=1) # Number of attempts made + latency_seconds = Column(Float, nullable=True) # Total latency for all attempts created_at = Column(DateTime, default=datetime.utcnow) request_log = relationship("RequestLog", back_populates="delivery_logs") diff --git a/app/main.py b/app/main.py index f94f105..d90d558 100644 --- a/app/main.py +++ b/app/main.py @@ -43,16 +43,20 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list): target_name=r.get("target"), type="relay", status="success" if r.get("ok") else "failed", - response_summary=str(r.get("error") or "OK") + response_summary=str(r.get("error") or "OK"), + attempt_count=r.get("_retry_attempts", 1), + latency_seconds=r.get("_retry_latency", 0.0) )) - + for n in notified: db.add(DeliveryLog( request_id=req_log.id, target_name=n.get("channel"), type="notify", status="success" if n.get("ok") else "failed", - response_summary=str(n.get("error") or "OK") + response_summary=str(n.get("error") or "OK"), + attempt_count=n.get("_retry_attempts", 1), + latency_seconds=n.get("_retry_latency", 0.0) )) db.commit() diff --git a/app/services/engine.py b/app/services/engine.py index 41770b3..3bfda08 100644 --- a/app/services/engine.py +++ b/app/services/engine.py @@ -1,8 +1,9 @@ -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import asyncio import re from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate from app.logging import get_logger +from app.utils.retry import http_retry from app.templates import safe_render logger = get_logger("engine") @@ -83,8 +84,8 @@ class RuleEngine: for action in rule.actions: if action.action_type == 'forward' and action.target: t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms} - tasks.append(self._exec_forward(t_dict, payload)) - + tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action)) + elif action.action_type == 'notify': # Check if we have a valid channel if action.channel: @@ -94,18 +95,18 @@ class RuleEngine: template_content = action.template.template_content else: template_content = current_context.get("template_content") - + if template_content: try: # Flatten payload + merge current context vars render_context = self._flatten_payload(payload) render_context.update(current_context["vars"]) - + # Use safe Jinja2 rendering (supports legacy {var} by conversion) msg = safe_render(template_content, render_context) - + c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url} - tasks.append(self._exec_notify(c_dict, msg)) + tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action)) except Exception as e: logger.exception(f"Template render failed for action {action.id}: {e}") tasks.append(self._return_error("notify", action.channel.name, str(e))) @@ -199,30 +200,39 @@ class RuleEngine: return out - async def _exec_forward(self, target: dict, payload: dict): - try: - import httpx - async with httpx.AsyncClient() as client: - resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) - resp.raise_for_status() - return {"type": "forward", "target": target['name'], "ok": True} - except Exception as e: - return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)} + @http_retry() + async def _exec_forward(self, target: dict, payload: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Execute forward with retry logic. Returns (result_dict, retry_metadata).""" + import httpx + async with httpx.AsyncClient() as client: + resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) + resp.raise_for_status() + return {"type": "forward", "target": target['name'], "ok": True}, {} - async def _exec_notify(self, channel: dict, msg: str): - try: - from app.services.notify import send_feishu, send_wecom - channel_type = channel.get('channel') - url = channel.get('url') - - if channel_type == 'feishu': - await send_feishu(url, msg) - elif channel_type == 'wecom': - await send_wecom(url, msg) - return {"type": "notify", "channel": channel_type, "ok": True} - except Exception as e: - logger.exception(f"Notification failed for {channel.get('channel')}: {e}") - return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)} + @http_retry() + async def _exec_notify(self, channel: dict, msg: str) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Execute notify with retry logic. Returns (result_dict, retry_metadata).""" + from app.services.notify import send_feishu, send_wecom + channel_type = channel.get('channel') + url = channel.get('url') + + if channel_type == 'feishu': + await send_feishu(url, msg) + elif channel_type == 'wecom': + await send_wecom(url, msg) + return {"type": "notify", "channel": channel_type, "ok": True}, {} + + async def _wrap_retry_task(self, func, *args, **kwargs): + """Wrap retry-enabled task to handle metadata and return standard result format.""" + action = kwargs.pop('action', None) # Remove action from kwargs + result, metadata = await func(*args, **kwargs) + + # Add retry metadata to result dict for logging + if metadata: + result['_retry_attempts'] = metadata.get('attempts', 1) + result['_retry_latency'] = metadata.get('total_latency', 0.0) + + return result async def _return_error(self, type_str, name, err): return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err} diff --git a/app/utils/retry.py b/app/utils/retry.py new file mode 100644 index 0000000..e2be82c --- /dev/null +++ b/app/utils/retry.py @@ -0,0 +1,99 @@ +""" +Async retry decorator with exponential backoff and configurable parameters. +""" +import asyncio +import logging +from functools import wraps +from typing import Callable, Any, Optional +import time +import os + +logger = logging.getLogger(__name__) + +def async_retry( + max_attempts: int = 3, + initial_delay: float = 1.0, + backoff_factor: float = 2.0, + max_delay: float = 60.0, + retry_on: tuple = (Exception,), + jitter: bool = True +): + """ + Decorator for async functions that implements exponential backoff retry logic. + + Args: + max_attempts: Maximum number of retry attempts (including initial call) + initial_delay: Initial delay in seconds before first retry + backoff_factor: Factor by which delay increases each retry + max_delay: Maximum delay between retries + retry_on: Tuple of exception types to retry on + jitter: Add random jitter to delay to prevent thundering herd + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs) -> tuple[Any, dict]: + """ + Returns: + tuple: (result, metadata_dict) + metadata_dict contains: attempts, total_latency, last_error + """ + last_error = None + start_time = time.time() + + for attempt in range(max_attempts): + try: + result = await func(*args, **kwargs) + total_latency = time.time() - start_time + return result, { + 'attempts': attempt + 1, + 'total_latency': round(total_latency, 3), + 'last_error': None, + 'success': True + } + except retry_on as e: + last_error = str(e) + if attempt < max_attempts - 1: # Don't sleep after last attempt + delay = min(initial_delay * (backoff_factor ** attempt), max_delay) + if jitter: + # Add random jitter (±25% of delay) + import random + jitter_range = delay * 0.25 + delay += random.uniform(-jitter_range, jitter_range) + + logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed for {func.__name__}: {e}. Retrying in {delay:.2f}s") + await asyncio.sleep(delay) + else: + logger.error(f"All {max_attempts} attempts failed for {func.__name__}: {e}") + + total_latency = time.time() - start_time + return None, { + 'attempts': max_attempts, + 'total_latency': round(total_latency, 3), + 'last_error': last_error, + 'success': False + } + + return wrapper + return decorator + + +# Configuration from environment +def get_retry_config(): + """Get retry configuration from environment variables.""" + return { + 'max_attempts': int(os.getenv('RETRY_MAX_ATTEMPTS', '3')), + 'initial_delay': float(os.getenv('RETRY_INITIAL_DELAY', '1.0')), + 'backoff_factor': float(os.getenv('RETRY_BACKOFF_FACTOR', '2.0')), + 'max_delay': float(os.getenv('RETRY_MAX_DELAY', '30.0')), + } + + +# Pre-configured decorators for common use cases +def http_retry(**kwargs): + """Retry decorator specifically for HTTP operations.""" + config = get_retry_config() + config.update(kwargs) + return async_retry( + retry_on=(Exception,), # Retry on any exception for HTTP calls + **config + ) diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 0000000..f4dd525 --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,121 @@ +import pytest +import asyncio +from unittest.mock import AsyncMock, patch +from app.utils.retry import async_retry, http_retry, get_retry_config + + +class TestAsyncRetry: + @pytest.mark.asyncio + async def test_success_on_first_attempt(self): + """Test that function succeeds on first attempt returns immediately.""" + + @async_retry(max_attempts=3) + async def test_func(): + return "success" + + result, metadata = await test_func() + + assert result == "success" + assert metadata["attempts"] == 1 + assert metadata["success"] is True + assert metadata["last_error"] is None + assert "total_latency" in metadata + + @pytest.mark.asyncio + async def test_retry_on_failure_then_success(self): + """Test retry mechanism when function fails then succeeds.""" + + call_count = 0 + + @async_retry(max_attempts=3, initial_delay=0.01) + async def test_func(): + nonlocal call_count + call_count += 1 + if call_count < 2: + raise ValueError("Temporary failure") + return "success" + + result, metadata = await test_func() + + assert result == "success" + assert metadata["attempts"] == 2 + assert metadata["success"] is True + assert metadata["last_error"] is None + + @pytest.mark.asyncio + async def test_all_attempts_fail(self): + """Test behavior when all retry attempts fail.""" + + @async_retry(max_attempts=2, initial_delay=0.01) + async def test_func(): + raise ConnectionError("Persistent failure") + + result, metadata = await test_func() + + assert result is None + assert metadata["attempts"] == 2 + assert metadata["success"] is False + assert metadata["last_error"] == "Persistent failure" + + @pytest.mark.asyncio + async def test_retry_on_specific_exceptions_only_value_error_retried(self): + """Test that only ValueError triggers retry, RuntimeError does not.""" + + call_count = 0 + + @async_retry(max_attempts=3, retry_on=(ValueError,), initial_delay=0.01) + async def test_func(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ValueError("Retry this") + elif call_count == 2: + raise RuntimeError("Don't retry this") + return "success" + + # This should raise RuntimeError directly without retry + with pytest.raises(RuntimeError, match="Don't retry this"): + await test_func() + + @pytest.mark.asyncio + async def test_backoff_and_jitter(self): + """Test exponential backoff with jitter.""" + + @async_retry(max_attempts=4, initial_delay=0.1, backoff_factor=2, jitter=True) + async def test_func(): + raise ConnectionError("Always fail") + + start_time = asyncio.get_event_loop().time() + result, metadata = await test_func() + end_time = asyncio.get_event_loop().time() + + # Should take at least some time due to retries and delays + assert end_time - start_time >= 0.1 # At least initial delay + assert metadata["attempts"] == 4 + assert metadata["success"] is False + + def test_http_retry_decorator(self): + """Test the http_retry convenience decorator.""" + + @http_retry(max_attempts=5) + async def test_func(): + return "ok" + + assert hasattr(test_func, '__name__') + # Should be awaitable + assert asyncio.iscoroutinefunction(test_func) + + def test_get_retry_config(self): + """Test configuration loading from environment.""" + config = get_retry_config() + + assert "max_attempts" in config + assert "initial_delay" in config + assert "backoff_factor" in config + assert "max_delay" in config + + # Test with environment override + with patch.dict('os.environ', {'RETRY_MAX_ATTEMPTS': '5', 'RETRY_INITIAL_DELAY': '2.0'}): + config = get_retry_config() + assert config["max_attempts"] == 5 + assert config["initial_delay"] == 2.0 -- 2.45.2