diff --git a/app/admin.py b/app/admin.py index de45752..c75a7fb 100644 --- a/app/admin.py +++ b/app/admin.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Request, Form, Depends, HTTPException +from fastapi import APIRouter, Request, Form, Depends, HTTPException, Body from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session, joinedload, aliased @@ -7,6 +7,7 @@ import json from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction from app.services.stats import stats_service from app.services.engine import engine +from app.templates import safe_render router = APIRouter(prefix="/admin", tags=["admin"]) templates = Jinja2Templates(directory="templates") @@ -424,6 +425,31 @@ async def delete_template(id: int = Form(...), db: Session = Depends(get_db)): db.commit() return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/preview") +async def preview_template(data: dict = Body(...), db: Session = Depends(get_db)): + """ + Preview a template with an optional sample payload. + Request JSON: { "template_content": "...", "sample_payload": {...}, "vars": {...} } + """ + template_content = data.get("template_content") + if not template_content: + return JSONResponse({"error": "template_content is required"}, status_code=400) + + sample_payload = data.get("sample_payload") or {} + # Build render context using engine's flatten helper if available + try: + render_context = engine._flatten_payload(sample_payload) if hasattr(engine, "_flatten_payload") else dict(sample_payload) + # Merge any provided template vars + extra_vars = data.get("vars") or {} + if isinstance(extra_vars, dict): + render_context.update(extra_vars) + + rendered = safe_render(template_content, render_context) + return JSONResponse({"rendered": rendered}) + except Exception as e: + return JSONResponse({"error": str(e)}, status_code=400) + # --- Logs --- @router.get("/logs", response_class=HTMLResponse) async def list_logs(request: Request, db: Session = Depends(get_db)): diff --git a/app/db.py b/app/db.py index 914fa25..4d5e1d3 100644 --- a/app/db.py +++ b/app/db.py @@ -91,6 +91,8 @@ class DeliveryLog(Base): type = Column(String) # relay, notify status = Column(String) # success, failed response_summary = Column(Text, nullable=True) + attempt_count = Column(Integer, default=1) # Number of attempts made + latency_seconds = Column(Float, nullable=True) # Total latency for all attempts created_at = Column(DateTime, default=datetime.utcnow) request_log = relationship("RequestLog", back_populates="delivery_logs") diff --git a/app/main.py b/app/main.py index 2bca5dc..d90d558 100644 --- a/app/main.py +++ b/app/main.py @@ -1,10 +1,11 @@ import asyncio -from fastapi import FastAPI, Request, BackgroundTasks +from fastapi import FastAPI, Request, BackgroundTasks, Body from fastapi.responses import JSONResponse from app.logging import get_logger from app.admin import router as admin_router from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db from app.services.engine import engine +from app.models import IncomingPayload from contextlib import asynccontextmanager logger = get_logger("app") @@ -42,16 +43,20 @@ def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list): target_name=r.get("target"), type="relay", status="success" if r.get("ok") else "failed", - response_summary=str(r.get("error") or "OK") + response_summary=str(r.get("error") or "OK"), + attempt_count=r.get("_retry_attempts", 1), + latency_seconds=r.get("_retry_latency", 0.0) )) - + for n in notified: db.add(DeliveryLog( request_id=req_log.id, target_name=n.get("channel"), type="notify", status="success" if n.get("ok") else "failed", - response_summary=str(n.get("error") or "OK") + response_summary=str(n.get("error") or "OK"), + attempt_count=n.get("_retry_attempts", 1), + latency_seconds=n.get("_retry_latency", 0.0) )) db.commit() @@ -64,7 +69,7 @@ async def health(): return {"status": "ok"} @app.post("/webhook/{namespace}") -async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks): +async def webhook(namespace: str, payload: IncomingPayload = Body(...), background_tasks: BackgroundTasks = BackgroundTasks()): db = SessionLocal() endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first() @@ -82,16 +87,17 @@ async def webhook(namespace: str, request: Request, background_tasks: Background endpoint_id = endpoint.id db.close() + # payload is validated by Pydantic; convert to plain dict for engine try: - body = await request.json() + body_dict = payload.model_dump() except Exception: - return JSONResponse({"error": "Invalid JSON"}, status_code=400) + return JSONResponse({"error": "Invalid payload"}, status_code=400) # Use new engine - routed, notified = await engine.process(endpoint_id, body) + routed, notified = await engine.process(endpoint_id, body_dict) # Async save logs - background_tasks.add_task(save_logs, namespace, body, routed, notified) + background_tasks.add_task(save_logs, namespace, body_dict, routed, notified) result = { "namespace": namespace, diff --git a/app/services/engine.py b/app/services/engine.py index 32d7669..3bfda08 100644 --- a/app/services/engine.py +++ b/app/services/engine.py @@ -1,8 +1,10 @@ -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import asyncio import re from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate from app.logging import get_logger +from app.utils.retry import http_retry +from app.templates import safe_render logger = get_logger("engine") @@ -82,8 +84,8 @@ class RuleEngine: for action in rule.actions: if action.action_type == 'forward' and action.target: t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms} - tasks.append(self._exec_forward(t_dict, payload)) - + tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action)) + elif action.action_type == 'notify': # Check if we have a valid channel if action.channel: @@ -93,17 +95,18 @@ class RuleEngine: template_content = action.template.template_content else: template_content = current_context.get("template_content") - + if template_content: try: # Flatten payload + merge current context vars render_context = self._flatten_payload(payload) render_context.update(current_context["vars"]) - - msg = template_content.format(**render_context) - + + # Use safe Jinja2 rendering (supports legacy {var} by conversion) + msg = safe_render(template_content, render_context) + c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url} - tasks.append(self._exec_notify(c_dict, msg)) + tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action)) except Exception as e: logger.exception(f"Template render failed for action {action.id}: {e}") tasks.append(self._return_error("notify", action.channel.name, str(e))) @@ -197,30 +200,39 @@ class RuleEngine: return out - async def _exec_forward(self, target: dict, payload: dict): - try: - import httpx - async with httpx.AsyncClient() as client: - resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) - resp.raise_for_status() - return {"type": "forward", "target": target['name'], "ok": True} - except Exception as e: - return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)} + @http_retry() + async def _exec_forward(self, target: dict, payload: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Execute forward with retry logic. Returns (result_dict, retry_metadata).""" + import httpx + async with httpx.AsyncClient() as client: + resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) + resp.raise_for_status() + return {"type": "forward", "target": target['name'], "ok": True}, {} - async def _exec_notify(self, channel: dict, msg: str): - try: - from app.services.notify import send_feishu, send_wecom - channel_type = channel.get('channel') - url = channel.get('url') - - if channel_type == 'feishu': - await send_feishu(url, msg) - elif channel_type == 'wecom': - await send_wecom(url, msg) - return {"type": "notify", "channel": channel_type, "ok": True} - except Exception as e: - logger.exception(f"Notification failed for {channel.get('channel')}: {e}") - return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)} + @http_retry() + async def _exec_notify(self, channel: dict, msg: str) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Execute notify with retry logic. Returns (result_dict, retry_metadata).""" + from app.services.notify import send_feishu, send_wecom + channel_type = channel.get('channel') + url = channel.get('url') + + if channel_type == 'feishu': + await send_feishu(url, msg) + elif channel_type == 'wecom': + await send_wecom(url, msg) + return {"type": "notify", "channel": channel_type, "ok": True}, {} + + async def _wrap_retry_task(self, func, *args, **kwargs): + """Wrap retry-enabled task to handle metadata and return standard result format.""" + action = kwargs.pop('action', None) # Remove action from kwargs + result, metadata = await func(*args, **kwargs) + + # Add retry metadata to result dict for logging + if metadata: + result['_retry_attempts'] = metadata.get('attempts', 1) + result['_retry_latency'] = metadata.get('total_latency', 0.0) + + return result async def _return_error(self, type_str, name, err): return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err} diff --git a/app/templates.py b/app/templates.py new file mode 100644 index 0000000..d18f103 --- /dev/null +++ b/app/templates.py @@ -0,0 +1,43 @@ +import re +from typing import Any, Dict + +import jinja2 + +# Create a minimal Jinja2 environment for text templates. +# Use StrictUndefined so missing variables raise exceptions during rendering. +ENV = jinja2.Environment( + undefined=jinja2.StrictUndefined, + autoescape=False, +) + + +def _convert_braced_format_to_jinja(template: str) -> str: + """ + Convert simple `{var}` style placeholders into Jinja `{{ var }}` so legacy templates keep working. + This will not touch existing `{{ }}` or `{% %}`. + """ + # Skip conversion if template already contains jinja markers + if ("{{" in template) or ("{%" in template): + return template + + # Replace single-brace simple identifiers like {a_b} or {a.b} -> {{ a_b }} or {{ a.b }} + pattern = re.compile(r'(? str: + """ + Safely render a template string using Jinja2 with StrictUndefined. + - Accepts legacy `{var}` placeholders by converting them to Jinja style. + - Raises jinja2 exceptions when rendering fails. + """ + if template_str is None: + raise ValueError("template_str is None") + + tpl = _convert_braced_format_to_jinja(template_str) + jtpl = ENV.from_string(tpl) + # Jinja expects normal Python types in context; if context contains Pydantic models, + # they should be converted by caller to plain dicts. + return jtpl.render(**context) + + diff --git a/app/utils/retry.py b/app/utils/retry.py new file mode 100644 index 0000000..e2be82c --- /dev/null +++ b/app/utils/retry.py @@ -0,0 +1,99 @@ +""" +Async retry decorator with exponential backoff and configurable parameters. +""" +import asyncio +import logging +from functools import wraps +from typing import Callable, Any, Optional +import time +import os + +logger = logging.getLogger(__name__) + +def async_retry( + max_attempts: int = 3, + initial_delay: float = 1.0, + backoff_factor: float = 2.0, + max_delay: float = 60.0, + retry_on: tuple = (Exception,), + jitter: bool = True +): + """ + Decorator for async functions that implements exponential backoff retry logic. + + Args: + max_attempts: Maximum number of retry attempts (including initial call) + initial_delay: Initial delay in seconds before first retry + backoff_factor: Factor by which delay increases each retry + max_delay: Maximum delay between retries + retry_on: Tuple of exception types to retry on + jitter: Add random jitter to delay to prevent thundering herd + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args, **kwargs) -> tuple[Any, dict]: + """ + Returns: + tuple: (result, metadata_dict) + metadata_dict contains: attempts, total_latency, last_error + """ + last_error = None + start_time = time.time() + + for attempt in range(max_attempts): + try: + result = await func(*args, **kwargs) + total_latency = time.time() - start_time + return result, { + 'attempts': attempt + 1, + 'total_latency': round(total_latency, 3), + 'last_error': None, + 'success': True + } + except retry_on as e: + last_error = str(e) + if attempt < max_attempts - 1: # Don't sleep after last attempt + delay = min(initial_delay * (backoff_factor ** attempt), max_delay) + if jitter: + # Add random jitter (±25% of delay) + import random + jitter_range = delay * 0.25 + delay += random.uniform(-jitter_range, jitter_range) + + logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed for {func.__name__}: {e}. Retrying in {delay:.2f}s") + await asyncio.sleep(delay) + else: + logger.error(f"All {max_attempts} attempts failed for {func.__name__}: {e}") + + total_latency = time.time() - start_time + return None, { + 'attempts': max_attempts, + 'total_latency': round(total_latency, 3), + 'last_error': last_error, + 'success': False + } + + return wrapper + return decorator + + +# Configuration from environment +def get_retry_config(): + """Get retry configuration from environment variables.""" + return { + 'max_attempts': int(os.getenv('RETRY_MAX_ATTEMPTS', '3')), + 'initial_delay': float(os.getenv('RETRY_INITIAL_DELAY', '1.0')), + 'backoff_factor': float(os.getenv('RETRY_BACKOFF_FACTOR', '2.0')), + 'max_delay': float(os.getenv('RETRY_MAX_DELAY', '30.0')), + } + + +# Pre-configured decorators for common use cases +def http_retry(**kwargs): + """Retry decorator specifically for HTTP operations.""" + config = get_retry_config() + config.update(kwargs) + return async_retry( + retry_on=(Exception,), # Retry on any exception for HTTP calls + **config + ) diff --git a/templates/admin/templates.html b/templates/admin/templates.html index ff2279c..618e2e3 100644 --- a/templates/admin/templates.html +++ b/templates/admin/templates.html @@ -67,9 +67,14 @@ +