from typing import Any, Dict, List, Optional
import asyncio
import re

from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate
from app.logging import get_logger

logger = get_logger("engine")

# Default outbound timeout used when a forward target has no usable timeout_ms.
_DEFAULT_TIMEOUT_MS = 5000


class _AttrDict(dict):
    """Dict that also exposes its keys as attributes.

    ``str.format`` resolves ``{a.b}`` through ``getattr``, so wrapping nested
    payload dicts in this class lets templates use dot-notation.
    """

    def __getattr__(self, key):
        # __getattr__ only runs when normal attribute lookup fails, so real
        # dict methods (items, keys, ...) still take precedence over keys.
        if key in self:
            v = self[key]
            if isinstance(v, dict):
                return _AttrDict(v)
            return v
        # Missing keys render as an empty string instead of raising
        # AttributeError inside the template.
        return ""


class RuleEngine:
    """Match a payload against an endpoint's rule tree and run the actions.

    Matching rules contribute template variables / templates to a context
    that is inherited by their child rules; ``forward`` and ``notify``
    actions are queued as coroutines and awaited together.
    """

    def __init__(self):
        pass

    def get_value_by_path(self, payload: Dict[str, Any], path: str) -> Optional[str]:
        """Resolve a dot-separated ``path`` inside ``payload``.

        Returns the value converted with ``str()``, or ``None`` when the path
        is not a string, any segment is missing, or an intermediate value is
        not a dict.
        """
        if not isinstance(path, str):
            return None
        value = payload
        for key in path.split('.'):
            if not isinstance(value, dict):
                return None
            value = value.get(key)
        return str(value) if value is not None else None

    def check_condition(self, actual_val: Optional[str], operator: Optional[str],
                        match_val: Optional[str]) -> bool:
        """Evaluate ``actual_val <operator> match_val``.

        Supported operators: ``eq`` (default when ``operator`` is falsy),
        ``neq``, ``contains``, ``startswith`` and ``regex``. A ``None``
        ``actual_val``, an unknown operator, or an invalid regex yields
        ``False``.
        """
        if actual_val is None:
            return False
        operator = operator or 'eq'
        if operator == 'eq':
            return actual_val == match_val
        if operator == 'neq':
            return actual_val != match_val
        # The remaining operators need a real pattern; a missing match value
        # would otherwise raise TypeError and abort the whole rule run.
        if match_val is None:
            return False
        if operator == 'contains':
            return match_val in actual_val
        if operator == 'startswith':
            return actual_val.startswith(match_val)
        if operator == 'regex':
            try:
                return re.search(match_val, actual_val) is not None
            except (re.error, TypeError):
                # Invalid or non-string pattern: treat as "no match".
                return False
        return False

    async def process(self, endpoint_id: int, payload: Dict[str, Any]):
        """Run every matching rule of ``endpoint_id`` against ``payload``.

        Root rules (``parent_rule_id`` is NULL) are visited in descending
        priority; children of a matching rule are visited depth-first.

        Returns a ``(routed_results, notified_results)`` tuple: the result
        dicts whose ``type`` is ``'forward'``, and all the others.
        """
        db = SessionLocal()
        tasks = []
        try:
            # Start with root rules (parent_rule_id is NULL)
            root_rules = db.query(ProcessingRule).filter(
                ProcessingRule.endpoint_id == endpoint_id,
                ProcessingRule.parent_rule_id == None  # noqa: E711 - SQL IS NULL expression
            ).order_by(ProcessingRule.priority.desc()).all()
            try:
                self._collect_rule_tasks(
                    root_rules, {"vars": {}, "template_content": None}, payload, tasks
                )
            except Exception:
                # The queued coroutines will never be awaited; close them so
                # they don't emit "coroutine was never awaited" warnings.
                for pending in tasks:
                    pending.close()
                raise
        finally:
            # The queued coroutines only hold plain dicts/strings, so the
            # session can be released before waiting on outbound HTTP calls.
            db.close()

        # Wait for all actions
        results = await asyncio.gather(*tasks) if tasks else []

        # Aggregate results
        routed_results = []
        notified_results = []
        for res in results:
            if res['type'] == 'forward':
                routed_results.append(res)
            else:
                notified_results.append(res)
        return routed_results, notified_results

    def _collect_rule_tasks(self, rules: List[ProcessingRule], context: Dict,
                            payload: Dict[str, Any], tasks: list) -> None:
        """Append one coroutine per executable action of every matching rule.

        ``context`` is ``{"vars": {...}, "template_content": ...}`` as
        accumulated from the parent rules; each matching rule works on its own
        copy so siblings do not see each other's additions.
        """
        for rule in rules:
            actual_val = self.get_value_by_path(payload, rule.match_field)
            if not self.check_condition(actual_val, rule.operator, rule.match_value):
                continue
            logger.info({"event": "rule_matched", "rule_id": rule.id, "match_field": rule.match_field})

            # Shallow copy is enough: vars is copied, template_content is a
            # string that is replaced, never mutated.
            current_context = {
                "vars": context.get("vars", {}).copy(),
                "template_content": context.get("template_content"),
            }

            # 1. First pass: collect vars and templates from all actions, so a
            # rule can set a template even if it sends no notification itself.
            for action in rule.actions:
                if action.template_vars:
                    current_context["vars"].update(action.template_vars)
                if action.template:
                    current_context["template_content"] = action.template.template_content

            # 2. Second pass: queue the actions.
            for action in rule.actions:
                task = self._build_action_task(action, current_context, payload)
                if task is not None:
                    tasks.append(task)

            # 3. Process children (DFS) with the context built at this level.
            if rule.children:
                self._collect_rule_tasks(rule.children, current_context, payload, tasks)

    def _build_action_task(self, action, current_context: Dict, payload: Dict[str, Any]):
        """Return the coroutine that executes ``action``, or ``None`` to skip it."""
        if action.action_type == 'forward' and action.target:
            t_dict = {"name": action.target.name, "url": action.target.url,
                      "timeout_ms": action.target.timeout_ms}
            return self._exec_forward(t_dict, payload)

        if action.action_type != 'notify' or not action.channel:
            return None

        # Template to use: the action's own template > inherited template.
        if action.template:
            template_content = action.template.template_content
        else:
            template_content = current_context.get("template_content")

        if not template_content:
            # Channel exists but no template found anywhere
            logger.warning(f"Action {action.id} has channel but no template (own or inherited). Skipping.")
            return None

        try:
            # Flatten payload + merge current context vars
            render_context = self._flatten_payload(payload)
            render_context.update(current_context["vars"])
            msg = template_content.format(**render_context)
            c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url}
            return self._exec_notify(c_dict, msg)
        except Exception as e:
            # Report the render failure as a failed notify result instead of
            # aborting the remaining actions.
            logger.exception(f"Template render failed for action {action.id}: {e}")
            return self._return_error("notify", action.channel.name, str(e))

    def _flatten_payload(self, y: dict) -> dict:
        """Build the template rendering context from payload ``y``.

        Nested keys are joined with ``_`` (``{"a": {"b": 1}}`` yields
        ``a_b``); top-level dicts are additionally kept under their own key as
        ``_AttrDict`` so ``{a.b}`` works in templates. Two fallback aliases,
        ``cash_resp_desc`` and ``actual_ref_amt``, are always present.
        """
        out = {}

        def flatten(node, prefix=''):
            if isinstance(node, dict):
                for key in node:
                    flatten(node[key], prefix + key + '_')
                    if prefix == '':
                        # Wrap top-level nested dicts so {a.b} works in templates
                        out[key] = _AttrDict(node[key]) if isinstance(node[key], dict) else node[key]
            elif prefix:
                out[prefix[:-1]] = node

        flatten(y)

        # Fallback aliases for common fields referenced by templates
        # cash_resp_desc: prefer nested trans_order_info.cash_resp_desc
        try:
            if 'cash_resp_desc' not in out:
                toi = y.get('trans_order_info') or {}
                out['cash_resp_desc'] = (toi.get('cash_resp_desc') or "")
        except AttributeError:
            # trans_order_info is present but is not a dict.
            out['cash_resp_desc'] = ""

        # actual_ref_amt: extra.actual_ref_amt > trans_order_info.ref_amt > settlement_amt
        try:
            if 'actual_ref_amt' not in out:
                extra = y.get('extra') or {}
                toi = y.get('trans_order_info') or {}
                val = extra.get('actual_ref_amt')
                if val is None:
                    val = toi.get('ref_amt')
                if val is None:
                    val = y.get('settlement_amt')
                out['actual_ref_amt'] = val
        except AttributeError:
            # extra / trans_order_info is present but is not a dict.
            out['actual_ref_amt'] = y.get('settlement_amt')

        # The aliases above may have stored a plain dict; wrap any remaining
        # dict values so dot-notation works for them too.
        for k, v in list(out.items()):
            if isinstance(v, dict) and not isinstance(v, _AttrDict):
                out[k] = _AttrDict(v)
        return out

    async def _exec_forward(self, target: dict, payload: dict):
        """POST ``payload`` as JSON to ``target['url']``.

        Returns a ``{"type": "forward", "target": ..., "ok": bool}`` dict, with
        an ``error`` string when the request fails; never raises ``Exception``.
        """
        try:
            import httpx
            # The key is always present but may hold None (no timeout
            # configured); `.get(key, default)` would not cover that case.
            timeout_ms = target.get('timeout_ms') or _DEFAULT_TIMEOUT_MS
            async with httpx.AsyncClient() as client:
                resp = await client.post(target['url'], json=payload, timeout=timeout_ms / 1000)
                resp.raise_for_status()
                return {"type": "forward", "target": target['name'], "ok": True}
        except Exception as e:
            return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)}

    async def _exec_notify(self, channel: dict, msg: str):
        """Send ``msg`` through the webhook described by ``channel``.

        ``channel`` carries ``channel`` (``'feishu'`` or ``'wecom'``) and
        ``url``. Returns a ``{"type": "notify", "channel": ..., "ok": bool}``
        dict, with an ``error`` string on failure; never raises ``Exception``.
        """
        try:
            from app.services.notify import send_feishu, send_wecom
            channel_type = channel.get('channel')
            url = channel.get('url')
            if channel_type == 'feishu':
                await send_feishu(url, msg)
            elif channel_type == 'wecom':
                await send_wecom(url, msg)
            else:
                # Nothing was sent, so do not report success.
                return {"type": "notify", "channel": channel_type, "ok": False,
                        "error": f"Unsupported channel type: {channel_type}"}
            return {"type": "notify", "channel": channel_type, "ok": True}
        except Exception as e:
            logger.exception(f"Notification failed for {channel.get('channel')}: {e}")
            return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)}

    async def _return_error(self, type_str, name, err):
        """Return a failed result dict; a coroutine so it can be gathered with real actions."""
        return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err}


engine = RuleEngine()