From 2bc7460f1f112dd07211369b05e70776dc8829c7 Mon Sep 17 00:00:00 2001 From: houhuan Date: Sun, 21 Dec 2025 18:43:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E5=8C=96Webhook?= =?UTF-8?q?=E4=B8=AD=E7=BB=A7=E7=B3=BB=E7=BB=9F=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加FastAPI应用基础结构,包括主入口、路由和模型定义 - 实现Webhook接收端点(/webhook/{namespace})和健康检查(/health) - 添加管理后台路由和模板,支持端点、目标、渠道和模板管理 - 包含SQLite数据库模型定义和初始化逻辑 - 添加日志记录和统计服务 - 包含Dockerfile和配置示例文件 - 添加项目文档,包括设计、流程图和验收标准 --- .../Webhook中继系统 - Admin后台升级计划.md | 32 ++ .../Webhook中继系统 - 增强功能实施计划.md | 47 ++ .../Webhook中继系统 - 核心逻辑重构计划.md | 53 ++ .trae/documents/增强模板变量与编辑功能.md | 30 ++ .trae/documents/系统修复与全链路优化计划.md | 42 ++ .trae/documents/规则引擎逻辑重构计划.md | 50 ++ 123.josn | 0 Dockerfile | 12 + README.md | 118 +++++ app/__init__.py | 1 + app/__pycache__/__init__.cpython-314.pyc | Bin 0 -> 141 bytes app/__pycache__/db.cpython-314.pyc | Bin 0 -> 5965 bytes app/__pycache__/logging.cpython-314.pyc | Bin 0 -> 1280 bytes app/__pycache__/main.cpython-314.pyc | Bin 0 -> 5341 bytes app/admin.py | 485 ++++++++++++++++++ app/db.py | 102 ++++ app/logging.py | 15 + app/main.py | 102 ++++ app/models.py | 32 ++ app/services/engine.py | 228 ++++++++ app/services/notify.py | 13 + app/services/stats.py | 45 ++ config/config.yml | 45 ++ config/data.db | Bin 0 -> 163840 bytes .../webhook-relay/ACCEPTANCE_webhook-relay.md | 21 + docs/webhook-relay/ALIGNMENT_webhook-relay.md | 48 ++ docs/webhook-relay/CONSENSUS_webhook-relay.md | 25 + docs/webhook-relay/DESIGN_webhook-relay.md | 51 ++ docs/webhook-relay/FLOWCHART.md | 191 +++++++ docs/webhook-relay/TASK_[webhook-relay].md | 27 + requirements.txt | 10 + samples/incoming.json | 1 + scripts/verify_logic.py | 22 + scripts/verify_message_only.py | 30 ++ templates/admin/base.html | 51 ++ templates/admin/channels.html | 124 +++++ templates/admin/dashboard.html | 196 +++++++ templates/admin/endpoint_detail.html | 467 +++++++++++++++++ templates/admin/endpoints.html | 124 +++++ templates/admin/logs.html | 123 +++++ templates/admin/targets.html | 111 ++++ templates/admin/templates.html | 103 ++++ 42 files changed, 3177 insertions(+) create mode 100644 .trae/documents/Webhook中继系统 - Admin后台升级计划.md create mode 100644 .trae/documents/Webhook中继系统 - 增强功能实施计划.md create mode 100644 .trae/documents/Webhook中继系统 - 核心逻辑重构计划.md create mode 100644 .trae/documents/增强模板变量与编辑功能.md create mode 100644 .trae/documents/系统修复与全链路优化计划.md create mode 100644 .trae/documents/规则引擎逻辑重构计划.md create mode 100644 123.josn create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 app/__init__.py create mode 100644 app/__pycache__/__init__.cpython-314.pyc create mode 100644 app/__pycache__/db.cpython-314.pyc create mode 100644 app/__pycache__/logging.cpython-314.pyc create mode 100644 app/__pycache__/main.cpython-314.pyc create mode 100644 app/admin.py create mode 100644 app/db.py create mode 100644 app/logging.py create mode 100644 app/main.py create mode 100644 app/models.py create mode 100644 app/services/engine.py create mode 100644 app/services/notify.py create mode 100644 app/services/stats.py create mode 100644 config/config.yml create mode 100644 config/data.db create mode 100644 docs/webhook-relay/ACCEPTANCE_webhook-relay.md create mode 100644 docs/webhook-relay/ALIGNMENT_webhook-relay.md create mode 100644 docs/webhook-relay/CONSENSUS_webhook-relay.md create mode 100644 docs/webhook-relay/DESIGN_webhook-relay.md create mode 100644 docs/webhook-relay/FLOWCHART.md create mode 100644 docs/webhook-relay/TASK_[webhook-relay].md create mode 100644 requirements.txt create mode 100644 samples/incoming.json create mode 100644 scripts/verify_logic.py create mode 100644 scripts/verify_message_only.py create mode 100644 templates/admin/base.html create mode 100644 templates/admin/channels.html create mode 100644 templates/admin/dashboard.html create mode 100644 templates/admin/endpoint_detail.html create mode 100644 templates/admin/endpoints.html create mode 100644 templates/admin/logs.html create mode 100644 templates/admin/targets.html create mode 100644 templates/admin/templates.html diff --git a/.trae/documents/Webhook中继系统 - Admin后台升级计划.md b/.trae/documents/Webhook中继系统 - Admin后台升级计划.md new file mode 100644 index 0000000..015c53c --- /dev/null +++ b/.trae/documents/Webhook中继系统 - Admin后台升级计划.md @@ -0,0 +1,32 @@ +# Webhook中继系统 - Admin后台升级计划 + +## 1. 核心变更:数据库驱动配置 +- 引入 **SQLite + SQLAlchemy** 替代静态 `config.yml`。 +- 存储模型: + - `Target`:转发目标Webhook(名称、URL、超时)。 + - `RemarkRule`:remark值到目标列表的映射(imcgcd03 -> [Target A, Target B])。 + - `EventTemplate`:事件模板(event_define_no -> 模板文本)。 + - `NotificationChannel`:通知渠道(类型feishu/wecom、Webhook URL、名称)。 + - `EventChannelBinding`:事件到渠道的绑定(pay.ali_scaned -> [Channel A, Channel B])。 + +## 2. Admin后台界面 +- 基于 **FastAPI + Jinja2 + Bootstrap** 提供简易管理UI。 +- 功能页面: + - **目标管理**:增删改查转发目标。 + - **路由规则**:配置 `remark` 对应的目标列表。 + - **通知模板**:编辑不同事件的通知文案。 + - **通知渠道**:添加飞书/企业微信机器人链接。 + - **通知绑定**:将事件与通知渠道关联。 + +## 3. 系统架构调整 +- `app/db.py`:数据库连接与模型定义。 +- `app/admin.py`:管理后台路由与CRUD逻辑。 +- `app/config.py`:改为混合模式,优先从DB加载动态配置,保留文件配置作为默认值/种子数据。 +- `app/services/relay.py` & `notify.py`:适配新的DB驱动配置查询。 + +## 4. 实施步骤 +1. **依赖更新**:添加 `sqlalchemy`、`jinja2`、`python-multipart`。 +2. **数据层实现**:定义ORM模型与初始化脚本(支持从现有yaml导入种子数据)。 +3. **后台UI实现**:创建HTML模板与FastAPI路由。 +4. **配置适配**:改造 `load_config` 逻辑,支持热重载或每次请求查询DB。 +5. **构建与部署**:更新Dockerfile,重新构建并运行,验证后台配置生效。 diff --git a/.trae/documents/Webhook中继系统 - 增强功能实施计划.md b/.trae/documents/Webhook中继系统 - 增强功能实施计划.md new file mode 100644 index 0000000..b7f580d --- /dev/null +++ b/.trae/documents/Webhook中继系统 - 增强功能实施计划.md @@ -0,0 +1,47 @@ +# Webhook中继系统 - 增强功能实施计划 + +## 1. 核心变更:日志审计与多端点管理 +为了满足多Webhook接收管理、系统状态监控及日志审计需求,将进行以下数据库与逻辑扩展: + +### 1.1 数据库模型扩展 (`app/db.py`) +- **`WebhookEndpoint`**:管理接收端点(Namespace)。 + - 字段:`namespace` (唯一标识), `description` (备注), `created_at`, `is_active`。 +- **`RequestLog`**:记录入站请求。 + - 字段:`id`, `namespace`, `remark`, `event_no`, `raw_body` (JSON), `received_at`, `status` (success/error)。 +- **`DeliveryLog`**:记录出站转发与推送结果。 + - 字段:`request_id` (FK), `target_name` (目标/渠道名), `type` (relay/notify), `status`, `response_summary`, `created_at`。 + +### 1.2 业务逻辑增强 (`app/main.py`) +- **端点校验**:`POST /webhook/{namespace}` 将校验 namespace 是否在 `WebhookEndpoint` 表中注册(未注册的可选拒绝或标记为匿名)。 +- **日志持久化**:请求处理完成后,将入站数据与出站结果(routed/notified)异步写入 SQLite 数据库。 +- **状态统计**:内存记录系统启动时间,实时计算运行耗时;从 DB 统计今日接收量。 + +## 2. Admin后台功能升级 +### 2.1 界面改造 (`templates/admin/*.html`) +- **顶部状态栏 (`base.html`)**:增加系统运行状态指示器(运行时间、今日请求数、最新一条日志时间)。 +- **端点管理 (`endpoints.html`)**:新增页面,支持创建、启用/禁用接收端点(Namespace)。 +- **日志审计 (`logs.html`)**: + - 请求列表:按时间倒序展示,显示 namespace, remark, event。 + - 详情视图:点击查看完整 JSON 负载及对应的所有分发记录(目标+渠道)。 + - 清理功能:一键清空历史日志。 + +### 2.2 后台逻辑 (`app/admin.py`) +- 新增 `/admin/endpoints` 路由:CRUD 操作。 +- 新增 `/admin/logs` 路由:分页查询日志。 +- 新增 `/admin/stats` 接口(或模板上下文):提供状态栏数据。 + +## 3. 实施步骤 +1. **更新 DB 模型**:修改 `app/db.py` 添加新表。 +2. **实现日志与端点逻辑**: + - 更新 `app/main.py` 接入日志记录。 + - 创建 `app/services/stats.py` 简单的统计服务。 +3. **开发前端页面**: + - 修改 `base.html` 添加状态栏与菜单。 + - 创建 `endpoints.html` 和 `logs.html`。 +4. **开发后台路由**:在 `app/admin.py` 实现对应视图。 +5. **验证**: + - 创建新 Endpoint `demo_input`。 + - 发送测试数据,确认日志生成且状态栏更新。 + - 验证“日志详情”能清晰展示接收到的 JSON 和分发结果。 + +此方案在保留现有轻量级架构的同时,补全了运维监控与多入口管理能力。 diff --git a/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md b/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md new file mode 100644 index 0000000..650cf7a --- /dev/null +++ b/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md @@ -0,0 +1,53 @@ +# Webhook 中继系统 - 核心逻辑重构计划 + +## 1. 核心设计理念变更 +响应您的需求,我们将系统从“基于固定字段(remark)的路由”升级为**“基于端点的自定义规则引擎”**。 + +- **以端点为中心**:每个接收端点(Endpoint)拥有独立的配置页面,绑定属于自己的处理流程。 +- **通用规则引擎**:不再硬编码检测 `remark` 或 `event_define_no`。 + - **变量自定义**:您可以指定检测 JSON 中的任意字段(如 `body.status` 或 `data.order_id`)。 + - **条件匹配**:当指定字段的值等于设定值时,触发动作。 +- **统一动作(Action)**:将“转发”和“通知”统一为规则命中的“动作”。 + - 动作 A:转发给目标 X(原 Target)。 + - 动作 B:使用模板 Y 发送给渠道 Z(原 Notification)。 + +## 2. 数据库模型重构 (`app/db.py`) +我们需要调整表结构以支持这种灵活关系(建议重置数据库): + +1. **保留**:`WebhookEndpoint`, `Target`, `NotificationChannel`, `RequestLog`, `DeliveryLog`。 +2. **移除**:`RemarkRule`, 旧的 `EventTemplate` 关联逻辑。 +3. **新增/修改**: + * **`MessageTemplate`** (原 EventTemplate):仅存储模板文本,不再绑定特定事件ID,作为纯资源库。 + * **`ProcessingRule`** (处理规则):绑定到 `WebhookEndpoint`。 + * 字段:`match_field` (匹配键,如 `trans_order_info.remark`), `match_value` (匹配值)。 + * **`RuleAction`** (规则动作):绑定到 `ProcessingRule`。 + * 字段:`action_type` (forward/notify), `target_id` (关联Target), `channel_id` (关联Channel), `template_id` (关联Template)。 + +## 3. 业务逻辑升级 (`app/services/engine.py`) +创建新的规则引擎服务: +- **动态取值**:支持点号索引(如 `data.user.id`)从深层 JSON 中提取变量。 +- **流程执行**: + 1. 接收 Webhook -> 查找 Endpoint。 + 2. 遍历 Endpoint 下的所有 `ProcessingRule`。 + 3. 提取 `match_field` 对应的值与 `match_value` 比对。 + 4. 命中则执行该规则下的所有 `RuleAction`(并行执行转发或通知)。 + +## 4. Admin UI 交互大改版 +- **资源库模式**:`目标管理`、`通知渠道`、`消息模板` 变为纯粹的基础资源维护页面。 +- **端点编排页**(核心): + - 点击某个端点,进入**“流程配置”**详情页。 + - **可视化规则编辑器**: + - "当 `[输入框: 字段路径]` 等于 `[输入框: 值]` 时:" + - ➕ 添加动作:[转发] -> 选择目标。 + - ➕ 添加动作:[通知] -> 选择渠道 + 选择模板。 +- **体验优化**:在同一个页面完成逻辑闭环,无需在不同菜单间跳转。 + +## 5. 实施步骤 +1. **重构 DB**:更新 `app/db.py` 模型定义。 +2. **实现引擎**:编写 `app/services/engine.py` 实现动态匹配与分发。 +3. **更新 API**:修改 `app/main.py` 调用新引擎。 +4. **重写 UI**: + - 改造 `admin.py` 路由。 + - 新增 `templates/admin/endpoint_detail.html` 作为核心配置页。 + - 简化其他资源页面。 +5. **迁移/重置**:删除旧 DB文件,重新构建运行。 diff --git a/.trae/documents/增强模板变量与编辑功能.md b/.trae/documents/增强模板变量与编辑功能.md new file mode 100644 index 0000000..8b4f879 --- /dev/null +++ b/.trae/documents/增强模板变量与编辑功能.md @@ -0,0 +1,30 @@ +# 系统功能增强计划 + +针对您提出的两个问题,我们将进行以下改进: + +## 1. 解决 "pay.wx_scaned" 等代码的显示问题 +**方案**:在配置“通知动作”时,允许您定义**自定义模板变量**。 +* **原理**: + * 当规则匹配到 `pay.wx_scaned` 时,您在动作中设置变量 `pay_type_name` = "微信支付"。 + * 当规则匹配到 `pay.ali_scaned` 时,您在动作中设置变量 `pay_type_name` = "支付宝"。 + * **模板写法**:`收到{pay_type_name}付款{trans_amt}元`。 +* 这样同一个模板可以根据不同的规则,显示不同的中文名称,完美解决代码转义问题。 + +## 2. 补全 "修改" 功能 +**方案**:为所有资源管理页面添加编辑功能。 +* **目标管理**:添加编辑按钮。 +* **通知渠道**:添加编辑按钮。 +* **消息模板**:添加编辑按钮。 +* **实现方式**:点击编辑弹出模态框(复用添加框),回显数据,保存时调用更新接口。 + +## 实施步骤 +1. **数据库升级**: + * `RuleAction` 表新增 `template_vars` 字段(JSON类型),用于存储自定义变量。 +2. **后端开发**: + * 更新 `RuleEngine`,在渲染模板前将 `template_vars` 合并到数据上下文中。 + * 在 `admin.py` 中新增 `update_target`, `update_channel`, `update_template` 等路由。 +3. **前端开发**: + * **端点详情页**:在添加/编辑动作时,增加“模板变量”输入框(JSON格式)。 + * **资源列表页**:增加“编辑”按钮及相应的 JS 逻辑来回显数据。 +4. **部署**: + * 由于涉及数据库字段变更,需要再次重置数据库。 diff --git a/.trae/documents/系统修复与全链路优化计划.md b/.trae/documents/系统修复与全链路优化计划.md new file mode 100644 index 0000000..23ba8a3 --- /dev/null +++ b/.trae/documents/系统修复与全链路优化计划.md @@ -0,0 +1,42 @@ +# 系统修复与全链路优化计划 + +针对“弹窗失效”的紧急Bug和您对系统的长远期望,我制定了以下全方位的优化方案: + +## 1. 紧急修复:UI 交互失效 (Priority 0) +**问题根源**:页面脚本(Script)在 Bootstrap 库加载之前执行了,导致 `bootstrap` 对象未定义。 +**修复方案**: +* 调整 `base.html` 结构,在底部引入 Bootstrap JS 之后新增 `{% block scripts %}` 插槽。 +* 将所有管理页面的 JS 代码移动到 `block scripts` 中,确保执行顺序正确。 + +## 2. 核心增强:规则引擎升级 (Priority 1) +目前的规则仅支持“等于”,无法满足复杂业务(如匹配特定前缀的订单号)。 +**优化方案**: +* **数据库变更**:`ProcessingRule` 表新增 `operator` 字段(默认为 `eq`)。 +* **支持操作符**: + * `eq` (等于) + * `neq` (不等于) + * `contains` (包含) + * `startswith` (以...开头) + * `regex` (正则表达式) +* **UI 适配**:在端点详情页的添加规则弹窗中,增加“操作符”下拉选择。 + +## 3. 运维增强:日志重试 (Replay) (Priority 2) +Webhook 系统最常见的问题是下游服务暂时不可用,导致消息丢失。 +**优化方案**: +* **后端**:新增 `/admin/logs/{id}/replay` 接口,读取历史日志的 `raw_body` 重新注入规则引擎。 +* **前端**:在“系统日志”列表的失败记录旁,添加“重试”按钮。 + +## 4. 体验优化:首页仪表盘 (Priority 3) +目前的首页直接跳转到列表,缺乏全局视野。 +**优化方案**: +* 重写 `/admin/` 首页,展示: + * 今日请求总量 / 成功率 + * 各端点 (Endpoint) 的流量占比 + * 最近 10 条失败记录(方便快速排查) + +## 实施路线图 +1. **UI 架构调整**:修复 `base.html` 及所有子页面(解决弹窗问题)。 +2. **数据模型迁移**:更新 DB,添加 `operator` 字段。 +3. **业务逻辑升级**:更新 `engine.py` 支持新操作符,实现重试逻辑。 +4. **界面功能补全**:更新前端页面适配新功能。 +5. **部署**:重置数据库并重启服务。 diff --git a/.trae/documents/规则引擎逻辑重构计划.md b/.trae/documents/规则引擎逻辑重构计划.md new file mode 100644 index 0000000..ef7281c --- /dev/null +++ b/.trae/documents/规则引擎逻辑重构计划.md @@ -0,0 +1,50 @@ +# 规则引擎逻辑重构计划:支持多级条件与流程编排 + +您说得非常对。目前的扁平化规则(单层 `if-match-then-action`)在处理“组合条件”时会导致规则数量爆炸,配置极其繁琐且难以维护。 + +**核心问题**:现在的结构是“单层匹配”,无法表达“先判断A,再判断B”的逻辑。 +**您的需求**: +1. **第一层(事件分类)**:先根据 `event_define_no` 判断业务类型(微信支付/退款/投诉),决定用哪个**模板**。 +2. **第二层(渠道分发)**:再根据 `remark` 判断归属(门店002/003),决定推给哪个**渠道**。 + +为此,我将对系统进行深度的逻辑重构,引入**“规则组(Rule Group)”**和**“条件链(Condition Chain)”**的概念,但为了保持用户界面的直观性,我设计了更符合您直觉的 **“流程树(Flow Tree)”** 方案。 + +## 方案设计:嵌套式规则引擎 + +### 1. 数据模型升级 (DB) +* **ProcessingRule 表变更**: + * 新增 `parent_rule_id` (Integer, nullable): 允许规则嵌套。子规则只有在父规则匹配成功后才会执行。 + * 此变更实现了“树状结构”。 + +### 2. 核心逻辑升级 (Engine) +* **递归匹配**: + * 引擎首先查找所有 `parent_id` 为空的**根规则**。 + * 如果根规则匹配成功(例如 `event_define_no == pay.wx_scaned`): + * 执行根规则挂载的动作(例如:设置模板变量 `pay_method="微信"`)。 + * **继续检查其子规则**。 + * 子规则(例如 `remark == imcgcd03`)匹配成功: + * 执行子规则挂载的动作(例如:推送到 `渠道003`)。 + * 动作执行时,会**继承**父规则上下文中设置的变量(如模板ID或变量)。 + +### 3. 用户界面重构 (UI) +* **端点详情页**: + * 不再是扁平列表,而是**树形展示**。 + * 在每个规则卡片内部,增加“**添加子规则**”按钮。 + * 视觉上通过缩进或连线表示层级关系。 + * 支持**拖拽排序**(优先级调整)。 + +## 实施步骤 +1. **数据库迁移**:为 `ProcessingRule` 添加 `parent_rule_id` 字段。 +2. **后端逻辑**:重写 `engine.py` 的 `process` 方法,改为递归处理。 +3. **前端交互**:重写 `endpoint_detail.html`,使用递归模板渲染规则树。 +4. **无损更新**:编写 SQL 脚本或逻辑,确保现有数据平滑迁移到新结构(现有规则都视为根规则)。 + +这样,您的配置流程将变为: +1. **根规则**:`event_define_no == pay.wx_scaned` + * 动作:设置变量 `pay_type="微信"`(**不需要指定渠道**) + * **子规则 A**:`remark == imcgcd02` + * 动作:推送到 `渠道002`(**复用父级的模板设置**) + * **子规则 B**:`remark == imcgcd03` + * 动作:推送到 `渠道003` + +这完美符合您的思维模型。 diff --git a/123.josn b/123.josn new file mode 100644 index 0000000..e69de29 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..007cca4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12-slim +WORKDIR /app +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt +COPY app /app/app +COPY config /app/config +COPY templates /app/templates +ENV WEBHOOK_CONFIG_PATH=/app/config/config.yml +EXPOSE 8080 +CMD ["uvicorn","app.main:app","--host","0.0.0.0","--port","8080"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..331410e --- /dev/null +++ b/README.md @@ -0,0 +1,118 @@ +## 项目简介 + +这是一个可配置的 Webhook 中继与通知系统,用于: +- 统一接收外部支付/业务系统的 Webhook 请求 +- 按规则将请求转发到内部多个目标系统 +- 根据事件与模板生成消息,推送到飞书、企业微信等 IM 渠道 + +核心特性: +- 通过 Web 管理后台 `/admin` 管理端点、规则树、动作、消息模板与通知渠道 +- 支持树形规则:按 `remark`、`event_define_no` 等字段组合匹配 +- 支持转发(forward)与通知(notify)两类动作 +- 消息模板支持变量与继承,可在父规则设置通用模板,在子规则覆盖变量 +- 所有请求与分发结果写入 SQLite 数据库,便于审计与排查 + +## 技术栈与目录结构 + +- 语言与框架:Python 3.12 + FastAPI +- 数据库:SQLite(可通过 `DB_PATH` 切换到其他 SQLAlchemy 支持的数据库) +- HTTP 客户端:httpx + +主要目录: +- `app/main.py`:FastAPI 入口,`/webhook/{namespace}` 与 `/health` +- `app/admin.py`:管理后台路由(端点、规则、动作、模板、渠道、日志) +- `app/db.py`:SQLAlchemy 模型与会话管理 +- `app/services/engine.py`:规则引擎,实现树形规则匹配与动作编排 +- `app/services/notify.py`:飞书与企业微信的实际发送封装 +- `templates/admin/`:管理后台的 HTML 模板 +- `config/data.db`:默认 SQLite 数据库文件(若不存在会自动创建) +- `docs/webhook-relay/`:6A 工作流下的对齐、设计、任务与流程文档 + +## 本地运行(开发环境) + +1. 安装依赖: + - 可选:创建虚拟环境 + - 执行:`pip install -r requirements.txt` + +2. 初始化数据库并启动服务: + - 进入项目根目录:`cd e:\2025Code\python\webhock` + - 启动:`uvicorn app.main:app --host 0.0.0.0 --port 8080 --reload` + +3. 访问入口: + - Webhook 接口:`POST http://localhost:8080/webhook/{namespace}` + - 健康检查:`GET http://localhost:8080/health` + - 管理后台:`GET http://localhost:8080/admin/endpoints` + +## Docker 部署 + +项目包含 `Dockerfile`,可直接构建镜像部署: + +1. 构建镜像: + - 在项目根目录执行 + `docker build -t webhook-relay .` + +2. 运行容器(基础示例): + - `docker run -d --name webhook-relay -p 8080:8080 webhook-relay` + + 默认配置: + - 工作目录:`/app` + - 配置与数据库目录:`/app/config` + - 默认数据库:`sqlite:///./config/data.db` + +3. 使用持久化存储: + - 在宿主机创建配置/数据目录,例如:`/srv/webhook-relay/config` + - 将初始配置文件与数据库(可选)放入该目录 + - 启动容器时挂载目录: + - `docker run -d --name webhook-relay -p 8080:8080 -v /srv/webhook-relay/config:/app/config webhook-relay` + +4. 自定义数据库: + - 通过环境变量 `DB_PATH` 覆盖默认数据库连接: + - 例如:`-e DB_PATH="sqlite:///./config/data.db"`(默认值) + - 或者:`-e DB_PATH="postgresql+psycopg2://user:pass@host:5432/dbname"`(需要自行安装驱动并调整镜像) + +## 管理后台与规则配置概览 + +管理后台主要页面: +- 端点管理(Endpoints):维护不同业务线/系统对应的 `namespace` +- 目标管理(Targets):配置转发目标的名称、URL 与超时时间 +- 渠道管理(Channels):配置飞书/企微等机器人 Webhook +- 模板管理(Templates):配置消息模板内容 +- 端点详情:以树形方式配置 ProcessingRule 与 RuleAction,支持: + - 根规则与子规则 + - 动作类型:转发或通知 + - 模板变量与模板继承 +- 日志页面:查看 RequestLog 与 DeliveryLog 记录 + +典型流程: +1. 在 “Targets” 中配置内部系统的 Webhook URL +2. 在 “Channels” 中配置飞书/企微机器人地址 +3. 在 “Templates” 中配置通用或专用消息模板 +4. 在 “Endpoints” 中创建业务端点,并在详情页中: + - 创建根规则,按 `remark` 或其他字段区分不同业务来源 + - 在子规则中根据 `event_define_no` 等字段区分事件类型 + - 为规则添加 Forward/Notify 动作,绑定目标、渠道与模板变量 + +## 推到 Gitea 与服务器部署建议 + +1. 在本地完成配置与验证后,将整个仓库推送到 Gitea: + - 初始化 git 仓库(如尚未初始化):`git init` + - 添加远程并推送到 Gitea 项目 + +2. 在服务器上通过 Gitea 拉取代码: + - 使用 Gitea 的 “克隆” 地址在服务器执行 `git clone` + +3. 在服务器上构建并运行 Docker 镜像: + - `docker build -t webhook-relay .` + - 准备持久化目录并挂载配置/数据库 + - `docker run -d --name webhook-relay -p 8080:8080 -v /srv/webhook-relay/config:/app/config webhook-relay` + +4. 如需进一步自动化(CI/CD): + - 可以在 Gitea 中配置 Actions 或 Webhook,在推送时自动触发服务器上的构建与重启逻辑 + - 具体流水线脚本可根据你的服务器环境与习惯单独设计 + +更详细的架构、流程与用例说明可参考: +- `docs/webhook-relay/ALIGNMENT_webhook-relay.md` +- `docs/webhook-relay/DESIGN_webhook-relay.md` +- `docs/webhook-relay/FLOWCHART.md` +- `docs/webhook-relay/ACCEPTANCE_webhook-relay.md` + diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ + diff --git a/app/__pycache__/__init__.cpython-314.pyc b/app/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..fcf4deb0dd329d28a9f07ba7a65fbc853b55239e GIT binary patch literal 141 zcmdPq^MY(Pc>LlA>9gC?WjN`@kk%FiIlTUx1BF-8VP zrq1~(sWAnWB^mj7G3BXA8TrZCF^L5QG4b)4d6^~g@p=W7w>WHa^HWN5QtgUZfm%U^ Q7lRm|m>C%viCRO}o<) zmC`-5C+(uHw41uq9_mSZsWFe1o3eS+-D#?EzHY#QfWH*h^|+yv)(f$MAICOOv++&~+*mve)_ z4YhImI5!O3NE^4GbECkGwQ&bHHxArH8+R}pnSI?B+TI)Xkd%ARb+y(%lodyW?2z0i z_fLrOzT0fO-U@0*K!4ws8oNbDf6nSv4E+Wrf!sq>N1QSAX)(`jgq;fmzjHB%`Fpc zVJ0OqPaejiFjj7F?}FffX9>7fAuCW(7HHRf5ABvkDrI{>7%sU>?sf#><~$GZx@5@_ zgxBJp0|??neXX87vdih|m)$fVdq7lP=E|sKQKQUF`P}Rjj*03e1NhGb&Umnk=a(CI#u{ z6qSMs=POh*6@?5zmuK)7*oo^G=FXlu``V-TVl9+XU7|^xi>OR?^yNfjz3x(-AHU^n*OuZYdgM3t@rVGb#f=# zx9;9dG^68>ZjzVnf%d5sw}Q{Jc_3{n4SxrfN{+HRE7apC*e~~hGF_ldH;d00RA10@ z9C(vUYN@0ZShCc{RB{&A%#b9kYD*;j@ZypKHIiZcIE>^7lA}mQkT|*_$M6%CLyiMs zf!2TuT`gT$4aIU_&t_MB0GJE&kbG>f=9@U*pyWTW8s-Lgma%09>4#biH? z%K<=tkcID|g&WJ)OZl>)mne(cXw+I9TR(j;4Y)8*sZuDQ10sXKV4+!J~X z%wu=^I;Iv1r;g^ef=0E|1}TFLfnN-vQ&-AHxq@ggnLDSNIW@1fG@tlh*2KK%>ddm5 z11rjFW{&7(3?Br26Z7h(qUJEx5I=kqNIeAb%QROqR+!HUb9r!v1kH-|5Lmw15igiD z5F}`q=6gUo;U&^rd$^JR{fTDee07=|-sXvBY@&K=2WnO}rDo(b@WBImHcx&su{F_* zUZ~#O3G~+oQrlzonfYcQ_vpHXFPv*1zHmSIJoE4@$q95XeG9%^Sr6!6vMtle6TDd; z@FBd(ZU<+6E9mhaj;93Fc^%vYcrStH627c^T$KHGZ#f_bopvE>B*?u&c0CPqUc_Dr zjauJwxUEg>K$|$9FCs@_^q4bxLXOj-a^6+p-X$fd}sn! z0CGl&Nw^g9Ce|ZtW>UT18QPdFGrw&}3M>Z^N`8RskhLbhQLMpLFi&ntFXUmZ%mZ17 zL3Fxmr2_tsCm zt`ee5-_x=B>B+{iYmL}-fPDB+edzS|VlzBdP5;Lw^bW1xc>0t2nd^<0rs_X>x0$?K zo#Aax{b6x?ss8T0dgjN?*Wat>R+`}ts_C6@vbMPKpczh8(_dO44985?7B=$D@GH>3 zW_}qOT)(*Os;B3gq3oj@migeK_nD95o(K1v&+Gqp^9eWr4_Zd>4dxW$@ZJlXmWfy< z>$k_*XMhpjV{Z=2h^!vLjy+<$N6491`tJS|-7@0dDa8%#ge63@kdtip)evXj2=XD4pHSldQ! z<2a~p=C@#NrC45TrC>13WfdU+SVkDje770B2LUBCSU+^85qcGdZ8yu$ zXfu|v4YLs*=VqBaw0;hPRAS=M9m{fX1D!4BGpK1xY;Jvv<=_eHxpW+O2h$pCx5u+W z&>p@m8wzE^vqxe215aC;W7&Zdv}i0F?~wK+Ani%E(jK3z!RgGTw~a+Sb@H-OFcw>I zPl;pzrsHA2a_|0YS^+bORqO%b!0Exm!HfBM#3{K1gt;KTsAL(kr-u-5l#*dV&O_7{ zW!}iI?jVr z_nV1j8)P_jXOR6FYT7_{_gg^rzq1=W`0qJZ3s!sIR`$I2-GbRZD7NyIY;^VnEb741 z2EBMT!J#I~9w*mH%3g4&K5(dhf*U3818F5C;Wp@)j^o4(f~$l())#YxK6_;Fg_Z|w zIX!+IAmLGy=s1scQX-HygL{XZrc%UvivtiNSFtJ2JMvf>MRMR=%J+ukF8oa#8-aBE zfV;ypKYFRR8Y7n)(aYeX?BLCB2_9d9^QwavJZQnKLEqWBlz1*zjvZeAaQp3M?6!TL z#kraL+Er!tN+!KFXg( zX(6iE=PLr6G&c#7I(nEU`nR*=sx zkuV*w0zc2md8)}HD0fg?w=TG4(@GCX8Jn&FVS#t73&Yz+PA%{~9u$D*qIH#aiF^bd zP`2iufPB?0isBc7@j3r}AZhlh-kVhyWXBto?VeAkn&Dg3 Mz^){UV_aGP1#UJ_O8@`> literal 0 HcmV?d00001 diff --git a/app/__pycache__/logging.cpython-314.pyc b/app/__pycache__/logging.cpython-314.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e2cba5494fa939378a6b46424a00ec00e33a3a6e GIT binary patch literal 1280 zcmZ8h%}*0S6rcTQ*_Lj_T0=0TPz6gBkq|U$j2yIu8l<%(YfRQQD;>*6yStfPFnC%M z6Nwi%c)-L&ul_F{gp#xw@Z^abh67jMY`b0cCA05iesAVCZ{F~N`%$gW7K_{!lnxxT)P*!h!>g8| z*NkcvlO+yfY%mdF2ry0`!N)-gD)D#TAyk3bAbQFzb45;tnYI*p)nkJSEb~9P$-|z) zF~d+%*Xo!C3AWdXo))NJ*@X5m1}l1%2IpNWKrHymAa%{Q7@^BFjqj;blT)`I8WpUX zoA$DytFQ6PWn*bg)l5_EsLwW#)m4J_f&QO?({brw!iziHd ze)gsGspLjZ0fF2iFoOv@id%$LVBsMDz=iH{U7Y(#LXoCBZ6a|ITyZjZ_o+1s+=V#0F86`7M#oR(R|7K!z zOtY5kl{!va!=qz0yn$=1lU%QB)Ujo0RS0I&GGSGvp(?i9JwY0kDlgguYxTLdPuTra`7ykbtlN+_q(vaj_rhAG(m~UZNeok~Lhq#m zgk3xhvety0fJ>S7boQa4c7MZi7VFQe!ytr}frf!&xt&Fl5OCXf3k(Mx$?>L~*q0N# zc0(RfoPZrdjKNMrlVC@}A#oNXnpU=EEv@a1<(1f7 zJGLn^4lqMJFpU|Qa%pCO_5pW>2Oj#sLm!x?p*-*)BX_LXIwhSl)8Wmo({kx_rvJaI zl^kQB1OG_p{Cn<~|NP%~&LMA^i-0`v(P`pH-SK50GMS4N$swm(mr*(v{G% z|Beyv<*8=UUiBGPJKIj{j($PaP7FqL2QNM`EvlOCJ1Hb4$CdQ7oE#F=NwuBQJr6&6 z?kq2=nY65m@a-BGwRkE$J}xTo#X?yPG`sG3R8&^i9}k~#93`hkOk;Xr-e)= zfooe>Ut$BDhW}Oz1PdfeMr`*#OGpyZ8pwHSg5s$dTN@-X zyOn7f1eg-l#mtfgJ8I3Oi1#qD`p+Rp(OD80@|vD|PS*hm)uc5PSyXz0DH4`?-P31KIL}tIVNw#{F#M zuZ;6cm4O0J?{BA|IjnTYrg4yfv)c+o@Lik6+hVw3hVXO@+ckBjq<1o~v38{rGIXab zOyOV?qVCBEbE&kDj3=dpRC>vF=!}ugq$E{?(cPgJ96d@n2X@}g=f~}nN&k&Mfhj#5P=CdkC?Sp8NJ0!~Px+29f^6WR1a9Fe0 zUuemDo zv{KA_hd2*8VzbsKNE!+6%Ww~J*rKUPltg;zMgk7A92|u>B`A|R(S72KC<8KyV}MC< zIjuX?=|loR4x9~oKm`T>?$Im-*_isAbVi%Yh{{3ir(7vOa9o|9f)UQ?j>H52 zkeJe0!>7&7SN9ujhqQD^6=uW`cz3lYbhf=r!Q-spiB&?_7L9|Vg=A7^6X~fbNz)yQ zI0hX|DCpmH8;rxANTpR#clOOD#Eb^GuGC=tP6*na=KLEmVJ`+j2y|O=6x(?M$$?HO zIwk9rsAAV`AR!bJQJgqBETt-^AlGoMN$`aWWW&~CKeAp{`~1ii+ozQc`8~mP|DL6v zuLbt61okg;@9xV5x^n*e)~jn?ZCI;qU#V@+9_)RO&(;1oSA8O1-IVt?+;+2m*A>Tx zmpI*5*+ursr5Aj8&UMwf=zPJAifhre)V<2pZ?UAZ=E{k@$9uJZvH#`XeEq&RdS2_v zH#EM{_gdeZ2Yz)fU)_*zX??rzt-j@!w+DVb@Z#LNKUr^TdGq|+kG=KSa`$gW)>?bB zt-bGRxu#>#ZI$=mge~)J($MZ!C8@0Y=EmD|4R__M8^34kbsLp9x#EntMeY}_FV@{v zw+RKy$9b=M7CqU(UCZZ}Pi3nQuW|Q$%-!>!4H`1PQd?ogpFY*vUq`MVIMEByjh3Sj zeo*H=&CwsU)|{s4Khbv3=N+9BqL9)i5>TTk2sh8bA0}wA7K9-rS0>{ytW~)ia(_#d zK8Oq;G&i0#8&vus`8fPlbnHkZZh5Mfl390smZ>k02fKrsuj({-S9-GZ$s(*jbqAMV zp+2}@vULoC@5?1RY2Xix!5<8}HjLO{!{wtzHXPRsI!qe}NMWEm1@ZFRCz&ew{7QQF=F`SfWOj*V_#< zN~H#olI@TvxgYphvHrK9eTIoLMJx=VPli$Lg8n66Ucz?Mba)$2cNcg}GjfYv@csw% zz;sf)eMb*yy*mVW&v%Two&cvpqQ66|nVlv)dl7bVmRg|C*j>PFV(2ps8AKkuLz>8i zsKwuQ*1ZJlq0A212GPmxkQn-0LoTovW{(zl8!!;(5U`LT2k#!@I_Zov7Ah1YgUyy! zig^!TX4Z(IPcUSt$P#$>wp7ltgHFcM;0jV?XhTJI&`ccU%u|5VMN9^q23$veR4Na< z?tnevN_#SGYb4FYLSbm*vf)g;2Heb7T9?QW09|MdFQ?MWa2nw1<~ETwo`B2Rd_}RG zBnt@*ZkWHi)X738#&voXa3gG!8%*Pvm?kKdaa{$1G zTQN}3gjZxRI-L^bc9%Isyd)_R4A5Z$+pzA_Zc9Vbz##3|2E{;P)ajI_;&3hOwb0j5 zn4|Jj$UF&u^*k&Hdf`u<)z9x;IGOiVT%BEHa%XN;*1dA*#X}1Nd4_#1d^!B=!y6234^sfbuf$%Ay)?4s zYrewVc9Fd;Z;ZS?vOJJ$JiZnWo8zA0xLy57Hc)pz=1?ldB7nEGmyBQ+0xsh)$S zeflKZ*F*im*$d%yyT7l7T(6<}_Au9L8PpqVG5tQO?|$Zb_lY2U+}J}w#*Lb~zApO4 z5gVo)rBLr;KyMG~4&z}&0x~)VZ#P0lG9EEXfTULx(mO1^sG4T!hPR6jyzj$1fZ~9$ z=}t@$l2ekbAOPqdLywyeIy#HF$x)L!q8*w15rN5gDS#&$D4>eUj07j8!+g0hE!MUW?>B{2XB&nH_2U}5dP1Eze(C4t%zI7< int: + new_rule = ProcessingRule( + endpoint_id=endpoint_id, + match_field=src_rule.match_field, + operator=src_rule.operator, + match_value=src_rule.match_value, + priority=src_rule.priority, + parent_rule_id=new_parent_id + ) + db.add(new_rule) + db.commit() + db.refresh(new_rule) + for a in src_rule.actions: + db.add(RuleAction( + rule_id=new_rule.id, + action_type=a.action_type, + target_id=a.target_id if a.action_type == 'forward' else None, + channel_id=a.channel_id if a.action_type == 'notify' else None, + template_id=a.template_id if a.action_type == 'notify' else None, + template_vars=a.template_vars + )) + db.commit() + children = db.query(ProcessingRule).filter(ProcessingRule.parent_rule_id == src_rule.id).all() + for child in children: + _duplicate_rule_tree(db, child, endpoint_id, new_rule.id) + return new_rule.id + +@router.post("/rules/duplicate") +async def duplicate_rule( + rule_id: int = Form(...), + endpoint_id: int = Form(...), + parent_rule_id: Optional[str] = Form(None), + include_children: Optional[str] = Form("true"), + db: Session = Depends(get_db) +): + src = db.query(ProcessingRule).filter(ProcessingRule.id == rule_id).first() + if not src: + raise HTTPException(status_code=404, detail="Rule not found") + new_parent = None + if parent_rule_id and str(parent_rule_id).strip(): + try: + new_parent = int(parent_rule_id) + except ValueError: + new_parent = None + new_rule_id = _duplicate_rule_tree(db, src, endpoint_id, new_parent) if (include_children or include_children.lower() == "true") else None + if not new_rule_id: + new_rule = ProcessingRule( + endpoint_id=endpoint_id, + match_field=src.match_field, + operator=src.operator, + match_value=src.match_value, + priority=src.priority, + parent_rule_id=new_parent + ) + db.add(new_rule) + db.commit() + db.refresh(new_rule) + for a in src.actions: + db.add(RuleAction( + rule_id=new_rule.id, + action_type=a.action_type, + target_id=a.target_id if a.action_type == 'forward' else None, + channel_id=a.channel_id if a.action_type == 'notify' else None, + template_id=a.template_id if a.action_type == 'notify' else None, + template_vars=a.template_vars + )) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/actions/duplicate") +async def duplicate_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)): + src = db.query(RuleAction).filter(RuleAction.id == id).first() + if not src: + raise HTTPException(status_code=404, detail="Action not found") + db.add(RuleAction( + rule_id=src.rule_id, + action_type=src.action_type, + target_id=src.target_id if src.action_type == 'forward' else None, + channel_id=src.channel_id if src.action_type == 'notify' else None, + template_id=src.template_id if src.action_type == 'notify' else None, + template_vars=src.template_vars + )) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) +@router.post("/rules/{rule_id}/actions") +async def add_action( + rule_id: int, + endpoint_id: int = Form(...), + action_type: str = Form(...), + target_id: Optional[Union[int, str]] = Form(None), + channel_id: Optional[Union[int, str]] = Form(None), + template_id: Optional[Union[int, str]] = Form(None), + template_vars_str: Optional[str] = Form(None), + db: Session = Depends(get_db) +): + t_vars = None + if template_vars_str and template_vars_str.strip(): + try: + t_vars = json.loads(template_vars_str) + except Exception: + pass + + # Helper to clean int params + def clean_int(val): + if val and str(val).strip(): + try: + return int(val) + except ValueError: + return None + return None + + action = RuleAction( + rule_id=rule_id, + action_type=action_type, + target_id=clean_int(target_id) if action_type == 'forward' else None, + channel_id=clean_int(channel_id) if action_type == 'notify' else None, + template_id=clean_int(template_id) if action_type == 'notify' else None, + template_vars=t_vars + ) + db.add(action) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/actions/delete") +async def delete_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)): + db.query(RuleAction).filter(RuleAction.id == id).delete() + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + + +# --- Targets --- +@router.get("/targets", response_class=HTMLResponse) +async def list_targets(request: Request, db: Session = Depends(get_db)): + targets = db.query(Target).all() + return await render("admin/targets.html", {"request": request, "targets": targets, "active_page": "targets"}) + +@router.post("/targets") +async def add_target(name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)): + t = Target(name=name, url=url, timeout_ms=timeout_ms) + db.add(t) + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +@router.post("/targets/update") +async def update_target(id: int = Form(...), name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)): + t = db.query(Target).filter(Target.id == id).first() + if t: + t.name = name + t.url = url + t.timeout_ms = timeout_ms + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +@router.post("/targets/delete") +async def delete_target(id: int = Form(...), db: Session = Depends(get_db)): + db.query(Target).filter(Target.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +# --- Channels --- +@router.get("/channels", response_class=HTMLResponse) +async def list_channels(request: Request, db: Session = Depends(get_db)): + channels = db.query(NotificationChannel).all() + return await render("admin/channels.html", {"request": request, "channels": channels, "active_page": "channels"}) + +@router.post("/channels") +async def add_channel(name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)): + c = NotificationChannel(name=name, channel_type=channel_type, webhook_url=webhook_url) + db.add(c) + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +@router.post("/channels/update") +async def update_channel(id: int = Form(...), name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)): + c = db.query(NotificationChannel).filter(NotificationChannel.id == id).first() + if c: + c.name = name + c.channel_type = channel_type + c.webhook_url = webhook_url + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +@router.post("/channels/delete") +async def delete_channel(id: int = Form(...), db: Session = Depends(get_db)): + db.query(NotificationChannel).filter(NotificationChannel.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +# --- Templates --- +@router.get("/templates", response_class=HTMLResponse) +async def list_templates(request: Request, db: Session = Depends(get_db)): + tmpls = db.query(MessageTemplate).all() + return await render("admin/templates.html", {"request": request, "templates": tmpls, "active_page": "templates"}) + +@router.post("/templates") +async def add_template(name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)): + t = MessageTemplate(name=name, template_content=template_content) + db.add(t) + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/update") +async def update_template(id: int = Form(...), name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)): + t = db.query(MessageTemplate).filter(MessageTemplate.id == id).first() + if t: + t.name = name + t.template_content = template_content + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/delete") +async def delete_template(id: int = Form(...), db: Session = Depends(get_db)): + db.query(MessageTemplate).filter(MessageTemplate.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +# --- Logs --- +@router.get("/logs", response_class=HTMLResponse) +async def list_logs(request: Request, db: Session = Depends(get_db)): + logs = db.query(RequestLog).options(joinedload(RequestLog.delivery_logs))\ + .order_by(RequestLog.received_at.desc()).limit(100).all() + return await render("admin/logs.html", {"request": request, "logs": logs, "active_page": "logs"}) + +@router.post("/logs/clear") +async def clear_logs(db: Session = Depends(get_db)): + db.query(DeliveryLog).delete() + db.query(RequestLog).delete() + db.commit() + return RedirectResponse(url="/admin/logs", status_code=303) + +@router.post("/logs/{id}/replay") +async def replay_log(id: int, db: Session = Depends(get_db)): + log = db.query(RequestLog).filter(RequestLog.id == id).first() + if not log: + return JSONResponse({"error": "Log not found"}, status_code=404) + + # Find endpoint + endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == log.namespace).first() + if not endpoint or not endpoint.is_active: + return JSONResponse({"error": "Endpoint inactive or missing"}, status_code=400) + + # Re-process + routed, notified = await engine.process(endpoint.id, log.raw_body) + + new_log = RequestLog( + namespace=log.namespace, + remark=log.remark, + event_no=log.event_no, + raw_body=log.raw_body, + status="replay" + ) + db.add(new_log) + db.commit() + db.refresh(new_log) + + for r in routed: + db.add(DeliveryLog( + request_id=new_log.id, + target_name=r.get("target"), + type="relay", + status="success" if r.get("ok") else "failed", + response_summary=str(r.get("error") or "OK") + )) + + for n in notified: + db.add(DeliveryLog( + request_id=new_log.id, + target_name=n.get("channel"), + type="notify", + status="success" if n.get("ok") else "failed", + response_summary=str(n.get("error") or "OK") + )) + + db.commit() + return JSONResponse({"status": "ok", "new_log_id": new_log.id}) diff --git a/app/db.py b/app/db.py new file mode 100644 index 0000000..914fa25 --- /dev/null +++ b/app/db.py @@ -0,0 +1,102 @@ +from sqlalchemy import create_engine, Column, Integer, String, JSON, Boolean, Table, ForeignKey, DateTime, Text +from sqlalchemy.orm import declarative_base, sessionmaker, relationship, backref +import os +from datetime import datetime + +Base = declarative_base() + +class Target(Base): + __tablename__ = 'targets' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True, index=True) + url = Column(String) + timeout_ms = Column(Integer, default=5000) + +class NotificationChannel(Base): + __tablename__ = 'notification_channels' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True) + channel_type = Column(String) # feishu, wecom + webhook_url = Column(String) + +class MessageTemplate(Base): + __tablename__ = 'message_templates' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True) # 方便识别,如 "收款成功通知" + template_content = Column(Text) # "收到{amt}元" + +class WebhookEndpoint(Base): + __tablename__ = 'webhook_endpoints' + id = Column(Integer, primary_key=True) + namespace = Column(String, unique=True, index=True) + description = Column(String, nullable=True) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + rules = relationship("ProcessingRule", back_populates="endpoint", cascade="all, delete-orphan") + +class ProcessingRule(Base): + __tablename__ = 'processing_rules' + id = Column(Integer, primary_key=True) + endpoint_id = Column(Integer, ForeignKey('webhook_endpoints.id')) + endpoint = relationship("WebhookEndpoint", back_populates="rules") + + # Tree structure support + parent_rule_id = Column(Integer, ForeignKey('processing_rules.id'), nullable=True) + children = relationship("ProcessingRule", backref=backref('parent', remote_side=[id]), cascade="all, delete-orphan") + priority = Column(Integer, default=0) # Higher executes first (if we want ordering) + + match_field = Column(String) # e.g. "trans_order_info.remark" or "event_define_no" + operator = Column(String, default="eq") # eq, neq, contains, startswith, regex + match_value = Column(String) # e.g. "imcgcd03" or "pay.success" + + actions = relationship("RuleAction", back_populates="rule", cascade="all, delete-orphan") + +class RuleAction(Base): + __tablename__ = 'rule_actions' + id = Column(Integer, primary_key=True) + rule_id = Column(Integer, ForeignKey('processing_rules.id')) + rule = relationship("ProcessingRule", back_populates="actions") + + action_type = Column(String) # "forward" or "notify" + + # Forward params + target_id = Column(Integer, ForeignKey('targets.id'), nullable=True) + target = relationship("Target") + + # Notify params + channel_id = Column(Integer, ForeignKey('notification_channels.id'), nullable=True) + channel = relationship("NotificationChannel") + template_id = Column(Integer, ForeignKey('message_templates.id'), nullable=True) + template = relationship("MessageTemplate") + + # Extra params for templating (e.g. {"pay_method": "微信"}) + template_vars = Column(JSON, nullable=True) + +class RequestLog(Base): + __tablename__ = 'request_logs' + id = Column(Integer, primary_key=True) + namespace = Column(String, index=True) + remark = Column(String, nullable=True) # 保留用于快速筛选,可选 + event_no = Column(String, nullable=True) # 保留用于快速筛选,可选 + raw_body = Column(JSON) + received_at = Column(DateTime, default=datetime.utcnow) + status = Column(String) # success, error + delivery_logs = relationship("DeliveryLog", back_populates="request_log", cascade="all, delete-orphan") + +class DeliveryLog(Base): + __tablename__ = 'delivery_logs' + id = Column(Integer, primary_key=True) + request_id = Column(Integer, ForeignKey('request_logs.id')) + target_name = Column(String) + type = Column(String) # relay, notify + status = Column(String) # success, failed + response_summary = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + request_log = relationship("RequestLog", back_populates="delivery_logs") + +DB_PATH = os.getenv("DB_PATH", "sqlite:///./config/data.db") +engine = create_engine(DB_PATH, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def init_db(): + Base.metadata.create_all(bind=engine) diff --git a/app/logging.py b/app/logging.py new file mode 100644 index 0000000..078bbbf --- /dev/null +++ b/app/logging.py @@ -0,0 +1,15 @@ +import logging +from pythonjsonlogger import jsonlogger +import os + +def get_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + level = os.getenv("LOG_LEVEL", "INFO").upper() + logger.setLevel(level) + if not logger.handlers: + handler = logging.StreamHandler() + formatter = jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(name)s %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.propagate = False + return logger diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..2bca5dc --- /dev/null +++ b/app/main.py @@ -0,0 +1,102 @@ +import asyncio +from fastapi import FastAPI, Request, BackgroundTasks +from fastapi.responses import JSONResponse +from app.logging import get_logger +from app.admin import router as admin_router +from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db +from app.services.engine import engine +from contextlib import asynccontextmanager + +logger = get_logger("app") + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: Initialize DB + logger.info("Initializing database...") + init_db() + yield + # Shutdown logic if any + +app = FastAPI(lifespan=lifespan) +app.include_router(admin_router) + +def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list): + try: + db = SessionLocal() + # Create RequestLog + req_log = RequestLog( + namespace=namespace, + remark=str(payload_dict.get("remark", "")), + event_no=str(payload_dict.get("event_define_no", "")), + raw_body=payload_dict, + status="success" + ) + db.add(req_log) + db.commit() + db.refresh(req_log) + + # Create DeliveryLogs + for r in routed: + db.add(DeliveryLog( + request_id=req_log.id, + target_name=r.get("target"), + type="relay", + status="success" if r.get("ok") else "failed", + response_summary=str(r.get("error") or "OK") + )) + + for n in notified: + db.add(DeliveryLog( + request_id=req_log.id, + target_name=n.get("channel"), + type="notify", + status="success" if n.get("ok") else "failed", + response_summary=str(n.get("error") or "OK") + )) + + db.commit() + db.close() + except Exception as e: + logger.error(f"Failed to save logs: {e}") + +@app.get("/health") +async def health(): + return {"status": "ok"} + +@app.post("/webhook/{namespace}") +async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks): + db = SessionLocal() + endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first() + + if not endpoint: + # Auto-create for convenience if needed, or reject. For now reject if not exists. + # Or better: log warning but don't process. + # But per requirements "add endpoint", so we expect it to exist. + db.close() + return JSONResponse({"error": "Endpoint not found"}, status_code=404) + + if not endpoint.is_active: + db.close() + return JSONResponse({"error": "Endpoint inactive"}, status_code=403) + + endpoint_id = endpoint.id + db.close() + + try: + body = await request.json() + except Exception: + return JSONResponse({"error": "Invalid JSON"}, status_code=400) + + # Use new engine + routed, notified = await engine.process(endpoint_id, body) + + # Async save logs + background_tasks.add_task(save_logs, namespace, body, routed, notified) + + result = { + "namespace": namespace, + "routed": routed, + "notified": notified + } + logger.info({"event": "webhook_processed", "namespace": namespace, "routed": len(routed), "notified": len(notified)}) + return JSONResponse(result) diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..cbf67a7 --- /dev/null +++ b/app/models.py @@ -0,0 +1,32 @@ +from typing import Optional, Dict, Any +from pydantic import BaseModel + +class IncomingOrderInfo(BaseModel): + cash_resp_desc: Optional[str] = None + ref_amt: Optional[float] = None + +class IncomingPayload(BaseModel): + remark: Optional[str] = None + event_define_no: Optional[str] = None + trans_amt: Optional[float] = None + settlement_amt: Optional[float] = None + out_trans_id: Optional[str] = None + hf_seq_id: Optional[str] = None + namespace: Optional[str] = None + trans_order_info: Optional[IncomingOrderInfo] = None + extra: Optional[Dict[str, Any]] = None + + def idempotent_key(self) -> Optional[str]: + return self.out_trans_id or self.hf_seq_id + + def cash_resp_desc(self) -> str: + v = None + if self.trans_order_info: + v = self.trans_order_info.cash_resp_desc + return v or "" + + def actual_ref_amt(self) -> Optional[float]: + extra_val = None + if self.extra and isinstance(self.extra, dict): + extra_val = self.extra.get("actual_ref_amt") + return extra_val or (self.trans_order_info.ref_amt if self.trans_order_info and self.trans_order_info.ref_amt is not None else self.settlement_amt) diff --git a/app/services/engine.py b/app/services/engine.py new file mode 100644 index 0000000..32d7669 --- /dev/null +++ b/app/services/engine.py @@ -0,0 +1,228 @@ +from typing import Any, Dict, List, Optional +import asyncio +import re +from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate +from app.logging import get_logger + +logger = get_logger("engine") + +class RuleEngine: + def __init__(self): + pass + + def get_value_by_path(self, payload: Dict[str, Any], path: str) -> Optional[str]: + try: + keys = path.split('.') + value = payload + for key in keys: + if isinstance(value, dict): + value = value.get(key) + else: + return None + return str(value) if value is not None else None + except Exception: + return None + + def check_condition(self, actual_val: str, operator: str, match_val: str) -> bool: + if actual_val is None: + return False + + operator = operator or 'eq' + + if operator == 'eq': + return actual_val == match_val + elif operator == 'neq': + return actual_val != match_val + elif operator == 'contains': + return match_val in actual_val + elif operator == 'startswith': + return actual_val.startswith(match_val) + elif operator == 'regex': + try: + return re.search(match_val, actual_val) is not None + except: + return False + return False + + async def process(self, endpoint_id: int, payload: Dict[str, Any]): + db = SessionLocal() + tasks = [] + + try: + # Recursive processing function + # context stores accumulated template_vars AND active_template from parent rules + # context = { "vars": {...}, "template_content": "..." } + def process_rules(rules: List[ProcessingRule], context: Dict): + for rule in rules: + actual_val = self.get_value_by_path(payload, rule.match_field) + + if self.check_condition(actual_val, rule.operator, rule.match_value): + logger.info({"event": "rule_matched", "rule_id": rule.id, "match_field": rule.match_field}) + + # Prepare context for this level + # We use shallow copy for dict structure, but deep copy for internal vars is not strictly needed + # as long as we don't mutate active_template in place (it's a string). + current_context = { + "vars": context.get("vars", {}).copy(), + "template_content": context.get("template_content") + } + + # 1. First Pass: Collect Vars and Templates from all actions + # This allows a parent rule to set a template even if it doesn't send a notification itself + for action in rule.actions: + if action.template_vars: + current_context["vars"].update(action.template_vars) + + # If action has a template, it updates the current context's template + # This template will be used by subsequent actions in this rule OR children + if action.template: + current_context["template_content"] = action.template.template_content + + # 2. Second Pass: Execute Actions + for action in rule.actions: + if action.action_type == 'forward' and action.target: + t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms} + tasks.append(self._exec_forward(t_dict, payload)) + + elif action.action_type == 'notify': + # Check if we have a valid channel + if action.channel: + # Determine template to use: Action's own template > Inherited template + template_content = None + if action.template: + template_content = action.template.template_content + else: + template_content = current_context.get("template_content") + + if template_content: + try: + # Flatten payload + merge current context vars + render_context = self._flatten_payload(payload) + render_context.update(current_context["vars"]) + + msg = template_content.format(**render_context) + + c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url} + tasks.append(self._exec_notify(c_dict, msg)) + except Exception as e: + logger.exception(f"Template render failed for action {action.id}: {e}") + tasks.append(self._return_error("notify", action.channel.name, str(e))) + else: + # Channel exists but no template found anywhere + logger.warning(f"Action {action.id} has channel but no template (own or inherited). Skipping.") + + # 3. Process children (DFS) + if rule.children: + process_rules(rule.children, current_context) + + # Start with root rules (parent_rule_id is NULL) + root_rules = db.query(ProcessingRule).filter( + ProcessingRule.endpoint_id == endpoint_id, + ProcessingRule.parent_rule_id == None + ).order_by(ProcessingRule.priority.desc()).all() + + process_rules(root_rules, {"vars": {}, "template_content": None}) + + # Wait for all actions + results = await asyncio.gather(*tasks) if tasks else [] + + # Aggregate results + routed_results = [] + notified_results = [] + for res in results: + if res['type'] == 'forward': + routed_results.append(res) + else: + notified_results.append(res) + + return routed_results, notified_results + + finally: + db.close() + + def _flatten_payload(self, y: dict) -> dict: + out = {} + + # Helper class to allow attribute access on dictionaries within templates + class AttrDict(dict): + def __getattr__(self, key): + if key in self: + v = self[key] + if isinstance(v, dict): + return AttrDict(v) + return v + # Return empty string or None to avoid AttributeError in templates + return "" + + def flatten(x, name=''): + if isinstance(x, dict): + for a in x: + flatten(x[a], name + a + '_') + if name == '': + # Wrap top-level nested dicts so {a.b} works in templates + out[a] = AttrDict(x[a]) if isinstance(x[a], dict) else x[a] + else: + if name: + out[name[:-1]] = x + + flatten(y) + + # Fallback aliases for common fields referenced by templates + # cash_resp_desc: prefer nested trans_order_info.cash_resp_desc + try: + if 'cash_resp_desc' not in out: + toi = y.get('trans_order_info') or {} + out['cash_resp_desc'] = (toi.get('cash_resp_desc') or "") + except Exception: + out['cash_resp_desc'] = "" + + # actual_ref_amt: extra.actual_ref_amt > trans_order_info.ref_amt > settlement_amt + try: + if 'actual_ref_amt' not in out: + extra = y.get('extra') or {} + toi = y.get('trans_order_info') or {} + val = extra.get('actual_ref_amt') + if val is None: + val = toi.get('ref_amt') + if val is None: + val = y.get('settlement_amt') + out['actual_ref_amt'] = val + except Exception: + out['actual_ref_amt'] = y.get('settlement_amt') + + # Ensure any dict values in context are AttrDict to support dot-notation + for k, v in list(out.items()): + if isinstance(v, dict) and not isinstance(v, AttrDict): + out[k] = AttrDict(v) + + return out + + async def _exec_forward(self, target: dict, payload: dict): + try: + import httpx + async with httpx.AsyncClient() as client: + resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) + resp.raise_for_status() + return {"type": "forward", "target": target['name'], "ok": True} + except Exception as e: + return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)} + + async def _exec_notify(self, channel: dict, msg: str): + try: + from app.services.notify import send_feishu, send_wecom + channel_type = channel.get('channel') + url = channel.get('url') + + if channel_type == 'feishu': + await send_feishu(url, msg) + elif channel_type == 'wecom': + await send_wecom(url, msg) + return {"type": "notify", "channel": channel_type, "ok": True} + except Exception as e: + logger.exception(f"Notification failed for {channel.get('channel')}: {e}") + return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)} + + async def _return_error(self, type_str, name, err): + return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err} + +engine = RuleEngine() diff --git a/app/services/notify.py b/app/services/notify.py new file mode 100644 index 0000000..5f7893a --- /dev/null +++ b/app/services/notify.py @@ -0,0 +1,13 @@ +import httpx + +async def send_feishu(url: str, text: str): + async with httpx.AsyncClient(timeout=10) as client: + body = {"msg_type": "text", "content": {"text": text}} + resp = await client.post(url, json=body) + resp.raise_for_status() + +async def send_wecom(url: str, text: str): + async with httpx.AsyncClient(timeout=10) as client: + body = {"msgtype": "text", "text": {"content": text}} + resp = await client.post(url, json=body) + resp.raise_for_status() diff --git a/app/services/stats.py b/app/services/stats.py new file mode 100644 index 0000000..73675f2 --- /dev/null +++ b/app/services/stats.py @@ -0,0 +1,45 @@ +from datetime import datetime, timedelta +from app.db import SessionLocal, RequestLog +from sqlalchemy import func + +# 全局变量,记录启动时间 +START_TIME = datetime.utcnow() + +class SystemStats: + def __init__(self): + pass + + def get_uptime(self) -> str: + delta = datetime.utcnow() - START_TIME + days = delta.days + hours, remainder = divmod(delta.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + if days > 0: + return f"{days}天 {hours}小时" + elif hours > 0: + return f"{hours}小时 {minutes}分" + else: + return f"{minutes}分 {seconds}秒" + + def get_today_count(self) -> int: + session = SessionLocal() + try: + today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + count = session.query(func.count(RequestLog.id)).filter(RequestLog.received_at >= today_start).scalar() + return count or 0 + finally: + session.close() + + def get_latest_log_time(self) -> str: + session = SessionLocal() + try: + log = session.query(RequestLog).order_by(RequestLog.received_at.desc()).first() + if log: + # 简单转为本地时间显示(+8) + dt = log.received_at + timedelta(hours=8) + return dt.strftime("%H:%M:%S") + return "无" + finally: + session.close() + +stats_service = SystemStats() diff --git a/config/config.yml b/config/config.yml new file mode 100644 index 0000000..650fd60 --- /dev/null +++ b/config/config.yml @@ -0,0 +1,45 @@ +server: + port: 8080 + log_level: info +routing: + remark_map: + imcgcd03: ["target_3"] + imcgcd02: ["target_2"] + default_targets: [] +targets: + - name: target_1 + url: "https://httpbin.org/post" + timeout_ms: 5000 + - name: target_2 + url: "https://httpbin.org/post" + timeout_ms: 5000 + - name: target_3 + url: "https://httpbin.org/post" + timeout_ms: 5000 +notifications: + event_map: + wechat.complaint: + template: "⚠️请注意,您有新的微信投诉,请注意查看" + channels: ["feishu","wecom"] + refund.standard: + template: "退款成功,退款金额:{actual_ref_amt}" + channels: ["feishu","wecom"] + pay.ali_scaned: + template: "支付宝收款{trans_amt}元,状态:{cash_resp_desc}" + channels: ["feishu","wecom"] + pay.wx_scaned: + template: "微信收款{trans_amt}元,状态:{cash_resp_desc}" + channels: ["feishu","wecom"] +channels: + feishu: + webhooks: + - "https://open.feishu.cn/your-bot-webhook" + wecom: + webhooks: + - "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your-key" +retry: + max_attempts: 3 + initial_delay_ms: 500 + max_delay_ms: 3000 +security: + outbound_signature: false diff --git a/config/data.db b/config/data.db new file mode 100644 index 0000000000000000000000000000000000000000..7abdafafc9f40b37a887de794d319b7a2af85148 GIT binary patch literal 163840 zcmeHwZE#%Ib=ZR77eD|@N~A!FQnEopQOrvwFMx7FYrM z!F{_RA%>|ae~dpenK+Kqj+2ascHKCW#%`uFc4q9_tv&voU;g3#>WnMOZYSeuJ8e4C zrqgroefRBqyKk|(6aW$6fn{m;?tSmxd+xdCp6|2p>V;z6O%*D&l2cEqiGzuOfy75s zsYD|2$MFAW;Q!+90DRajzQBJ6LZ3T*{NqIWtsA4s1BuZ+f1OCKBp)ArZ|vWX{KM$K z965U6v%_B>{_y_4+4mXviT(%z1Ob8oL7+1PKK1(OzVQ<$20nYE?qpZoT4Tk{IJtVU zQue~1$7bedr!UW@E>EAiFq;bhl6q!%sF+Wkzj%4}rP=w^#ktF=i&rjO_;6}?h;?iJ zhE4p{nfhAQO}#cfKXYz+9-Heotl^W*uvaJ zeD9_C^RGN3&os+5YPC1;$IM2c;pQwn5?ttX2kFO>EToF zoEX_Re)8nNrvbXKvw3%=xa!u{GAoruFY;wHvbT|+*4te7M#J@5_soY&*(pW$w`IS* zx>Ij>VGO9bUKMc8&3KJc$*HZSF3(=QjN6r~xv<;$3@{RB0Ru0ee`U6(oeicJ0and# zMj)__1vvfQfqmnXlLMc5j=`yQ`&++{M4;9B>pB2)%lT@hSZ;+_)v3AA7;{#twPK}K ztgnf)h06whDz}s=6y2447uhu|N_aPFE@1U?xW!$?eJ@wS zZ8hg+CR@p`1q7_-=3F?oZRp>AfLix;_|)Cx{(a-;PY>Kx1!&v$unFI`pN9OUU%7bx z)hn}rgFdX=IyT>d0B`#_0`>WQ`^Hb79(ZSgLA|YSTf_bc{M&x+cN6lim#Y=4NNdAT zFM90CTXj=s=H@QUPP40{N4f)G*7op4jk86yk>j?8y^%$={oG|y;fV{9pUG9qb-<>8 zXtp8)k`8|(*&%>)H@$c7_?6QG35FZqjTI2sGESoo6bg=ErfpUkxvgp6-PZ>9jhp7c zJ1?`9$6jTzjk_Zdi0#q`6vWNi%2Wt+Y1CH2K7_KrVyeBh=yiax8r4v+tNu)~Q9cf}*-f9D75)ZIe^`^JwSAGo{5Ho^bV|Ie-n zpo0fHa%`>lb_?k0#gbcT)H5Y7d?&R6bokULEO;RKMcQTuyZQ?j}$xyq#KDB7eZsmf$lvyI7|o;OXaAmwzu@c8G3 zVO;~s|B`@z=#L;k5FiK;1PB5I0fGQQfFM8+AP5iy2m%CwZ4ZGXgYrDENey4A`#aqR4`o@hk{B%0ESe(oj%cuR&g3}&EL%p!< zu08J>{k!#0%=|Z0fGQQfFM8+AP5iy2m%BFf&f8) zAg~P}aCdNE4+ONkzk2(t@BPWQ?*GYm@Bix`-v8FU`+xDHU%dTCx3k5!SZ2LDzj*sg zx2w(?f`|huQ@YUNn$6JC- zq+T_HIV11DZ}|Anvo@;jf$`Fss} ze*WEWfBpXFzW2)?{n;_8Uh>QDeDD6(zWLste~~}(?pHqh?zg}D?ic>*{^!5= z>mS_xg~62G0Z99{yX1-cisQffAii~Kh4|WE&Ri6$fewHR$!I|wg$cg z9N`;p!yx`!{>69R{iA<%|8KtW?%(|#?CP740-zmYZ4lxB@)ugE<=tAQST0o3F;HQE zZv`c{|Nl6V{4v=7DF?tdI8HQ0f&f8)AV3fx2oMAa0t5kq06~BtKoEE|A@IbYJkYMz z7(6(5Vj!%Z*oE=`?Ft2+|NrNSJ4D5k*4JQ8^uI`!S|4IHY{&8n5mR3U$ zAP5iy2m%BFf&f8)AV3fx2oMAa0tA6a5dx{>uJB6&P{D7kO!pT_j?+c028|8f3t2^bqQ_DYd z)bDz#y(^@QPdh8cjF)rDZvN!B-}7XA&zraWjz^z>j*0!p4n19SOHOUsy-_UX7IS$? zdG?6^ipfLW*KpYH`&@h9)-^otcRkhKwPg*D`8`j@_YBtX>_IWpk?u21ikY_eZJp_u zm}z_0mYI%gCp>=lFU?l#ySF<5-h=3s|93?`o(gkKO`r@D;wp2YWB+#S?C8%%3nTwH^23q21OMs3UmhqQ(1-u;@b`v)f7l*Q41Is-ouTKUGyM?+ z2m%BFf&f8)AV3h<<`6JJ78n>mcKp!vQy`>NO65wuSXlEKxt!~Hb1w(UljM{vO)18d zqNim`vvh3+M1=u)93L#WMQ^E*Su5UZIOR;W;VsqN6{m-nNG8Y-eDq^oMwjd9m1|R)sx6#chNuScBv|-7%WX%Bg%11xi&*+ht=q5-o zeDv|2Mpu-yWEqmK%z!w5bx~`cZ_wWag zbbnB_r*u7StEy>ACde23!Na{hsHH8*mQ2G00g6BPcyA9%X-n26NiTsoHSpZAW6RTz zMJs=+9Loz%aRtQYd@)x)nW|*pbaVC8lH;Yym6TJj*NWLj-A$d0RuelJoCpRP)tb`G zw5pr7W|lxEv7fB~w<65{RIDxY11;4pyh9HPzH)yR=&jH?mp7!2wk5v>~eo z;2229e02vlaX7_D+qz`iatUN?KHTu84QEa%HqaJBSB(;g>U_AN=y39e9au;K2$zT^ z*_Nh{j{b+l$>GwTKF_oI|4Y2COos%B|9+>qT zS*N1{9Z!gkht}yRPuw~DJJ9j?P$F^U$wH-e6Ku-m#>&c(-)4^w3>at&h~He=0xVG-+X%ltdHR`=D4`)4MSH7*BhFfx2`wzteu~@^TN~q zqs!y#KdKiT7alFRa`-jy0@OnsOAI8INBIkObr`$T3l2X8FJNFf5?blub6+tu~26l6<4@rnK8d}&mcIA>MHia(a;N%zn?hVdO8oMM9-t4o`-)!w2p^b z6NgbJ#a??L)O-XLPHa&J_&oiif$AjIc{tn|wLYx#u-}hE<^q+dWOpjsO`W}WdwFPYLyVienof#z1QKy(kDrLZ*BuTQaTB>d7^2CQz6IrLcoblWn zeivI&ZCS-1yhhe5=GQXdrBla;&ts#L%ZZo0D5;7K+9(^_!FMB*D`P(izEx|Ld?V<1 zZsFx%ppskj2YOwWR9#jzQ?@Kqk~PIPB<#j!s#`MXfw2`^a=A?2sXI6cHivp61Cs+1 zEZ$|9nr1<@10@)zzE*Xy|J>zsv-1nsT(3E0FIWs~s=15BO4*;!<+)38W*U3uYHq$* z&xluG)2dtZpewu)Cse5|W}HTS3H+YKoT3_3Yz0!T$P(NIie2E-T3ml4YOb@A$u&Fy z-{xjD_oN2<)ZDAfsHl|{7)I7qU0H~GqVCp8nS60IldCkUuvmXpGAyI$d6{B47>_M| z$tjoJl?-@wfxRPBFP0FBvS}EKt{AEY$b$g#dkM^7z-*Lrm2#m7qZM2?OMfWY4YFS6p012-k2% zr8Qt0>Rti1!h@X%PD&GM9AcW5)@|sID7Mrn768+Oqsdn0z_zq3f~_K7X=GvKm0}4% z=R+UfgCOv#D=p}Zt-YXI$$1Mq0?6{NmlL}Y0xRENI51AVQWL*dYI%PKuo@F4!GJ4} zg^`iGn}sQbUIe}TBXk4LdSss&b~BQMRK`aO-q<`Dl?I$x26j&BGzD-~Qf$)}=pt*f zuBftUF>GUFcOYg(FTLHd zP8_p&(gfP9cV=)^Vpe?baOY%2XL!wU3+B^8iB#wUBjK~b!9i4Gq`AYkE)_zD32{S` zkmyf`XsCJ#tVd2xR_&bZ;xyPC7Auv!7aReGK7b-MKsg3R?GfC7Z#l)+EIJ%P&>SFbE7 z?A+tk5M8vYNS316>{#SL+zT*BJlfn8x9%eIgA+tx_%p+MoXs>$Gz6)DD^;!4v^>6~ z>Vz1Lg@sFin!LF=%~#IP%+Fn#zJ~2yIk;Z%t3Yfqt&6NrrBQ=xt^@nQCnO$_0krrG zQW+$y91OkO{LS8CS)67vq|Z!WP%K5ZbpzJQCW=7xii^nOFfNP1s!}U8RszNgn>;pm z>~wNrcK)^5`IGoGP|5ll&M#oefH_qBg9Stkz5v&$0QtqicLVW^gCaGqCvXycrb^AaaPz>~Ej6^L%-jG|;1hlB8{xjBK(@*Q{& z=MYNB3i3lbfrNlMiw{CA`&ofqU0*uq3h=F%T@%M5jd^-BaV*WP4JTOkvE$0OVnh;yTP+P+d zd0;Bfm>?X9ChT9#)3TKC*F=8 zl4D(M-a%V*u&qVkgP}{*_Adf0JDy`QO)T z3I2`bf0F-6{%^8-$n7DoL-IeDmRi{|rY<7+pX7gERW2B9lK+8fko-^bKT4LpXt7BC zkLu1^FB+2nvFN=$1)*CIy95?WFl&?iA5nFX{Eup@7(&uP0Z;aSvi}Re7LxzT{!jLQ zq@QH}_g!Fw@tW-afnONe|H=MO_J83?O7?$neiRBXvj2DUL?rwF1NlQn?Eg&u*CC|e zmUK-e`Jd!}^8e>i9OVBmf>N4Z$x+{D^8as%4GOt(k^evW|AV21eUkr4{wMh#4g$shQ~ZA_2s$VEzZnol@&6?M zbBp5#5&sYNf0pzf(j}>ePV#?5-Ld`B|3RubSj3i}gCbDbZ36N_fNb3~JbJdpqkL7Kl^nV~}scCIHW&dN_+cEo} zn$|%b&a)bYbBOkr{Ey+b5I{xpKi1gvQ|O>_w;(;KpCMDXRKr$mT{rPn3`IL+iGu(; zk(O+`3jZqTz}Y&Pxpp&>|FMn#V1oaF4w;+|HIx5wOzS`QpLWJiQO5A;5;zZH?mr_d zWnqMk%l*fv*G&D#zlSK|HqHDOz7l%O{KwMyg^n1|yx@v>NHYIvR(07xDzSB$|9sBF z&okz`E_TcO*ZIIo*`SOL7D$pI*@SW zKhS|KP5j5k+xWbHkP!1h^Zs?l;h1pU*t~!E28PD*dH;H#@S(haY&uQ0!at89?;ra$ zU>{1&ynj3+XunLzdUpTzlS6U;ELXBU(dzflYU>$>Y#R#s{YT_}@rC4nxXfedNo)8& z$^RnSJudxUnAs!wA5*roMDLLr5AQquKgs{CIhLj1)@AoJuL;2$n-_JzS1-x`;Q|;B zko=$I|9*1+ll&iK{wMiAF7rRh|C>`VBvqwr~X;D08P?r3A=KaPBK_|pD2_Ez`&Z17!Zq`#dBfqU16 zhZ5sYpZu{oeY{jDyKAEKE?8L7H*Z1F8>j5%Z~HotR*4HWoLMKXtR6gy#o#Js4+8+9 zx-XWF6p3D%l@|R?-Mc;oP?dC3*Cqe%z4yjGLdBqBXH!EJ zAw4?>6(76j>=pYRxoU{?U4%($R|Mqaw=x`x9>`#tx38hNb-<-Fs>G8@sB|utk6O-grzLg@|aPwwzYZAALZg z=-Rn+FD<`5ccE0QE!DNF+FJSb#oDxxJiCgbR~BEHt(r5QvZTzuTF~pp%9XjbYki0& zrtb&V#vW`tQS>+Ny>W>4yBq1-Is5(e*PlO#`+e#scVbjc&Ps88T@zCbwdtDtR?sbZ z-5{wy$jNC=s}C&>Qz~(-O_>!5HcUUZ0LryPPB2Y2Y+cr%B)JWKwM>t-xr(2y_v(Tx zmHKQ0+IVl53)b{rglUKY8!!DD8K*TW9B( z4vyWsIzs#X=z?}kOjPXpk%jjHbG$u$`rTEooW<3 zu62k^%(%Cz#TsiAxefq%wO2$k>dYbGQ3dy|BJJ;6AUfNz{z-3hu8P0RIA?P zO*gw#sVpOPT51#v4PSbT6HY#$ynptTYD3m6MY2@N`xk4~C;1J09v&!kRzqkVxndX- z@5!LRKwM+0B{ze54tNnSH&6NhVpYQsMB^3nYvO472em8Dh6O&eiy@^lf}KSpL91}b zV{I{&|H3I@DF2^V1|>VstHE&BR~-v+p5%YPtiTAgH8hjtf0F;ZW_Ts}e>=#5`N+TA%Fyn3;gd80!Yu3( z!-67X@k#z?$-aF=A^Shs|H=N3mj}uJl>a|9(Uaq9ySWq11A%>M& z>Cw&qr$GKcJ8f&GV)}_8VrAe*xpDj1|8+~TEnSu+3+{~KR@MJgw{QJFMb*{#G^!it z|8J-|nC4(Zgizc|{XYfzGYLCX?W(Q*A4_vU_5XmXn`sSE9xIt~(_J&#gV~fEZ;7Q~ z3@lBsBlttU(#Wp3nU!LxSPx03cs+R46=)MyC?UbKJ;^nK=#N;3E;J5CBsuYPVE@PM z?dbmxVbJjN`>y|oxQzox^8aIFGcKk>{(rk!{||zU6jRj|nN{{;I*c7$|4;Qnwt4mc zaO1g3HP%71VWIKtSB=4U)8`P8~$tYI%+#fM}QYi&CQAHSH?Pd zyhu=34nFtbJc@vkP{nSpKA|`OHr^)00Voe% z2e2ca#@OxR^#Je&9f@&#q*o7AJnRR6(`m97{+SEv0K%`VWHn1)Ld`Rg6oz zW-#&FG4mTLe)H)yaf*Krkxdl8A=4w0sSrSrezHWd-MDXowSndg-h-5(+jGJdKdJ{5 ztzo|lhGA=wSP9$+i|_s>2;Xv`w&j5vLWQUT1E4@2Oav8^8^w(!gjW24v$1a7ML`D9 z6M+$`lENqvM{E9eL5UQtwkWAKJ5(Dhd|$zNZCrr~F3g1u*@PcC6m|fHg z0dZr6Z;c7xl7T1Hl5GhTr6w+ahsfTHJE_5agwl5?)xwOTW!P~=Z0jzU0})_v+A6u6 zjkgJMxf~;xPiI~^KQljfY5E%OJ6@hmq!jk~^-ws5qmPlxIbJi=ikHiKAn#4Nyp`eL zbMT|XCYn47B=|)d9jR3nUz94_Sc8?XR2GA59Kh#13xA5Hv)UZIgCIecWlL~!G#Qiq zGV%w>!TpeU1Y@C5&S$d4l@+kWc*y#qz9vu*2y|k-e4@xMkPGKw8>VMw=B`}441xHl zBtisjK8cZ*`2PMmtXSnjr_)cmg$}v}V@fsB8W#RN{`@hd% zw(B1rLjQlV{|k{Ol*|rv@p#5Ung_^m7g5l+-isP!6WiT@3$0Q34 z96tf2ToBxmTd8;;%tw4L1?)LX;w$vX{!jLQmN`J2SHzMS6@+e){XZmUy^kCLWdAod zC9mqH<_PFb&p`HncAv+a&?0vcGZV$=8NhrA90A$?AKf?r)tXYxv|$>uEmIzVi2Z+E zv6G2KP3uC_nkOt4!pobL>h5H$L*6`BX1pQr0o5Kn4%W^8PyYXX=Km-8KQ+Pq37MEv zbMxTy9uimpDM|hpQXpqaoRtju|8vED#E%k`;Jx}Cpvd!3=KmxAfAas2DIX)Qbl(X9 z&=*O;rn(@I{67)4vy%MJDK3g@gKYD{4shQ& zNsS3R(1BK9XfpIcMCSd%mg1%|F61xOv8-mu;od*$NQl-e5@5sn`S{HU1cr?L2O!AP zoUU+`g8&2*0}$+7p@5Uvu=syAyJq|W`x+vd+cf$=d}r*S=>Ni-l|>{Y8ykSIUWEXk zHQ^^oFgIPJ|2uL8OO)}tm(4UMBU=?_zz5ufI0G#v&fsznaR&Y!AJ4z`YX4V(J;gH_A7{`56>r8Fv~m~z zsSD!&1w+Ca-AaQQJ}1CQpM}!)%ZtRi=m*-d*7HH~e+T*Bp31C*PlNI(|`KZPwx28FP3tP zxxA!CK`dY?WCEWHVepL4D$r3)#dpfz6cJ)ob4yNb86`zAK5lAHJKxF01qp9)6<^up zQ#gK<5j!S&i*^r%xW=Lwi|l1N_k@o&P8=_qb(14>3ohvhJz)|Qs+@kje1L#_=-!)0 zalcRg_*%?<-@FwZI=@kMd02f0BKUY`(GYt4)j3BquBk zT;a9zi#Hjo(-Eu@q?8q(QuKwD0q}5MJt8II$>w1g?DvU#*T(sNo6-GN@O%eT?r^*x zbibQQ+}<|a*4MI=m6zsbistl7Gasv5(QYhiEY7v34WFO2mfS1(?3{I_;GLUZ_Rbay ztIK_CxZY{MnSQzh6$*g&Fzxs6+;g7b`~6aMzZpFbru+f#_u9F0FD<`5ccE0QE!DNF z+FJSb#TtwA?rFcTEWR>ZHD^3!Ntu1Mpx2F+D|2ht`q*z_zuo@l`?v2oN4CL!=Vsg+ zOJ#X&{@Ts_)pIk8ud11Ay%Y4S*(Evq(rdT8nX|X%^jC9BXRdtw6{vs?bhbx=E+qEt zz2QL5C+<0iX}=-i5oZCK28HMazGL?Lx9&NQ(|$iX`~90J_g`7k7f6T$; zWs2n>1v~;tSoS;E^w8iH>JK>>Z2Iz&|3CTvgRWK% zSFsYVKKy8koco&_{ZH~g$^ZTNH!}-7`Tv7mVAIO~k^g^e6c_pbzc>qCFrNfgqwD~` zTXk}btEg0~-sDX;yHu$xqafAw#@L#aNKIF2DJ9!eD)|5FX-l$gO(FR|kp1xk<^Kb$ z?ZN!-XV~42;s2EXkMjSmxKMSb1cCKY27gnd|1k}G$LN1>i-n)xclbZzcANPh8@rnS zDgQs^{|BWBKMx!m`;Y(cp>p(MrILr}AlGAc6j+o4L<_O{W2i7hAyZti&Q2zf7(g0BY{~hZAAmT^1Dggi+ZxiDG6&6FmIc<@nqR06E&J1j?@&E7! z%K!i9=Kt5EDM?M6;1nuJB>!)F`5zL3N|tUiGj%S9euj=SRBg{ zj@d2JKX2?V{ew9itMw{Shf`A}6Ap>WWV4;^{4Z-75Ca6+<|Y5bZRc!WOrQ`_3d|Jh z<1z0iqB?M1%*4lXTnLeQJ(n7Edk|hYzI!BEwiTZ3i6I|UEabVFWYaL9>LYG98y}zL z;x*#p1IYe1o{ty!6mU_(m$6Q)KQNqL^ zo@*tzK5!?6NFhrczwyTK834c7fvQNBqS-cL8B1Cx5Jx-WfCh!BCL-PqQsWw`G=c zbrUlGDV!whilISfR}*}{El!pnJl;e9O!Zfb#AA18! zyBDALPwh^@n|6ObohD=9pE*D8AO95(KVu+Dp}c>Lt>_SgXm?^mV*O$|o{m-p@&xl; zWeI;OwbCOg_sc2CoRah@MTVT&67W4F_ebP@@n!qj{}lr&SiwFJk#7?Jfg6>9k|D}(8~Gpi zx1;cgm}<0&R%^ghD`*(z>ZCwnYj7)B)5g`9`kplc{qHFGq+mCtlN_hSgPqzT~Iw$Hl~E;0LQ8#@gBizTCz2vmh`bwM#wp8sv@ig4~gp zec3_+tB|D{n`?bnz#YQmHm!cd@4$Q#;y2IUVni&PX_SksB-;at-;C(SXX*!eFdr@N zmPW*BTKz<3`^)pwXJ<1DS7v5r7Zz|*{YK)qy`aDFdp516;=(r+rnL>Tz_S669oSSK z{Pk_{z53v@x%vm`Mv&JljN^*I_o$HZi|`@hD@OCeY`yP#tBAn(A5Mi{gtE|33&UWn zX-H7Wz|H7j-9N-55ysq! zq3UFS*SG`$hqfLyz~i6>A2eiuXNdR!Li!mdPcSO07!bH=Fd^MB^Dx2&n&8>=nm7f8 zL;z+XvWW%wB!yjyA%GwU$P&$W3yKt2%ZUufSfmWyo@4buvWua6hhdZRmYadru;L{q z{T^+uC`Lb{LM zTu4820mKj!NUs(0WY}0C9m-^bkS;+wEy;lLJrYn|rexhAGCJc%%*~S`p5wBDh12qn?G|$4HqUl^|#X1Ow84JA2;pAwtY}h;~5vm9If)x&2Xq5Ar zY;k1;A~8K=u0L^m!U^;O0-abdpN(f1$c6Klrms!U%*mU`I+cCD_{mTD_1-e{YHZ;unylL{kvTe=pmaM3o z>wOQp7aLUReJ9`VN&aU%?be&$+vJQbmi_^kTV;RpzOP5}$jOM=3_aK#>>J-%V`r&f zLx(qJ0s;mTRf(!)f$N+C7I;Hpx+ZqF9@NGsL@O z*cxg8?3}f+X8Nv9_!}qv16Vd&_K)5e8*ZTgKxBVFKW|mh$|c0Xt&{zI#-tzFABaFl zo}f+k=baUl{b49D&>OmKTAIvgHZx!0DA@l8#ttUnANnH* z5CjMU1Ob8oL4Y7Y5FiK;1PB5Ifky}e-#GNtP-6V)Q$Ia1eY{jDyKBW#ZZVgaRM6U| zK~Vw~hMcmSzwM{@5e{a;q=Pz%Hm_f<8F9+XAuqM2eTXY+{HPa^@uKFIoZ2!vKZx=1 z&F(5XY3E%?D&+d^-O*_=6-#H!5ziUJLo%2)6LhUujG%%8b=Pa4PCk?ehEJToqF65A hOKwjve>|4ENO& B[FastAPI /webhook/{namespace}] +B --> C[Router] +C --> D[Targets Forwarder] +B --> E[Event Template] +E --> F[Channels Sender] +D --> G[External Webhook Targets] +F --> H[Feishu] +F --> I[WeCom] +``` + +## 模块分层 +- 接入层:`app/main.py`(FastAPI 入口与 Webhook 接收) +- 管理后台:`app/admin.py`(端点、规则、模板与渠道的增删改查) +- 领域模型与持久化:`app/db.py`, `app/models.py` +- 规则引擎:`app/services/engine.py`(树形规则 + 动作编排) +- 通知发送:`app/services/notify.py` +- 统计与日志:`app/services/stats.py`, `app/logging.py` + +## 关键接口 +- `engine.process(endpoint_id, payload) -> (routed: List, notified: List)` +- `_exec_forward(target, payload) -> Result`(内部使用) +- `_exec_notify(channel, msg) -> Result`(内部使用) +- 管理接口:`/admin/*` 用于维护端点、规则树、动作、模板与渠道 + +## 数据契约 +- 输入:示例 JSON 结构,关键字段存在于顶层与 `trans_order_info` +- 输出:`{"namespace": str, "routed": [...], "notified": [...]}`,其中: + - `routed`:每个转发目标的执行摘要(目标名、是否成功、失败原因) + - `notified`:每个通知渠道的执行摘要(渠道名、是否成功、失败原因) + +## 异常策略 +- 超时与网络错误重试;失败记录结构化日志 +- 非致命错误不影响其他目标或渠道发送 + +## 部署与运行(概要) +- 本地直接运行: + - 创建并激活虚拟环境(可选) + - 执行 `pip install -r requirements.txt` + - 启动服务:`uvicorn app.main:app --host 0.0.0.0 --port 8080` +- Docker 部署: + - 构建镜像:`docker build -t webhook-relay .` + - 运行容器示例: + - `docker run -d --name webhook-relay -p 8080:8080 webhook-relay` + - 默认使用 SQLite 数据库,路径由环境变量 `DB_PATH` 控制,默认值为 `sqlite:///./config/data.db` + - 如需持久化,可将宿主机目录挂载到容器内 `/app/config` 目录 + diff --git a/docs/webhook-relay/FLOWCHART.md b/docs/webhook-relay/FLOWCHART.md new file mode 100644 index 0000000..586e242 --- /dev/null +++ b/docs/webhook-relay/FLOWCHART.md @@ -0,0 +1,191 @@ +# Webhook 中继系统流程图 + +## 1. 系统宏观架构图 + +```mermaid +graph TB + subgraph External["外部世界"] + Sender["发送方 (支付宝/微信/业务系统)"] + User["管理员"] + TargetSys["目标系统 (ERP/BI)"] + NotifyApp["IM工具 (飞书/企微)"] + end + + subgraph WebhookRelay["Webhook中继服务 (Docker)"] + direction TB + + subgraph Interface["接入层"] + API["FastAPI (端口:8080)"] + AdminUI["Web管理后台"] + end + + subgraph Core["核心逻辑层"] + Parser["数据解析 (Pydantic)"] + Router["路由引擎 (Routing)"] + Notifier["通知引擎 (Notification)"] + Logger["日志审计 (Logging)"] + end + + subgraph Data["数据存储层"] + DB[(SQLite 数据库)] + Config["Config Loader (DB优先 + YAML回退)"] + end + end + + Sender -->|POST JSON| API + User -->|浏览器访问| AdminUI + AdminUI -->|CRUD配置| DB + + API --> Parser + Parser --> Router + Parser --> Notifier + + Router <-->|查询规则| Config + Notifier <-->|查询模板| Config + Config <-->|读取| DB + + Router -->|转发请求| TargetSys + Notifier -->|推送消息| NotifyApp + + API -->|异步写入| Logger + Logger -->|持久化| DB +``` + +## 2. 详细数据处理流程 + +此图展示了一个 Webhook 请求从进入系统到完成分发与记录的完整生命周期。 + +```mermaid +sequenceDiagram + participant Ext as 外部系统 + participant API as FastAPI入口 + participant DB as SQLite数据库 + participant Router as 路由服务 + participant Target as 目标系统 + participant Notifier as 通知服务 + participant Channel as IM渠道(飞书/企微) + participant Log as 日志服务 + + Note over Ext, API: 1. 请求接入 + Ext->>API: POST /webhook/{namespace} (Payload) + + activate API + API->>DB: 校验 Namespace 是否有效/启用 + alt Namespace 无效/禁用 + API-->>Ext: 403 Forbidden + else Namespace 有效 + API->>API: 解析 Payload (remark, event_no, trans_amt...) + + par 2. 并行处理 - 转发 (Relay) + API->>Router: 使用规则引擎匹配路由规则 + Router->>DB: 查询 ProcessingRule/RuleAction/Target + Router-->>API: 返回目标列表 [Target A, Target B] + + loop 对每个目标 + API->>Target: POST /target_url (异步 + 重试机制) + Target-->>API: 响应 (200 OK / 500 Error) + end + + and 3. 并行处理 - 通知 (Notify) + API->>Notifier: notify(event_no, payload) + Notifier->>DB: 查询 ProcessingRule/RuleAction/MessageTemplate/NotificationChannel + Notifier->>Notifier: 渲染模板 ("【{biz_name}】{pay_method_name} 收款 {trans_amt} 元,状态:{cash_resp_desc},日期:{trans_date}") + Notifier-->>API: 返回消息文本 & 渠道列表 + + loop 对每个渠道 + API->>Channel: POST Webhook (发送消息) + Channel-->>API: 响应结果 + end + end + + Note over API, Log: 4. 收尾工作 + API->>Log: BackgroundTask: 保存 RequestLog & DeliveryLog + Log->>DB: INSERT request_logs, delivery_logs + + API-->>Ext: 200 OK (包含 routed/notified 摘要) + end + deactivate API +``` + +## 3. 数据库实体关系图 (ERD) + +展示了用于支撑上述流程的数据库模型结构。 + +```mermaid +erDiagram + WebhookEndpoint { + int id PK + string namespace "URL路径标识" + string description + bool is_active + datetime created_at + } + + ProcessingRule { + int id PK + int endpoint_id FK + int parent_rule_id FK "父规则ID,可为空" + int priority "优先级,高优先级先匹配" + string match_field "如 remark / event_define_no" + string operator "eq/neq/contains/regex" + string match_value "匹配值" + } + + RuleAction { + int id PK + int rule_id FK + string action_type "forward/notify" + int target_id FK "转发目标,可空" + int channel_id FK "通知渠道,可空" + int template_id FK "消息模板,可空" + json template_vars "模板变量,键值对" + } + + Target { + int id PK + string name + string url + int timeout_ms + } + + NotificationChannel { + int id PK + string name + string channel_type "feishu/wecom" + string webhook_url + } + + MessageTemplate { + int id PK + string name "模板名称" + text template_content "模板内容" + } + + RequestLog { + int id PK + string namespace + string remark "来源标识" + string event_no "事件类型" + json raw_body "原始数据" + datetime received_at + string status "success/error" + } + + DeliveryLog { + int id PK + int request_id FK + string target_name + string type "relay/notify" + string status "success/failed" + text response_summary + datetime created_at + } + + WebhookEndpoint ||--|{ ProcessingRule : "拥有多条规则" + ProcessingRule ||--|{ ProcessingRule : "树形子规则" + ProcessingRule ||--|{ RuleAction : "每条规则包含多个动作" + RuleAction }o--|| Target : "转发到目标" + RuleAction }o--|| NotificationChannel : "推送到渠道" + RuleAction }o--|| MessageTemplate : "使用消息模板" + RequestLog ||--|{ DeliveryLog : "包含多条分发记录" +``` diff --git a/docs/webhook-relay/TASK_[webhook-relay].md b/docs/webhook-relay/TASK_[webhook-relay].md new file mode 100644 index 0000000..f395fab --- /dev/null +++ b/docs/webhook-relay/TASK_[webhook-relay].md @@ -0,0 +1,27 @@ +# 原子任务清单 + +## T1 配置加载 +- 输入:`config/config.yml` +- 输出:内存配置对象与查询API +- 验收:能返回目标、remark与事件模板 + +## T2 模型解析 +- 输入:请求JSON +- 输出:`IncomingPayload` 对象 +- 验收:示例JSON解析字段正确 + +## T3 路由与转发 +- 输入:`remark` 与目标列表 +- 输出:并发POST结果数组 +- 验收:`imcgcd03` 路由到 `target_3` + +## T4 通知消息 +- 输入:`event_define_no` 与负载 +- 输出:消息文本与渠道推送结果 +- 验收:`pay.ali_scaned` 生成正确文本并推送 + +## T5 接口与容器 +- 输入:应用代码 +- 输出:FastAPI端点与Dockerfile +- 验收:健康检查与示例数据POST通过 + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c8b674b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +fastapi==0.115.5 +uvicorn[standard]==0.32.0 +httpx==0.27.2 +pydantic==2.9.2 +PyYAML==6.0.2 +tenacity==9.0.0 +python-json-logger==2.0.7 +sqlalchemy==2.0.25 +jinja2==3.1.3 +python-multipart==0.0.9 diff --git a/samples/incoming.json b/samples/incoming.json new file mode 100644 index 0000000..8157bec --- /dev/null +++ b/samples/incoming.json @@ -0,0 +1 @@ +{"trans_date":"20251220","batch_id":"251220","bank_seq_id":"923913","trans_amt":"5.50","remark":"imcgcd03","event_define_no":"pay.ali_scaned","acct_id":"F03291269","is_div":"1","bank_order_no":"672025122022001416951449996798","fee_flag":2,"trans_order_info":{"maze_resp_code":"","fee_real_acct_id":"F03291269","agent_id":"6666000153839851","bank_seq_id":"923913","subsidy_stat":"I","acct_id":"F03291269","ref_cnt":0,"product_id":"HSK","bank_mer_id":"A1035134718870142962","id":2628973110,"icc_data":"","atu_sub_mer_id":"2088550839201829","trans_stat":"S","region_id":"TOP4_B","credit_type":"","version":2,"org_auth_no":"","cash_resp_desc":"成功","bagent_id":"6666000153839851","real_cust_id":"6666000153923913","req_seq_id":"T20251220171836S6666000153923913","term_div_coupon_type":1,"iss_inst_id":"","channel_finish_time":1766222307000,"channel_stat":"S","unconfirm_fee_amt":0.01,"real_gate_id":"Dx","db_unit":"4","sys_trace_audit_num":"","trans_date":"20251220","batch_id":"251220","credit_fee_amt":0.00,"bank_resp_code":"TRADE_SUCCESS","real_acct_id":"F03291269","channel_message":"TRADE_SUCCESS","settle_trans_stat":"","mypaytsf_discount":0.00,"unconfirm_amt":5.49,"org_huifu_seq_id":"","cash_resp_code":"000","double_limit_amt":0.00,"fee_split_type":"","fee_real_cust_id":"6666000153923913","maze_resp_desc":"","fee_amt":0.01,"creator":"","ord_amt":5.50,"acct_stat":"I","debit_fee_amt":0.00,"cash_req_date":"20251220171826","ref_num":"171826923913","out_trans_id":"672025122022001416951449996798","fee_acct_id":"F03291269","is_acct_div_param":0,"subsidy_ref_amt":0.00,"mer_name":"高新区益选便利店(个体工商户)","pay_scene":"02","time_expire":"20251220172836","modifier":"","channel_code":"00","mcc":"","close_trans_stat":"","ref_fee_amt":0.01,"trans_finish_time":1766222309000,"org_trans_date":"","bank_resp_desc":"TRADE_SUCCESS","pay_channel_id":"00001249","create_time":1766222306000,"pay_amt":5.50,"org_acct_id":"F03291269","hf_seq_id":"002900TOP4B251220171826P546ac136a7d00000","goods_desc":"1","is_route":"","is_delay_acct":0,"settle_amt":5.50,"ref_amt":5.50,"gate_id":"SPIN022","pay_channel":"A","huifu_id":"6666000153923913","subsidy_amt":0.00,"fee_huifu_id":"6666000153923913","maze_bg_seq_id":"","maze_bg_date":"","modify_time":1766222308000,"check_cash_flag":"I","remark":"imcgcd03","is_acct_div":1,"real_pay_type":"1013","sys_id":"6666000132082499","card_channel_type":"","is_deleted":0,"fee_flag":2,"cash_trans_id":"2025122024rg0396","pa_mer_id":"SSP001","pay_type":"MICROPAY","sn_code":"","channel_type":"U","source_region_id":"TOP4_B","trans_type":"1000","mer_ord_id":"T20251220171836S6666000153923913","ord_id":"202512201718260TOP4_BL2797202899","card_sign":"","fee_formula":"","fee_source":"'SERVER'","party_order_id":"03242512206230681909127","bank_mer_name":"高新区益选便利店(个体工商户)","cashier_version":"V2","un_scene_info":"","maze_pnr_dev_id":"","req_date":"20251220","route_region_id":"C24_A","fee_rec_type":1},"settlement_amt":"5.50","acct_split_bunch":{"acct_infos":[{"acct_id":"F03291269","huifu_id":"6666000153923913","div_amt":"5.50"}],"fee_acct_id":"F03291269","fee_huifu_id":"6666000153923913","fee_amt":"0.01"},"trans_type":"A_MICROPAY","mer_ord_id":"T20251220171836S6666000153923913","trans_stat":"S","end_time":"20251220171827","hf_seq_id":"002900TOP4B251220171826P546ac136a7d00000","ref_no":"171826923913","trans_time":"171826","alipay_response":{"coupon_fee":"0.00","buyer_logon_id":"182****2451","buyer_id":"2088902514116953","app_id":"","fund_bill_list":[{"amount":"5.50","fund_channel":"ALIPAYACCOUNT"}]},"fee_amount":"0.01","out_trans_id":"672025122022001416951449996798","party_order_id":"03242512206230681909127","is_delay_acct":"0","fee_formula_infos":[{"fee_formula":"AMT*0.0025","fee_type":"TRANS_FEE"}],"namespace":"opps-webhook","huifu_id":"6666000153923913","mer_name":"高新区益选便利店(个体工商户)"} diff --git a/scripts/verify_logic.py b/scripts/verify_logic.py new file mode 100644 index 0000000..795c8b5 --- /dev/null +++ b/scripts/verify_logic.py @@ -0,0 +1,22 @@ +import os +import json +from app.config import load_config +from app.models import IncomingPayload, IncomingOrderInfo +from app.services.relay import route_targets +from app.services.notify import render_message + +def main(): + cfg = load_config() + path = os.path.abspath(r"e:\2025Code\python\webhock\123.josn") + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data.get("trans_order_info"), dict): + data["trans_order_info"] = IncomingOrderInfo(**data["trans_order_info"]) # type: ignore + payload = IncomingPayload(**data) + targets = route_targets(cfg, payload.remark or "") + msg = render_message(cfg, payload) + print("targets:", [t.get("name") for t in targets]) + print("message:", msg["text"]) + +if __name__ == "__main__": + main() diff --git a/scripts/verify_message_only.py b/scripts/verify_message_only.py new file mode 100644 index 0000000..daa140c --- /dev/null +++ b/scripts/verify_message_only.py @@ -0,0 +1,30 @@ +import json +import os + +def main(): + path = os.path.abspath(os.path.join(os.getcwd(), "samples", "incoming.json")) + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + remark = data.get("remark") + event = data.get("event_define_no") + trans_amt = data.get("trans_amt") + cash_resp_desc = "" + toi = data.get("trans_order_info") or {} + cash_resp_desc = toi.get("cash_resp_desc") or "" + if event == "pay.ali_scaned": + msg = f"支付宝收款{trans_amt}元,状态:{cash_resp_desc}" + elif event == "pay.wx_scaned": + msg = f"微信收款{trans_amt}元,状态:{cash_resp_desc}" + elif event == "wechat.complaint": + msg = "⚠️请注意,您有新的微信投诉,请注意查看" + elif event == "refund.standard": + actual_ref_amt = data.get("actual_ref_amt") or (toi.get("ref_amt") or data.get("settlement_amt")) + msg = f"退款成功,退款金额:{actual_ref_amt}" + else: + msg = "" + target = "target_3" if remark == "imcgcd03" else ("target_2" if remark == "imcgcd02" else "") + print("message:", msg) + print("route_target:", target) + +if __name__ == "__main__": + main() diff --git a/templates/admin/base.html b/templates/admin/base.html new file mode 100644 index 0000000..a79dbc7 --- /dev/null +++ b/templates/admin/base.html @@ -0,0 +1,51 @@ + + + + + + {% block title %}Webhook管理后台{% endblock %} + + + + +
+ + {% if system_stats %} +
+
+ 运行中 + 已运行: {{ system_stats.uptime }} + 今日请求: {{ system_stats.today_count }} +
+
+ 最新日志: {{ system_stats.latest_log }} +
+
+ {% endif %} + +
+ + Webhook中继管理 + + +
+ +
+ {% block content %}{% endblock %} +
+
+ + {% block scripts %}{% endblock %} + + diff --git a/templates/admin/channels.html b/templates/admin/channels.html new file mode 100644 index 0000000..3bf4317 --- /dev/null +++ b/templates/admin/channels.html @@ -0,0 +1,124 @@ +{% extends "admin/base.html" %} +{% block title %}通知渠道 - Webhook{% endblock %} +{% block content %} +
+
+

通知渠道列表

+
+
+ +
+
+ + + + + + + + + + + + + + + {% for c in channels %} + + + + + + + + {% endfor %} + +
ID名称类型Webhook URL操作
{{ c.id }}{{ c.name }} + {% if c.channel_type == 'feishu' %} + 飞书 + {% else %} + 企业微信 + {% endif %} + {{ c.webhook_url }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/dashboard.html b/templates/admin/dashboard.html new file mode 100644 index 0000000..62fb7ec --- /dev/null +++ b/templates/admin/dashboard.html @@ -0,0 +1,196 @@ +{% extends "admin/base.html" %} +{% block title %}仪表盘 - Webhook{% endblock %} +{% block content %} +
+
+
+
+
今日请求总数
+

{{ system_stats.today_count }}

+
+
+
+
+
+
+
系统运行时间
+

{{ system_stats.uptime }}

+
+
+
+
+
+
+
最新活动
+

{{ system_stats.latest_log or '暂无数据' }}

+
+
+
+
+ +
+
+ +
+
+
⚡ 模拟数据推送
+ 测试您的规则配置 +
+
+
+ + + {% if not endpoints %} +
请先创建端点
+ {% endif %} +
+
+ + + +
+ +
+
+
执行结果:
+

+                    
+                
+
+
+ +
+
+
快速开始
+
+
+

欢迎使用 Webhook 中继平台。请按照以下步骤配置您的第一个流程:

+
    +
  1. +
    +
    添加资源
    + 配置转发目标 (Target) 或通知渠道 (Channel)。 +
    + 去配置 +
  2. +
  3. +
    +
    创建端点
    + 定义一个 Webhook 接收地址 (Namespace)。 +
    + 去创建 +
  4. +
  5. +
    +
    设置规则
    + 在端点详情页添加入站匹配规则和动作。 +
    +
  6. +
+
+
+
+ +
+
+
+
系统状态
+ Online +
+
+
    +
  • + API 基础地址: + http://{your-host}:8090/webhook/ +
  • +
  • + 文档: + Swagger UI / Redoc +
  • +
  • + 当前版本: v2.2.0 (Tree Rule Engine) +
  • +
+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/endpoint_detail.html b/templates/admin/endpoint_detail.html new file mode 100644 index 0000000..0370649 --- /dev/null +++ b/templates/admin/endpoint_detail.html @@ -0,0 +1,467 @@ +{% extends "admin/base.html" %} +{% block title %}配置端点流程 - Webhook{% endblock %} +{% block content %} +
+
+

配置端点流程: {{ ep.namespace }}

+

{{ ep.description or '无描述' }}

+
+ +
+ +
+ 规则引擎说明 (树状逻辑): + 规则自上而下、从外向内匹配。只有父规则匹配成功,才会检查子规则。
+ 子规则会继承父规则设置的模板变量(如支付方式名称)。 +
+ + +{% macro render_rule(rule, level=0) %} +
+
+
+ {% if level > 0 %} + ↳ 并且 + {% else %} + 根条件: + {% endif %} + + 字段 {{ rule.match_field }} + {{ rule.operator or '等于' }} + {{ rule.match_value }} +
+
+
+ + + +
+ + + +
+
+
+
+
+ {% if rule.actions %} +
+ {% for action in rule.actions %} +
+
+ {% if action.action_type == 'forward' %} + 转发 + 目标: {{ action.target.name }} + {% else %} + 通知 + 渠道: {{ action.channel.name }} | 模板: {{ action.template.name }} + {% if action.template_vars %} + {{ action.template_vars }} + {% endif %} + {% endif %} +
+
+ +
+ + + +
+
+ + + +
+
+
+ {% endfor %} +
+ {% endif %} + +
+ + +
+
+
+ + +{% for child in rule.child_rules %} + {{ render_rule(child, level + 1) }} +{% endfor %} + +{% endmacro %} + + +{% for rule in root_rules %} + {{ render_rule(rule) }} +{% endfor %} + + +
+
+ +
+
+ + + + + + + + + + + + + + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/endpoints.html b/templates/admin/endpoints.html new file mode 100644 index 0000000..e4b834d --- /dev/null +++ b/templates/admin/endpoints.html @@ -0,0 +1,124 @@ +{% extends "admin/base.html" %} +{% block title %}端点管理 - Webhook{% endblock %} +{% block content %} +
+
+

接收端点 (Endpoints)

+
+
+ +
+
+ + + + + + + + + + + + + + + + + {% for e in endpoints %} + + + + + + + + + + {% endfor %} + +
IDNamespace完整URL示例描述状态创建时间操作
{{ e.id }}{{ e.namespace }} + /webhook/{{ e.namespace }} + + {{ e.description or '-' }} + {% if e.is_active %} + 启用 + {% else %} + 禁用 + {% endif %} + {{ e.created_at.strftime('%Y-%m-%d %H:%M') }} + ⚙️ 配置 +
+ + {% if e.is_active %} + + {% else %} + + {% endif %} +
+
+ + +
+
+ + + + + +{% endblock %} diff --git a/templates/admin/logs.html b/templates/admin/logs.html new file mode 100644 index 0000000..b7c9e6d --- /dev/null +++ b/templates/admin/logs.html @@ -0,0 +1,123 @@ +{% extends "admin/base.html" %} +{% block title %}系统日志 - Webhook{% endblock %} +{% block content %} +
+
+

请求日志 (最近100条)

+
+
+
+ +
+
+
+ +
+ {% for log in logs %} +
+

+ +

+
+
+
+
+
+
原始 Payload
+ +
+
{{ log.raw_body | tojson(indent=2) }}
+
+
+
分发记录
+
    + {% for d in log.delivery_logs %} +
  • +
    +
    + {% if d.type == 'relay' %} + 转发 + {% else %} + 通知 + {% endif %} + {{ d.target_name }} +
    + {{ d.response_summary }} +
    + {% if d.status == 'success' %} + 成功 + {% else %} + 失败 + {% endif %} +
  • + {% endfor %} + {% if not log.delivery_logs %} +
  • 无分发记录
  • + {% endif %} +
+
+
+
+
+
+ {% endfor %} +
+ +{% if not logs %} +
+ 暂无日志记录 +
+{% endif %} +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/targets.html b/templates/admin/targets.html new file mode 100644 index 0000000..8dc972a --- /dev/null +++ b/templates/admin/targets.html @@ -0,0 +1,111 @@ +{% extends "admin/base.html" %} +{% block title %}目标管理 - Webhook{% endblock %} +{% block content %} +
+
+

转发目标列表

+
+
+ +
+
+ + + + + + + + + + + + + + + {% for t in targets %} + + + + + + + + {% endfor %} + +
ID名称URL超时(ms)操作
{{ t.id }}{{ t.name }}{{ t.url }}{{ t.timeout_ms }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/templates.html b/templates/admin/templates.html new file mode 100644 index 0000000..ff2279c --- /dev/null +++ b/templates/admin/templates.html @@ -0,0 +1,103 @@ +{% extends "admin/base.html" %} +{% block title %}消息模板 - Webhook{% endblock %} +{% block content %} +
+
+

消息模板资源

+
+
+ +
+
+ + + + + + + + + + + + + + {% for t in templates %} + + + + + + + {% endfor %} + +
ID模板名称内容预览操作
{{ t.id }}{{ t.name }}{{ t.template_content }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %}