- Add app/utils/retry.py with configurable async retry decorator
- Update DeliveryLog model to track attempt_count and latency_seconds
- Apply @http_retry to engine._exec_forward and _exec_notify methods
- Update save_logs to record retry metadata
- Add comprehensive unit tests for retry functionality
- Support configuration via environment variables (RETRY_*)

This improves reliability for downstream HTTP calls by automatically retrying transient failures with exponential backoff and jitter.
241 lines
11 KiB
Python
241 lines
11 KiB
Python
from typing import Any, Dict, List, Optional, Tuple
|
|
import asyncio
|
|
import re
|
|
from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
|
|
from app.logging import get_logger
|
|
from app.utils.retry import http_retry
|
|
from app.templates import safe_render
|
|
|
|
# Module-level logger, obtained from the project's logging helper under the name "engine".
logger = get_logger("engine")
|
|
|
|
class RuleEngine:
    """Match an endpoint's processing rules against a payload and run their actions.

    Rules form a tree (root rules have no parent). A matching rule contributes
    template variables / a template to a context that is inherited by its
    children, and schedules ``forward`` and ``notify`` actions that are awaited
    together at the end of :meth:`process`.
    """

    def __init__(self):
        """Create an engine; no instance state is kept."""
        pass

    def get_value_by_path(self, payload: Dict[str, Any], path: str) -> Optional[str]:
        """Resolve a dot-separated ``path`` inside ``payload`` and return it as a string.

        Args:
            payload: Nested dictionary to look into.
            path: Keys joined by ``.`` (e.g. ``"a.b.c"``).

        Returns:
            ``str(value)`` when every path segment resolves to a non-``None``
            value; ``None`` when a segment is missing, an intermediate value is
            not a dict, or anything goes wrong while resolving.
        """
        # Best effort on purpose: a malformed path (e.g. None) or an exotic
        # payload must not abort rule evaluation, it just means "no value".
        try:
            value = payload
            for key in path.split('.'):
                if not isinstance(value, dict):
                    return None
                value = value.get(key)
            return str(value) if value is not None else None
        except Exception:
            return None

    def check_condition(self, actual_val: Optional[str], operator: Optional[str], match_val: Optional[str]) -> bool:
        """Evaluate ``actual_val <operator> match_val``.

        Supported operators: ``eq`` (default when ``operator`` is falsy),
        ``neq``, ``contains``, ``startswith`` and ``regex`` (``re.search``).

        Returns:
            The result of the comparison. ``False`` when ``actual_val`` is
            ``None``, when the operator is unknown, when a string operator is
            given a non-string ``match_val``, or when the regex is invalid.
        """
        if actual_val is None:
            return False

        operator = operator or 'eq'

        if operator == 'eq':
            return actual_val == match_val
        if operator == 'neq':
            return actual_val != match_val

        # The remaining operators need a string pattern; a NULL/non-string
        # match value can never match and must not raise TypeError.
        if not isinstance(match_val, str):
            return False

        if operator == 'contains':
            return match_val in actual_val
        if operator == 'startswith':
            return actual_val.startswith(match_val)
        if operator == 'regex':
            try:
                return re.search(match_val, actual_val) is not None
            except (re.error, TypeError):
                # Invalid pattern (or non-string subject): treat as "no match".
                return False
        return False

    async def process(self, endpoint_id: int, payload: Dict[str, Any]):
        """Run every matching rule of ``endpoint_id`` against ``payload``.

        Root rules (``parent_rule_id`` IS NULL) are loaded ordered by priority
        descending and walked depth-first; children are only visited when their
        parent matched. Scheduled actions are awaited together with
        ``asyncio.gather``.

        Args:
            endpoint_id: Endpoint whose rules are evaluated.
            payload: Incoming payload used for matching, forwarding and
                template rendering.

        Returns:
            ``(routed_results, notified_results)``: result dicts whose
            ``"type"`` is ``"forward"`` go into the first list, all others into
            the second.

        Raises:
            Any exception raised by a scheduled task propagates out of
            ``asyncio.gather``; errors raised while walking the rules propagate
            as well. The DB session is closed in every case.
        """
        db = SessionLocal()
        tasks = []

        # context carries what parent rules accumulated:
        # {"vars": {...template variables...}, "template_content": "..." or None}
        def process_rules(rules: List[ProcessingRule], context: Dict):
            for rule in rules:
                actual_val = self.get_value_by_path(payload, rule.match_field)
                if not self.check_condition(actual_val, rule.operator, rule.match_value):
                    continue

                logger.info({"event": "rule_matched", "rule_id": rule.id, "match_field": rule.match_field})

                # Copy the vars dict so siblings do not see each other's
                # variables; the template content is an immutable string.
                current_context = {
                    "vars": context.get("vars", {}).copy(),
                    "template_content": context.get("template_content"),
                }

                # 1. First pass: collect vars and templates from all actions, so
                #    a rule can set a template for later actions or for its
                #    children even if it sends nothing itself.
                for action in rule.actions:
                    if action.template_vars:
                        current_context["vars"].update(action.template_vars)
                    if action.template:
                        current_context["template_content"] = action.template.template_content

                # 2. Second pass: schedule the actions.
                for action in rule.actions:
                    if action.action_type == 'forward' and action.target:
                        # Plain dicts are handed to the tasks; the ORM objects
                        # are not touched by the coroutines.
                        t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms}
                        tasks.append(self._wrap_retry_task(self._exec_forward, t_dict, payload, action=action))

                    elif action.action_type == 'notify' and action.channel:
                        # Template precedence: the action's own template, then
                        # the one inherited through the context.
                        if action.template:
                            template_content = action.template.template_content
                        else:
                            template_content = current_context.get("template_content")

                        if not template_content:
                            # Channel exists but no template found anywhere
                            logger.warning(f"Action {action.id} has channel but no template (own or inherited). Skipping.")
                            continue

                        try:
                            # Flatten payload + merge current context vars
                            render_context = self._flatten_payload(payload)
                            render_context.update(current_context["vars"])

                            # Use safe Jinja2 rendering (supports legacy {var} by conversion)
                            msg = safe_render(template_content, render_context)

                            c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
                            tasks.append(self._wrap_retry_task(self._exec_notify, c_dict, msg, action=action))
                        except Exception as e:
                            # A broken template yields a failed notify result
                            # instead of aborting the remaining actions.
                            logger.exception(f"Template render failed for action {action.id}: {e}")
                            tasks.append(self._return_error("notify", action.channel.name, str(e)))

                # 3. Process children (DFS) with the context built at this level.
                if rule.children:
                    process_rules(rule.children, current_context)

        try:
            # Start with root rules (parent_rule_id is NULL)
            root_rules = db.query(ProcessingRule).filter(
                ProcessingRule.endpoint_id == endpoint_id,
                ProcessingRule.parent_rule_id.is_(None),
            ).order_by(ProcessingRule.priority.desc()).all()

            try:
                process_rules(root_rules, {"vars": {}, "template_content": None})
            except BaseException:
                # Coroutines scheduled so far will never be awaited; close them
                # so they do not trigger "never awaited" warnings.
                for pending in tasks:
                    pending.close()
                raise

            # Wait for all actions
            results = await asyncio.gather(*tasks) if tasks else []

            # Aggregate results
            routed_results = []
            notified_results = []
            for res in results:
                if res['type'] == 'forward':
                    routed_results.append(res)
                else:
                    notified_results.append(res)

            return routed_results, notified_results

        finally:
            db.close()

    def _flatten_payload(self, y: dict) -> dict:
        """Build the template rendering context from payload ``y``.

        The result contains:

        * every leaf value under an underscore-joined key
          (``{"a": {"b": 1}}`` -> ``"a_b": 1``);
        * every top-level key itself, with nested dicts wrapped so templates
          can use attribute access (``a.b``);
        * fallback aliases ``cash_resp_desc`` and ``actual_ref_amt`` when the
          payload did not already produce those keys.
        """
        out = {}

        class AttrDict(dict):
            """dict whose keys are also readable as attributes inside templates."""

            def __getattr__(self, key):
                # Protocol probes such as __html__ / __deepcopy__ must fail
                # normally; answering "" makes hasattr() succeed and the
                # subsequent call of the "method" blow up.
                if key.startswith('__') and key.endswith('__'):
                    raise AttributeError(key)
                if key in self:
                    v = self[key]
                    if isinstance(v, dict):
                        return AttrDict(v)
                    return v
                # Missing keys render as an empty string instead of raising.
                return ""

        def flatten(x, name=''):
            if isinstance(x, dict):
                for a in x:
                    flatten(x[a], name + a + '_')
                    if name == '':
                        # Wrap top-level nested dicts so {a.b} works in templates
                        out[a] = AttrDict(x[a]) if isinstance(x[a], dict) else x[a]
            elif name:
                # name always ends with the trailing '_' separator here.
                out[name[:-1]] = x

        flatten(y)

        # Fallback aliases for common fields referenced by templates.
        # cash_resp_desc: prefer nested trans_order_info.cash_resp_desc
        try:
            if 'cash_resp_desc' not in out:
                toi = y.get('trans_order_info') or {}
                out['cash_resp_desc'] = (toi.get('cash_resp_desc') or "")
        except (AttributeError, TypeError):
            # trans_order_info is not a mapping.
            out['cash_resp_desc'] = ""

        # actual_ref_amt: extra.actual_ref_amt > trans_order_info.ref_amt > settlement_amt
        try:
            if 'actual_ref_amt' not in out:
                extra = y.get('extra') or {}
                toi = y.get('trans_order_info') or {}
                val = extra.get('actual_ref_amt')
                if val is None:
                    val = toi.get('ref_amt')
                if val is None:
                    val = y.get('settlement_amt')
                out['actual_ref_amt'] = val
        except (AttributeError, TypeError):
            # extra / trans_order_info is not a mapping.
            out['actual_ref_amt'] = y.get('settlement_amt')

        # Ensure any dict values in context are AttrDict to support dot-notation
        for k, v in list(out.items()):
            if isinstance(v, dict) and not isinstance(v, AttrDict):
                out[k] = AttrDict(v)

        return out

    @http_retry()
    async def _exec_forward(self, target: dict, payload: dict) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """POST ``payload`` as JSON to ``target['url']``.

        Args:
            target: Dict with ``name``, ``url`` and optional ``timeout_ms``
                (milliseconds; 5000 is used when missing or ``None``).
            payload: Body sent as JSON.

        Returns:
            ``(result_dict, retry_metadata)``; the metadata returned here is an
            empty dict (presumably filled in by ``http_retry`` - confirm in
            app.utils.retry).

        Raises:
            Whatever ``httpx`` raises for transport errors, and the
            ``raise_for_status`` error for 4xx/5xx responses.
        """
        import httpx

        # The caller always sets the key, possibly to None (no timeout
        # configured), so dict.get's default alone is not enough.
        timeout_ms = target.get('timeout_ms')
        if timeout_ms is None:
            timeout_ms = 5000

        async with httpx.AsyncClient() as client:
            resp = await client.post(target['url'], json=payload, timeout=timeout_ms / 1000)
            resp.raise_for_status()
            return {"type": "forward", "target": target['name'], "ok": True}, {}

    @http_retry()
    async def _exec_notify(self, channel: dict, msg: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Send ``msg`` to a notification channel.

        Args:
            channel: Dict with ``channel`` (``"feishu"`` or ``"wecom"``) and
                ``url`` (webhook URL).
            msg: Rendered message text.

        Returns:
            ``(result_dict, retry_metadata)``. For an unsupported channel type
            nothing is sent and the result has ``ok: False`` plus an ``error``.
        """
        from app.services.notify import send_feishu, send_wecom

        channel_type = channel.get('channel')
        url = channel.get('url')

        if channel_type == 'feishu':
            await send_feishu(url, msg)
        elif channel_type == 'wecom':
            await send_wecom(url, msg)
        else:
            # Nothing was delivered: do not report success.
            return {
                "type": "notify",
                "channel": channel_type,
                "ok": False,
                "error": f"unsupported channel type: {channel_type!r}",
            }, {}
        return {"type": "notify", "channel": channel_type, "ok": True}, {}

    async def _wrap_retry_task(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` and fold its retry metadata into the result.

        ``func`` must return ``(result_dict, metadata)``. When ``metadata`` is
        non-empty, ``_retry_attempts`` and ``_retry_latency`` are added to the
        result dict, which is then returned on its own.
        """
        # 'action' is accepted for the caller's convenience but is not part of
        # func's signature, so it is dropped before the call.
        kwargs.pop('action', None)
        result, metadata = await func(*args, **kwargs)

        # Add retry metadata to result dict for logging
        if metadata:
            result['_retry_attempts'] = metadata.get('attempts', 1)
            result['_retry_latency'] = metadata.get('total_latency', 0.0)

        return result

    async def _return_error(self, type_str, name, err):
        """Return a failed result dict; awaitable so it can be gathered with real tasks.

        The name is stored under ``"target"`` for ``forward`` results and under
        ``"channel"`` otherwise.
        """
        return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err}
|
|
|
|
# Module-level RuleEngine instance created at import time
# (presumably the shared instance other modules import - confirm against callers).
engine = RuleEngine()
|