WebhockTransfer/app/services/engine.py
houhuan 2bc7460f1f feat: 初始化Webhook中继系统项目
- 添加FastAPI应用基础结构,包括主入口、路由和模型定义
- 实现Webhook接收端点(/webhook/{namespace})和健康检查(/health)
- 添加管理后台路由和模板,支持端点、目标、渠道和模板管理
- 包含SQLite数据库模型定义和初始化逻辑
- 添加日志记录和统计服务
- 包含Dockerfile和配置示例文件
- 添加项目文档,包括设计、流程图和验收标准
2025-12-21 18:43:12 +08:00

229 lines
10 KiB
Python

from typing import Any, Dict, List, Optional
import asyncio
import re
from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
from app.logging import get_logger
logger = get_logger("engine")
class RuleEngine:
def __init__(self):
pass
def get_value_by_path(self, payload: Dict[str, Any], path: str) -> Optional[str]:
try:
keys = path.split('.')
value = payload
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return str(value) if value is not None else None
except Exception:
return None
def check_condition(self, actual_val: str, operator: str, match_val: str) -> bool:
if actual_val is None:
return False
operator = operator or 'eq'
if operator == 'eq':
return actual_val == match_val
elif operator == 'neq':
return actual_val != match_val
elif operator == 'contains':
return match_val in actual_val
elif operator == 'startswith':
return actual_val.startswith(match_val)
elif operator == 'regex':
try:
return re.search(match_val, actual_val) is not None
except:
return False
return False
async def process(self, endpoint_id: int, payload: Dict[str, Any]):
db = SessionLocal()
tasks = []
try:
# Recursive processing function
# context stores accumulated template_vars AND active_template from parent rules
# context = { "vars": {...}, "template_content": "..." }
def process_rules(rules: List[ProcessingRule], context: Dict):
for rule in rules:
actual_val = self.get_value_by_path(payload, rule.match_field)
if self.check_condition(actual_val, rule.operator, rule.match_value):
logger.info({"event": "rule_matched", "rule_id": rule.id, "match_field": rule.match_field})
# Prepare context for this level
# We use shallow copy for dict structure, but deep copy for internal vars is not strictly needed
# as long as we don't mutate active_template in place (it's a string).
current_context = {
"vars": context.get("vars", {}).copy(),
"template_content": context.get("template_content")
}
# 1. First Pass: Collect Vars and Templates from all actions
# This allows a parent rule to set a template even if it doesn't send a notification itself
for action in rule.actions:
if action.template_vars:
current_context["vars"].update(action.template_vars)
# If action has a template, it updates the current context's template
# This template will be used by subsequent actions in this rule OR children
if action.template:
current_context["template_content"] = action.template.template_content
# 2. Second Pass: Execute Actions
for action in rule.actions:
if action.action_type == 'forward' and action.target:
t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms}
tasks.append(self._exec_forward(t_dict, payload))
elif action.action_type == 'notify':
# Check if we have a valid channel
if action.channel:
# Determine template to use: Action's own template > Inherited template
template_content = None
if action.template:
template_content = action.template.template_content
else:
template_content = current_context.get("template_content")
if template_content:
try:
# Flatten payload + merge current context vars
render_context = self._flatten_payload(payload)
render_context.update(current_context["vars"])
msg = template_content.format(**render_context)
c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
tasks.append(self._exec_notify(c_dict, msg))
except Exception as e:
logger.exception(f"Template render failed for action {action.id}: {e}")
tasks.append(self._return_error("notify", action.channel.name, str(e)))
else:
# Channel exists but no template found anywhere
logger.warning(f"Action {action.id} has channel but no template (own or inherited). Skipping.")
# 3. Process children (DFS)
if rule.children:
process_rules(rule.children, current_context)
# Start with root rules (parent_rule_id is NULL)
root_rules = db.query(ProcessingRule).filter(
ProcessingRule.endpoint_id == endpoint_id,
ProcessingRule.parent_rule_id == None
).order_by(ProcessingRule.priority.desc()).all()
process_rules(root_rules, {"vars": {}, "template_content": None})
# Wait for all actions
results = await asyncio.gather(*tasks) if tasks else []
# Aggregate results
routed_results = []
notified_results = []
for res in results:
if res['type'] == 'forward':
routed_results.append(res)
else:
notified_results.append(res)
return routed_results, notified_results
finally:
db.close()
def _flatten_payload(self, y: dict) -> dict:
out = {}
# Helper class to allow attribute access on dictionaries within templates
class AttrDict(dict):
def __getattr__(self, key):
if key in self:
v = self[key]
if isinstance(v, dict):
return AttrDict(v)
return v
# Return empty string or None to avoid AttributeError in templates
return ""
def flatten(x, name=''):
if isinstance(x, dict):
for a in x:
flatten(x[a], name + a + '_')
if name == '':
# Wrap top-level nested dicts so {a.b} works in templates
out[a] = AttrDict(x[a]) if isinstance(x[a], dict) else x[a]
else:
if name:
out[name[:-1]] = x
flatten(y)
# Fallback aliases for common fields referenced by templates
# cash_resp_desc: prefer nested trans_order_info.cash_resp_desc
try:
if 'cash_resp_desc' not in out:
toi = y.get('trans_order_info') or {}
out['cash_resp_desc'] = (toi.get('cash_resp_desc') or "")
except Exception:
out['cash_resp_desc'] = ""
# actual_ref_amt: extra.actual_ref_amt > trans_order_info.ref_amt > settlement_amt
try:
if 'actual_ref_amt' not in out:
extra = y.get('extra') or {}
toi = y.get('trans_order_info') or {}
val = extra.get('actual_ref_amt')
if val is None:
val = toi.get('ref_amt')
if val is None:
val = y.get('settlement_amt')
out['actual_ref_amt'] = val
except Exception:
out['actual_ref_amt'] = y.get('settlement_amt')
# Ensure any dict values in context are AttrDict to support dot-notation
for k, v in list(out.items()):
if isinstance(v, dict) and not isinstance(v, AttrDict):
out[k] = AttrDict(v)
return out
async def _exec_forward(self, target: dict, payload: dict):
try:
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000)
resp.raise_for_status()
return {"type": "forward", "target": target['name'], "ok": True}
except Exception as e:
return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)}
async def _exec_notify(self, channel: dict, msg: str):
try:
from app.services.notify import send_feishu, send_wecom
channel_type = channel.get('channel')
url = channel.get('url')
if channel_type == 'feishu':
await send_feishu(url, msg)
elif channel_type == 'wecom':
await send_wecom(url, msg)
return {"type": "notify", "channel": channel_type, "ok": True}
except Exception as e:
logger.exception(f"Notification failed for {channel.get('channel')}: {e}")
return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)}
async def _return_error(self, type_str, name, err):
return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err}
engine = RuleEngine()