commit 2bc7460f1f112dd07211369b05e70776dc8829c7 Author: houhuan Date: Sun Dec 21 18:43:12 2025 +0800 feat: 初始化Webhook中继系统项目 - 添加FastAPI应用基础结构,包括主入口、路由和模型定义 - 实现Webhook接收端点(/webhook/{namespace})和健康检查(/health) - 添加管理后台路由和模板,支持端点、目标、渠道和模板管理 - 包含SQLite数据库模型定义和初始化逻辑 - 添加日志记录和统计服务 - 包含Dockerfile和配置示例文件 - 添加项目文档,包括设计、流程图和验收标准 diff --git a/.trae/documents/Webhook中继系统 - Admin后台升级计划.md b/.trae/documents/Webhook中继系统 - Admin后台升级计划.md new file mode 100644 index 0000000..015c53c --- /dev/null +++ b/.trae/documents/Webhook中继系统 - Admin后台升级计划.md @@ -0,0 +1,32 @@ +# Webhook中继系统 - Admin后台升级计划 + +## 1. 核心变更:数据库驱动配置 +- 引入 **SQLite + SQLAlchemy** 替代静态 `config.yml`。 +- 存储模型: + - `Target`:转发目标Webhook(名称、URL、超时)。 + - `RemarkRule`:remark值到目标列表的映射(imcgcd03 -> [Target A, Target B])。 + - `EventTemplate`:事件模板(event_define_no -> 模板文本)。 + - `NotificationChannel`:通知渠道(类型feishu/wecom、Webhook URL、名称)。 + - `EventChannelBinding`:事件到渠道的绑定(pay.ali_scaned -> [Channel A, Channel B])。 + +## 2. Admin后台界面 +- 基于 **FastAPI + Jinja2 + Bootstrap** 提供简易管理UI。 +- 功能页面: + - **目标管理**:增删改查转发目标。 + - **路由规则**:配置 `remark` 对应的目标列表。 + - **通知模板**:编辑不同事件的通知文案。 + - **通知渠道**:添加飞书/企业微信机器人链接。 + - **通知绑定**:将事件与通知渠道关联。 + +## 3. 系统架构调整 +- `app/db.py`:数据库连接与模型定义。 +- `app/admin.py`:管理后台路由与CRUD逻辑。 +- `app/config.py`:改为混合模式,优先从DB加载动态配置,保留文件配置作为默认值/种子数据。 +- `app/services/relay.py` & `notify.py`:适配新的DB驱动配置查询。 + +## 4. 实施步骤 +1. **依赖更新**:添加 `sqlalchemy`、`jinja2`、`python-multipart`。 +2. **数据层实现**:定义ORM模型与初始化脚本(支持从现有yaml导入种子数据)。 +3. **后台UI实现**:创建HTML模板与FastAPI路由。 +4. **配置适配**:改造 `load_config` 逻辑,支持热重载或每次请求查询DB。 +5. **构建与部署**:更新Dockerfile,重新构建并运行,验证后台配置生效。 diff --git a/.trae/documents/Webhook中继系统 - 增强功能实施计划.md b/.trae/documents/Webhook中继系统 - 增强功能实施计划.md new file mode 100644 index 0000000..b7f580d --- /dev/null +++ b/.trae/documents/Webhook中继系统 - 增强功能实施计划.md @@ -0,0 +1,47 @@ +# Webhook中继系统 - 增强功能实施计划 + +## 1. 核心变更:日志审计与多端点管理 +为了满足多Webhook接收管理、系统状态监控及日志审计需求,将进行以下数据库与逻辑扩展: + +### 1.1 数据库模型扩展 (`app/db.py`) +- **`WebhookEndpoint`**:管理接收端点(Namespace)。 + - 字段:`namespace` (唯一标识), `description` (备注), `created_at`, `is_active`。 +- **`RequestLog`**:记录入站请求。 + - 字段:`id`, `namespace`, `remark`, `event_no`, `raw_body` (JSON), `received_at`, `status` (success/error)。 +- **`DeliveryLog`**:记录出站转发与推送结果。 + - 字段:`request_id` (FK), `target_name` (目标/渠道名), `type` (relay/notify), `status`, `response_summary`, `created_at`。 + +### 1.2 业务逻辑增强 (`app/main.py`) +- **端点校验**:`POST /webhook/{namespace}` 将校验 namespace 是否在 `WebhookEndpoint` 表中注册(未注册的可选拒绝或标记为匿名)。 +- **日志持久化**:请求处理完成后,将入站数据与出站结果(routed/notified)异步写入 SQLite 数据库。 +- **状态统计**:内存记录系统启动时间,实时计算运行耗时;从 DB 统计今日接收量。 + +## 2. Admin后台功能升级 +### 2.1 界面改造 (`templates/admin/*.html`) +- **顶部状态栏 (`base.html`)**:增加系统运行状态指示器(运行时间、今日请求数、最新一条日志时间)。 +- **端点管理 (`endpoints.html`)**:新增页面,支持创建、启用/禁用接收端点(Namespace)。 +- **日志审计 (`logs.html`)**: + - 请求列表:按时间倒序展示,显示 namespace, remark, event。 + - 详情视图:点击查看完整 JSON 负载及对应的所有分发记录(目标+渠道)。 + - 清理功能:一键清空历史日志。 + +### 2.2 后台逻辑 (`app/admin.py`) +- 新增 `/admin/endpoints` 路由:CRUD 操作。 +- 新增 `/admin/logs` 路由:分页查询日志。 +- 新增 `/admin/stats` 接口(或模板上下文):提供状态栏数据。 + +## 3. 实施步骤 +1. **更新 DB 模型**:修改 `app/db.py` 添加新表。 +2. **实现日志与端点逻辑**: + - 更新 `app/main.py` 接入日志记录。 + - 创建 `app/services/stats.py` 简单的统计服务。 +3. **开发前端页面**: + - 修改 `base.html` 添加状态栏与菜单。 + - 创建 `endpoints.html` 和 `logs.html`。 +4. **开发后台路由**:在 `app/admin.py` 实现对应视图。 +5. **验证**: + - 创建新 Endpoint `demo_input`。 + - 发送测试数据,确认日志生成且状态栏更新。 + - 验证“日志详情”能清晰展示接收到的 JSON 和分发结果。 + +此方案在保留现有轻量级架构的同时,补全了运维监控与多入口管理能力。 diff --git a/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md b/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md new file mode 100644 index 0000000..650cf7a --- /dev/null +++ b/.trae/documents/Webhook中继系统 - 核心逻辑重构计划.md @@ -0,0 +1,53 @@ +# Webhook 中继系统 - 核心逻辑重构计划 + +## 1. 核心设计理念变更 +响应您的需求,我们将系统从“基于固定字段(remark)的路由”升级为**“基于端点的自定义规则引擎”**。 + +- **以端点为中心**:每个接收端点(Endpoint)拥有独立的配置页面,绑定属于自己的处理流程。 +- **通用规则引擎**:不再硬编码检测 `remark` 或 `event_define_no`。 + - **变量自定义**:您可以指定检测 JSON 中的任意字段(如 `body.status` 或 `data.order_id`)。 + - **条件匹配**:当指定字段的值等于设定值时,触发动作。 +- **统一动作(Action)**:将“转发”和“通知”统一为规则命中的“动作”。 + - 动作 A:转发给目标 X(原 Target)。 + - 动作 B:使用模板 Y 发送给渠道 Z(原 Notification)。 + +## 2. 数据库模型重构 (`app/db.py`) +我们需要调整表结构以支持这种灵活关系(建议重置数据库): + +1. **保留**:`WebhookEndpoint`, `Target`, `NotificationChannel`, `RequestLog`, `DeliveryLog`。 +2. **移除**:`RemarkRule`, 旧的 `EventTemplate` 关联逻辑。 +3. **新增/修改**: + * **`MessageTemplate`** (原 EventTemplate):仅存储模板文本,不再绑定特定事件ID,作为纯资源库。 + * **`ProcessingRule`** (处理规则):绑定到 `WebhookEndpoint`。 + * 字段:`match_field` (匹配键,如 `trans_order_info.remark`), `match_value` (匹配值)。 + * **`RuleAction`** (规则动作):绑定到 `ProcessingRule`。 + * 字段:`action_type` (forward/notify), `target_id` (关联Target), `channel_id` (关联Channel), `template_id` (关联Template)。 + +## 3. 业务逻辑升级 (`app/services/engine.py`) +创建新的规则引擎服务: +- **动态取值**:支持点号索引(如 `data.user.id`)从深层 JSON 中提取变量。 +- **流程执行**: + 1. 接收 Webhook -> 查找 Endpoint。 + 2. 遍历 Endpoint 下的所有 `ProcessingRule`。 + 3. 提取 `match_field` 对应的值与 `match_value` 比对。 + 4. 命中则执行该规则下的所有 `RuleAction`(并行执行转发或通知)。 + +## 4. Admin UI 交互大改版 +- **资源库模式**:`目标管理`、`通知渠道`、`消息模板` 变为纯粹的基础资源维护页面。 +- **端点编排页**(核心): + - 点击某个端点,进入**“流程配置”**详情页。 + - **可视化规则编辑器**: + - "当 `[输入框: 字段路径]` 等于 `[输入框: 值]` 时:" + - ➕ 添加动作:[转发] -> 选择目标。 + - ➕ 添加动作:[通知] -> 选择渠道 + 选择模板。 +- **体验优化**:在同一个页面完成逻辑闭环,无需在不同菜单间跳转。 + +## 5. 实施步骤 +1. **重构 DB**:更新 `app/db.py` 模型定义。 +2. **实现引擎**:编写 `app/services/engine.py` 实现动态匹配与分发。 +3. **更新 API**:修改 `app/main.py` 调用新引擎。 +4. **重写 UI**: + - 改造 `admin.py` 路由。 + - 新增 `templates/admin/endpoint_detail.html` 作为核心配置页。 + - 简化其他资源页面。 +5. **迁移/重置**:删除旧 DB文件,重新构建运行。 diff --git a/.trae/documents/增强模板变量与编辑功能.md b/.trae/documents/增强模板变量与编辑功能.md new file mode 100644 index 0000000..8b4f879 --- /dev/null +++ b/.trae/documents/增强模板变量与编辑功能.md @@ -0,0 +1,30 @@ +# 系统功能增强计划 + +针对您提出的两个问题,我们将进行以下改进: + +## 1. 解决 "pay.wx_scaned" 等代码的显示问题 +**方案**:在配置“通知动作”时,允许您定义**自定义模板变量**。 +* **原理**: + * 当规则匹配到 `pay.wx_scaned` 时,您在动作中设置变量 `pay_type_name` = "微信支付"。 + * 当规则匹配到 `pay.ali_scaned` 时,您在动作中设置变量 `pay_type_name` = "支付宝"。 + * **模板写法**:`收到{pay_type_name}付款{trans_amt}元`。 +* 这样同一个模板可以根据不同的规则,显示不同的中文名称,完美解决代码转义问题。 + +## 2. 补全 "修改" 功能 +**方案**:为所有资源管理页面添加编辑功能。 +* **目标管理**:添加编辑按钮。 +* **通知渠道**:添加编辑按钮。 +* **消息模板**:添加编辑按钮。 +* **实现方式**:点击编辑弹出模态框(复用添加框),回显数据,保存时调用更新接口。 + +## 实施步骤 +1. **数据库升级**: + * `RuleAction` 表新增 `template_vars` 字段(JSON类型),用于存储自定义变量。 +2. **后端开发**: + * 更新 `RuleEngine`,在渲染模板前将 `template_vars` 合并到数据上下文中。 + * 在 `admin.py` 中新增 `update_target`, `update_channel`, `update_template` 等路由。 +3. **前端开发**: + * **端点详情页**:在添加/编辑动作时,增加“模板变量”输入框(JSON格式)。 + * **资源列表页**:增加“编辑”按钮及相应的 JS 逻辑来回显数据。 +4. **部署**: + * 由于涉及数据库字段变更,需要再次重置数据库。 diff --git a/.trae/documents/系统修复与全链路优化计划.md b/.trae/documents/系统修复与全链路优化计划.md new file mode 100644 index 0000000..23ba8a3 --- /dev/null +++ b/.trae/documents/系统修复与全链路优化计划.md @@ -0,0 +1,42 @@ +# 系统修复与全链路优化计划 + +针对“弹窗失效”的紧急Bug和您对系统的长远期望,我制定了以下全方位的优化方案: + +## 1. 紧急修复:UI 交互失效 (Priority 0) +**问题根源**:页面脚本(Script)在 Bootstrap 库加载之前执行了,导致 `bootstrap` 对象未定义。 +**修复方案**: +* 调整 `base.html` 结构,在底部引入 Bootstrap JS 之后新增 `{% block scripts %}` 插槽。 +* 将所有管理页面的 JS 代码移动到 `block scripts` 中,确保执行顺序正确。 + +## 2. 核心增强:规则引擎升级 (Priority 1) +目前的规则仅支持“等于”,无法满足复杂业务(如匹配特定前缀的订单号)。 +**优化方案**: +* **数据库变更**:`ProcessingRule` 表新增 `operator` 字段(默认为 `eq`)。 +* **支持操作符**: + * `eq` (等于) + * `neq` (不等于) + * `contains` (包含) + * `startswith` (以...开头) + * `regex` (正则表达式) +* **UI 适配**:在端点详情页的添加规则弹窗中,增加“操作符”下拉选择。 + +## 3. 运维增强:日志重试 (Replay) (Priority 2) +Webhook 系统最常见的问题是下游服务暂时不可用,导致消息丢失。 +**优化方案**: +* **后端**:新增 `/admin/logs/{id}/replay` 接口,读取历史日志的 `raw_body` 重新注入规则引擎。 +* **前端**:在“系统日志”列表的失败记录旁,添加“重试”按钮。 + +## 4. 体验优化:首页仪表盘 (Priority 3) +目前的首页直接跳转到列表,缺乏全局视野。 +**优化方案**: +* 重写 `/admin/` 首页,展示: + * 今日请求总量 / 成功率 + * 各端点 (Endpoint) 的流量占比 + * 最近 10 条失败记录(方便快速排查) + +## 实施路线图 +1. **UI 架构调整**:修复 `base.html` 及所有子页面(解决弹窗问题)。 +2. **数据模型迁移**:更新 DB,添加 `operator` 字段。 +3. **业务逻辑升级**:更新 `engine.py` 支持新操作符,实现重试逻辑。 +4. **界面功能补全**:更新前端页面适配新功能。 +5. **部署**:重置数据库并重启服务。 diff --git a/.trae/documents/规则引擎逻辑重构计划.md b/.trae/documents/规则引擎逻辑重构计划.md new file mode 100644 index 0000000..ef7281c --- /dev/null +++ b/.trae/documents/规则引擎逻辑重构计划.md @@ -0,0 +1,50 @@ +# 规则引擎逻辑重构计划:支持多级条件与流程编排 + +您说得非常对。目前的扁平化规则(单层 `if-match-then-action`)在处理“组合条件”时会导致规则数量爆炸,配置极其繁琐且难以维护。 + +**核心问题**:现在的结构是“单层匹配”,无法表达“先判断A,再判断B”的逻辑。 +**您的需求**: +1. **第一层(事件分类)**:先根据 `event_define_no` 判断业务类型(微信支付/退款/投诉),决定用哪个**模板**。 +2. **第二层(渠道分发)**:再根据 `remark` 判断归属(门店002/003),决定推给哪个**渠道**。 + +为此,我将对系统进行深度的逻辑重构,引入**“规则组(Rule Group)”**和**“条件链(Condition Chain)”**的概念,但为了保持用户界面的直观性,我设计了更符合您直觉的 **“流程树(Flow Tree)”** 方案。 + +## 方案设计:嵌套式规则引擎 + +### 1. 数据模型升级 (DB) +* **ProcessingRule 表变更**: + * 新增 `parent_rule_id` (Integer, nullable): 允许规则嵌套。子规则只有在父规则匹配成功后才会执行。 + * 此变更实现了“树状结构”。 + +### 2. 核心逻辑升级 (Engine) +* **递归匹配**: + * 引擎首先查找所有 `parent_id` 为空的**根规则**。 + * 如果根规则匹配成功(例如 `event_define_no == pay.wx_scaned`): + * 执行根规则挂载的动作(例如:设置模板变量 `pay_method="微信"`)。 + * **继续检查其子规则**。 + * 子规则(例如 `remark == imcgcd03`)匹配成功: + * 执行子规则挂载的动作(例如:推送到 `渠道003`)。 + * 动作执行时,会**继承**父规则上下文中设置的变量(如模板ID或变量)。 + +### 3. 用户界面重构 (UI) +* **端点详情页**: + * 不再是扁平列表,而是**树形展示**。 + * 在每个规则卡片内部,增加“**添加子规则**”按钮。 + * 视觉上通过缩进或连线表示层级关系。 + * 支持**拖拽排序**(优先级调整)。 + +## 实施步骤 +1. **数据库迁移**:为 `ProcessingRule` 添加 `parent_rule_id` 字段。 +2. **后端逻辑**:重写 `engine.py` 的 `process` 方法,改为递归处理。 +3. **前端交互**:重写 `endpoint_detail.html`,使用递归模板渲染规则树。 +4. **无损更新**:编写 SQL 脚本或逻辑,确保现有数据平滑迁移到新结构(现有规则都视为根规则)。 + +这样,您的配置流程将变为: +1. **根规则**:`event_define_no == pay.wx_scaned` + * 动作:设置变量 `pay_type="微信"`(**不需要指定渠道**) + * **子规则 A**:`remark == imcgcd02` + * 动作:推送到 `渠道002`(**复用父级的模板设置**) + * **子规则 B**:`remark == imcgcd03` + * 动作:推送到 `渠道003` + +这完美符合您的思维模型。 diff --git a/123.josn b/123.josn new file mode 100644 index 0000000..e69de29 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..007cca4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12-slim +WORKDIR /app +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt +COPY app /app/app +COPY config /app/config +COPY templates /app/templates +ENV WEBHOOK_CONFIG_PATH=/app/config/config.yml +EXPOSE 8080 +CMD ["uvicorn","app.main:app","--host","0.0.0.0","--port","8080"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..331410e --- /dev/null +++ b/README.md @@ -0,0 +1,118 @@ +## 项目简介 + +这是一个可配置的 Webhook 中继与通知系统,用于: +- 统一接收外部支付/业务系统的 Webhook 请求 +- 按规则将请求转发到内部多个目标系统 +- 根据事件与模板生成消息,推送到飞书、企业微信等 IM 渠道 + +核心特性: +- 通过 Web 管理后台 `/admin` 管理端点、规则树、动作、消息模板与通知渠道 +- 支持树形规则:按 `remark`、`event_define_no` 等字段组合匹配 +- 支持转发(forward)与通知(notify)两类动作 +- 消息模板支持变量与继承,可在父规则设置通用模板,在子规则覆盖变量 +- 所有请求与分发结果写入 SQLite 数据库,便于审计与排查 + +## 技术栈与目录结构 + +- 语言与框架:Python 3.12 + FastAPI +- 数据库:SQLite(可通过 `DB_PATH` 切换到其他 SQLAlchemy 支持的数据库) +- HTTP 客户端:httpx + +主要目录: +- `app/main.py`:FastAPI 入口,`/webhook/{namespace}` 与 `/health` +- `app/admin.py`:管理后台路由(端点、规则、动作、模板、渠道、日志) +- `app/db.py`:SQLAlchemy 模型与会话管理 +- `app/services/engine.py`:规则引擎,实现树形规则匹配与动作编排 +- `app/services/notify.py`:飞书与企业微信的实际发送封装 +- `templates/admin/`:管理后台的 HTML 模板 +- `config/data.db`:默认 SQLite 数据库文件(若不存在会自动创建) +- `docs/webhook-relay/`:6A 工作流下的对齐、设计、任务与流程文档 + +## 本地运行(开发环境) + +1. 安装依赖: + - 可选:创建虚拟环境 + - 执行:`pip install -r requirements.txt` + +2. 初始化数据库并启动服务: + - 进入项目根目录:`cd e:\2025Code\python\webhock` + - 启动:`uvicorn app.main:app --host 0.0.0.0 --port 8080 --reload` + +3. 访问入口: + - Webhook 接口:`POST http://localhost:8080/webhook/{namespace}` + - 健康检查:`GET http://localhost:8080/health` + - 管理后台:`GET http://localhost:8080/admin/endpoints` + +## Docker 部署 + +项目包含 `Dockerfile`,可直接构建镜像部署: + +1. 构建镜像: + - 在项目根目录执行 + `docker build -t webhook-relay .` + +2. 运行容器(基础示例): + - `docker run -d --name webhook-relay -p 8080:8080 webhook-relay` + + 默认配置: + - 工作目录:`/app` + - 配置与数据库目录:`/app/config` + - 默认数据库:`sqlite:///./config/data.db` + +3. 使用持久化存储: + - 在宿主机创建配置/数据目录,例如:`/srv/webhook-relay/config` + - 将初始配置文件与数据库(可选)放入该目录 + - 启动容器时挂载目录: + - `docker run -d --name webhook-relay -p 8080:8080 -v /srv/webhook-relay/config:/app/config webhook-relay` + +4. 自定义数据库: + - 通过环境变量 `DB_PATH` 覆盖默认数据库连接: + - 例如:`-e DB_PATH="sqlite:///./config/data.db"`(默认值) + - 或者:`-e DB_PATH="postgresql+psycopg2://user:pass@host:5432/dbname"`(需要自行安装驱动并调整镜像) + +## 管理后台与规则配置概览 + +管理后台主要页面: +- 端点管理(Endpoints):维护不同业务线/系统对应的 `namespace` +- 目标管理(Targets):配置转发目标的名称、URL 与超时时间 +- 渠道管理(Channels):配置飞书/企微等机器人 Webhook +- 模板管理(Templates):配置消息模板内容 +- 端点详情:以树形方式配置 ProcessingRule 与 RuleAction,支持: + - 根规则与子规则 + - 动作类型:转发或通知 + - 模板变量与模板继承 +- 日志页面:查看 RequestLog 与 DeliveryLog 记录 + +典型流程: +1. 在 “Targets” 中配置内部系统的 Webhook URL +2. 在 “Channels” 中配置飞书/企微机器人地址 +3. 在 “Templates” 中配置通用或专用消息模板 +4. 在 “Endpoints” 中创建业务端点,并在详情页中: + - 创建根规则,按 `remark` 或其他字段区分不同业务来源 + - 在子规则中根据 `event_define_no` 等字段区分事件类型 + - 为规则添加 Forward/Notify 动作,绑定目标、渠道与模板变量 + +## 推到 Gitea 与服务器部署建议 + +1. 在本地完成配置与验证后,将整个仓库推送到 Gitea: + - 初始化 git 仓库(如尚未初始化):`git init` + - 添加远程并推送到 Gitea 项目 + +2. 在服务器上通过 Gitea 拉取代码: + - 使用 Gitea 的 “克隆” 地址在服务器执行 `git clone` + +3. 在服务器上构建并运行 Docker 镜像: + - `docker build -t webhook-relay .` + - 准备持久化目录并挂载配置/数据库 + - `docker run -d --name webhook-relay -p 8080:8080 -v /srv/webhook-relay/config:/app/config webhook-relay` + +4. 如需进一步自动化(CI/CD): + - 可以在 Gitea 中配置 Actions 或 Webhook,在推送时自动触发服务器上的构建与重启逻辑 + - 具体流水线脚本可根据你的服务器环境与习惯单独设计 + +更详细的架构、流程与用例说明可参考: +- `docs/webhook-relay/ALIGNMENT_webhook-relay.md` +- `docs/webhook-relay/DESIGN_webhook-relay.md` +- `docs/webhook-relay/FLOWCHART.md` +- `docs/webhook-relay/ACCEPTANCE_webhook-relay.md` + diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ + diff --git a/app/__pycache__/__init__.cpython-314.pyc b/app/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..fcf4deb Binary files /dev/null and b/app/__pycache__/__init__.cpython-314.pyc differ diff --git a/app/__pycache__/db.cpython-314.pyc b/app/__pycache__/db.cpython-314.pyc new file mode 100644 index 0000000..e92909d Binary files /dev/null and b/app/__pycache__/db.cpython-314.pyc differ diff --git a/app/__pycache__/logging.cpython-314.pyc b/app/__pycache__/logging.cpython-314.pyc new file mode 100644 index 0000000..e2cba54 Binary files /dev/null and b/app/__pycache__/logging.cpython-314.pyc differ diff --git a/app/__pycache__/main.cpython-314.pyc b/app/__pycache__/main.cpython-314.pyc new file mode 100644 index 0000000..3cb78c9 Binary files /dev/null and b/app/__pycache__/main.cpython-314.pyc differ diff --git a/app/admin.py b/app/admin.py new file mode 100644 index 0000000..de45752 --- /dev/null +++ b/app/admin.py @@ -0,0 +1,485 @@ +from fastapi import APIRouter, Request, Form, Depends, HTTPException +from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy.orm import Session, joinedload, aliased +from typing import List, Optional, Union +import json +from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction +from app.services.stats import stats_service +from app.services.engine import engine + +router = APIRouter(prefix="/admin", tags=["admin"]) +templates = Jinja2Templates(directory="templates") + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + +async def global_stats_processor(request: Request): + return { + "system_stats": { + "uptime": stats_service.get_uptime(), + "today_count": stats_service.get_today_count(), + "latest_log": stats_service.get_latest_log_time() + } + } + +async def render(template_name: str, context: dict): + stats = await global_stats_processor(context["request"]) + context.update(stats) + return templates.TemplateResponse(template_name, context) + +@router.get("/", response_class=HTMLResponse) +async def admin_index(request: Request, db: Session = Depends(get_db)): + endpoints = db.query(WebhookEndpoint).filter(WebhookEndpoint.is_active == True).all() + return await render("admin/dashboard.html", {"request": request, "active_page": "dashboard", "endpoints": endpoints}) + +# --- Endpoints --- +@router.get("/endpoints", response_class=HTMLResponse) +async def list_endpoints(request: Request, db: Session = Depends(get_db)): + endpoints = db.query(WebhookEndpoint).order_by(WebhookEndpoint.created_at.desc()).all() + return await render("admin/endpoints.html", {"request": request, "endpoints": endpoints, "active_page": "endpoints"}) + +@router.post("/endpoints") +async def add_endpoint(namespace: str = Form(...), description: str = Form(None), db: Session = Depends(get_db)): + if not namespace.replace("-", "").replace("_", "").isalnum(): + pass + ep = WebhookEndpoint(namespace=namespace, description=description) + db.add(ep) + db.commit() + return RedirectResponse(url="/admin/endpoints", status_code=303) + +@router.post("/endpoints/toggle") +async def toggle_endpoint(id: int = Form(...), db: Session = Depends(get_db)): + ep = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).first() + if ep: + ep.is_active = not ep.is_active + db.commit() + return RedirectResponse(url="/admin/endpoints", status_code=303) + +@router.post("/endpoints/delete") +async def delete_endpoint(id: int = Form(...), db: Session = Depends(get_db)): + db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/endpoints", status_code=303) + +# --- Endpoint Details & Rules --- +@router.get("/endpoints/{id}", response_class=HTMLResponse) +async def endpoint_detail(id: int, request: Request, db: Session = Depends(get_db)): + ep = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).first() + if not ep: + return RedirectResponse(url="/admin/endpoints") + + # Workaround: Fetch all rules for this endpoint and reconstruct tree in Python. + all_rules = db.query(ProcessingRule).options(joinedload(ProcessingRule.actions)).filter( + ProcessingRule.endpoint_id == id + ).order_by(ProcessingRule.priority.desc()).all() + + # Build tree manually + rule_map = {r.id: r for r in all_rules} + root_rules = [] + + # Initialize children list for each rule object (dynamically attached) + for r in all_rules: + r.child_rules = [] + + for r in all_rules: + if r.parent_rule_id: + parent = rule_map.get(r.parent_rule_id) + if parent: + parent.child_rules.append(r) + else: + root_rules.append(r) + + # Load resources for modals + targets = db.query(Target).all() + channels = db.query(NotificationChannel).all() + tmpls = db.query(MessageTemplate).all() + + return await render("admin/endpoint_detail.html", { + "request": request, + "ep": ep, + "root_rules": root_rules, + "targets": targets, + "channels": channels, + "templates": tmpls, + "active_page": "endpoints" + }) + +@router.post("/endpoints/{id}/rules") +async def add_rule( + id: int, + match_field: str = Form(...), + match_value: str = Form(...), + operator: str = Form("eq"), + parent_rule_id: Optional[Union[int, str]] = Form(None), + priority: int = Form(0), + db: Session = Depends(get_db) +): + # Handle empty string from form for optional int + final_parent_id = None + if parent_rule_id and str(parent_rule_id).strip(): + try: + final_parent_id = int(parent_rule_id) + except ValueError: + pass + + rule = ProcessingRule( + endpoint_id=id, + match_field=match_field, + match_value=match_value, + operator=operator, + parent_rule_id=final_parent_id, + priority=priority + ) + db.add(rule) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{id}", status_code=303) + +@router.post("/rules/delete") +async def delete_rule(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)): + db.query(ProcessingRule).filter(ProcessingRule.id == id).delete() + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/rules/update") +async def update_rule( + id: int = Form(...), + endpoint_id: int = Form(...), + match_field: str = Form(...), + match_value: str = Form(...), + operator: str = Form("eq"), + parent_rule_id: Optional[str] = Form(None), + priority: int = Form(0), + db: Session = Depends(get_db) +): + rule = db.query(ProcessingRule).filter(ProcessingRule.id == id).first() + if not rule: + raise HTTPException(status_code=404, detail="Rule not found") + rule.match_field = match_field + rule.match_value = match_value + rule.operator = operator + rule.priority = priority + if parent_rule_id is not None and str(parent_rule_id).strip() != "": + try: + rule.parent_rule_id = int(parent_rule_id) + except ValueError: + rule.parent_rule_id = None + else: + rule.parent_rule_id = None + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/actions/update") +async def update_action( + id: int = Form(...), + endpoint_id: int = Form(...), + action_type: str = Form(...), + target_id: Optional[str] = Form(None), + channel_id: Optional[str] = Form(None), + template_id: Optional[str] = Form(None), + template_vars_str: Optional[str] = Form(None), + db: Session = Depends(get_db) +): + action = db.query(RuleAction).filter(RuleAction.id == id).first() + if not action: + raise HTTPException(status_code=404, detail="Action not found") + t_vars = None + if template_vars_str and template_vars_str.strip(): + try: + t_vars = json.loads(template_vars_str) + except Exception: + t_vars = None + def clean_int(val): + if val and str(val).strip(): + try: + return int(val) + except ValueError: + return None + return None + action.action_type = action_type + action.target_id = clean_int(target_id) if action_type == 'forward' else None + action.channel_id = clean_int(channel_id) if action_type == 'notify' else None + action.template_id = clean_int(template_id) if action_type == 'notify' else None + action.template_vars = t_vars + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +def _duplicate_rule_tree(db: Session, src_rule: ProcessingRule, endpoint_id: int, new_parent_id: Optional[int]) -> int: + new_rule = ProcessingRule( + endpoint_id=endpoint_id, + match_field=src_rule.match_field, + operator=src_rule.operator, + match_value=src_rule.match_value, + priority=src_rule.priority, + parent_rule_id=new_parent_id + ) + db.add(new_rule) + db.commit() + db.refresh(new_rule) + for a in src_rule.actions: + db.add(RuleAction( + rule_id=new_rule.id, + action_type=a.action_type, + target_id=a.target_id if a.action_type == 'forward' else None, + channel_id=a.channel_id if a.action_type == 'notify' else None, + template_id=a.template_id if a.action_type == 'notify' else None, + template_vars=a.template_vars + )) + db.commit() + children = db.query(ProcessingRule).filter(ProcessingRule.parent_rule_id == src_rule.id).all() + for child in children: + _duplicate_rule_tree(db, child, endpoint_id, new_rule.id) + return new_rule.id + +@router.post("/rules/duplicate") +async def duplicate_rule( + rule_id: int = Form(...), + endpoint_id: int = Form(...), + parent_rule_id: Optional[str] = Form(None), + include_children: Optional[str] = Form("true"), + db: Session = Depends(get_db) +): + src = db.query(ProcessingRule).filter(ProcessingRule.id == rule_id).first() + if not src: + raise HTTPException(status_code=404, detail="Rule not found") + new_parent = None + if parent_rule_id and str(parent_rule_id).strip(): + try: + new_parent = int(parent_rule_id) + except ValueError: + new_parent = None + new_rule_id = _duplicate_rule_tree(db, src, endpoint_id, new_parent) if (include_children or include_children.lower() == "true") else None + if not new_rule_id: + new_rule = ProcessingRule( + endpoint_id=endpoint_id, + match_field=src.match_field, + operator=src.operator, + match_value=src.match_value, + priority=src.priority, + parent_rule_id=new_parent + ) + db.add(new_rule) + db.commit() + db.refresh(new_rule) + for a in src.actions: + db.add(RuleAction( + rule_id=new_rule.id, + action_type=a.action_type, + target_id=a.target_id if a.action_type == 'forward' else None, + channel_id=a.channel_id if a.action_type == 'notify' else None, + template_id=a.template_id if a.action_type == 'notify' else None, + template_vars=a.template_vars + )) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/actions/duplicate") +async def duplicate_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)): + src = db.query(RuleAction).filter(RuleAction.id == id).first() + if not src: + raise HTTPException(status_code=404, detail="Action not found") + db.add(RuleAction( + rule_id=src.rule_id, + action_type=src.action_type, + target_id=src.target_id if src.action_type == 'forward' else None, + channel_id=src.channel_id if src.action_type == 'notify' else None, + template_id=src.template_id if src.action_type == 'notify' else None, + template_vars=src.template_vars + )) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) +@router.post("/rules/{rule_id}/actions") +async def add_action( + rule_id: int, + endpoint_id: int = Form(...), + action_type: str = Form(...), + target_id: Optional[Union[int, str]] = Form(None), + channel_id: Optional[Union[int, str]] = Form(None), + template_id: Optional[Union[int, str]] = Form(None), + template_vars_str: Optional[str] = Form(None), + db: Session = Depends(get_db) +): + t_vars = None + if template_vars_str and template_vars_str.strip(): + try: + t_vars = json.loads(template_vars_str) + except Exception: + pass + + # Helper to clean int params + def clean_int(val): + if val and str(val).strip(): + try: + return int(val) + except ValueError: + return None + return None + + action = RuleAction( + rule_id=rule_id, + action_type=action_type, + target_id=clean_int(target_id) if action_type == 'forward' else None, + channel_id=clean_int(channel_id) if action_type == 'notify' else None, + template_id=clean_int(template_id) if action_type == 'notify' else None, + template_vars=t_vars + ) + db.add(action) + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + +@router.post("/actions/delete") +async def delete_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)): + db.query(RuleAction).filter(RuleAction.id == id).delete() + db.commit() + return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303) + + +# --- Targets --- +@router.get("/targets", response_class=HTMLResponse) +async def list_targets(request: Request, db: Session = Depends(get_db)): + targets = db.query(Target).all() + return await render("admin/targets.html", {"request": request, "targets": targets, "active_page": "targets"}) + +@router.post("/targets") +async def add_target(name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)): + t = Target(name=name, url=url, timeout_ms=timeout_ms) + db.add(t) + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +@router.post("/targets/update") +async def update_target(id: int = Form(...), name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)): + t = db.query(Target).filter(Target.id == id).first() + if t: + t.name = name + t.url = url + t.timeout_ms = timeout_ms + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +@router.post("/targets/delete") +async def delete_target(id: int = Form(...), db: Session = Depends(get_db)): + db.query(Target).filter(Target.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/targets", status_code=303) + +# --- Channels --- +@router.get("/channels", response_class=HTMLResponse) +async def list_channels(request: Request, db: Session = Depends(get_db)): + channels = db.query(NotificationChannel).all() + return await render("admin/channels.html", {"request": request, "channels": channels, "active_page": "channels"}) + +@router.post("/channels") +async def add_channel(name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)): + c = NotificationChannel(name=name, channel_type=channel_type, webhook_url=webhook_url) + db.add(c) + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +@router.post("/channels/update") +async def update_channel(id: int = Form(...), name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)): + c = db.query(NotificationChannel).filter(NotificationChannel.id == id).first() + if c: + c.name = name + c.channel_type = channel_type + c.webhook_url = webhook_url + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +@router.post("/channels/delete") +async def delete_channel(id: int = Form(...), db: Session = Depends(get_db)): + db.query(NotificationChannel).filter(NotificationChannel.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/channels", status_code=303) + +# --- Templates --- +@router.get("/templates", response_class=HTMLResponse) +async def list_templates(request: Request, db: Session = Depends(get_db)): + tmpls = db.query(MessageTemplate).all() + return await render("admin/templates.html", {"request": request, "templates": tmpls, "active_page": "templates"}) + +@router.post("/templates") +async def add_template(name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)): + t = MessageTemplate(name=name, template_content=template_content) + db.add(t) + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/update") +async def update_template(id: int = Form(...), name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)): + t = db.query(MessageTemplate).filter(MessageTemplate.id == id).first() + if t: + t.name = name + t.template_content = template_content + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +@router.post("/templates/delete") +async def delete_template(id: int = Form(...), db: Session = Depends(get_db)): + db.query(MessageTemplate).filter(MessageTemplate.id == id).delete() + db.commit() + return RedirectResponse(url="/admin/templates", status_code=303) + +# --- Logs --- +@router.get("/logs", response_class=HTMLResponse) +async def list_logs(request: Request, db: Session = Depends(get_db)): + logs = db.query(RequestLog).options(joinedload(RequestLog.delivery_logs))\ + .order_by(RequestLog.received_at.desc()).limit(100).all() + return await render("admin/logs.html", {"request": request, "logs": logs, "active_page": "logs"}) + +@router.post("/logs/clear") +async def clear_logs(db: Session = Depends(get_db)): + db.query(DeliveryLog).delete() + db.query(RequestLog).delete() + db.commit() + return RedirectResponse(url="/admin/logs", status_code=303) + +@router.post("/logs/{id}/replay") +async def replay_log(id: int, db: Session = Depends(get_db)): + log = db.query(RequestLog).filter(RequestLog.id == id).first() + if not log: + return JSONResponse({"error": "Log not found"}, status_code=404) + + # Find endpoint + endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == log.namespace).first() + if not endpoint or not endpoint.is_active: + return JSONResponse({"error": "Endpoint inactive or missing"}, status_code=400) + + # Re-process + routed, notified = await engine.process(endpoint.id, log.raw_body) + + new_log = RequestLog( + namespace=log.namespace, + remark=log.remark, + event_no=log.event_no, + raw_body=log.raw_body, + status="replay" + ) + db.add(new_log) + db.commit() + db.refresh(new_log) + + for r in routed: + db.add(DeliveryLog( + request_id=new_log.id, + target_name=r.get("target"), + type="relay", + status="success" if r.get("ok") else "failed", + response_summary=str(r.get("error") or "OK") + )) + + for n in notified: + db.add(DeliveryLog( + request_id=new_log.id, + target_name=n.get("channel"), + type="notify", + status="success" if n.get("ok") else "failed", + response_summary=str(n.get("error") or "OK") + )) + + db.commit() + return JSONResponse({"status": "ok", "new_log_id": new_log.id}) diff --git a/app/db.py b/app/db.py new file mode 100644 index 0000000..914fa25 --- /dev/null +++ b/app/db.py @@ -0,0 +1,102 @@ +from sqlalchemy import create_engine, Column, Integer, String, JSON, Boolean, Table, ForeignKey, DateTime, Text +from sqlalchemy.orm import declarative_base, sessionmaker, relationship, backref +import os +from datetime import datetime + +Base = declarative_base() + +class Target(Base): + __tablename__ = 'targets' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True, index=True) + url = Column(String) + timeout_ms = Column(Integer, default=5000) + +class NotificationChannel(Base): + __tablename__ = 'notification_channels' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True) + channel_type = Column(String) # feishu, wecom + webhook_url = Column(String) + +class MessageTemplate(Base): + __tablename__ = 'message_templates' + id = Column(Integer, primary_key=True) + name = Column(String, unique=True) # 方便识别,如 "收款成功通知" + template_content = Column(Text) # "收到{amt}元" + +class WebhookEndpoint(Base): + __tablename__ = 'webhook_endpoints' + id = Column(Integer, primary_key=True) + namespace = Column(String, unique=True, index=True) + description = Column(String, nullable=True) + is_active = Column(Boolean, default=True) + created_at = Column(DateTime, default=datetime.utcnow) + rules = relationship("ProcessingRule", back_populates="endpoint", cascade="all, delete-orphan") + +class ProcessingRule(Base): + __tablename__ = 'processing_rules' + id = Column(Integer, primary_key=True) + endpoint_id = Column(Integer, ForeignKey('webhook_endpoints.id')) + endpoint = relationship("WebhookEndpoint", back_populates="rules") + + # Tree structure support + parent_rule_id = Column(Integer, ForeignKey('processing_rules.id'), nullable=True) + children = relationship("ProcessingRule", backref=backref('parent', remote_side=[id]), cascade="all, delete-orphan") + priority = Column(Integer, default=0) # Higher executes first (if we want ordering) + + match_field = Column(String) # e.g. "trans_order_info.remark" or "event_define_no" + operator = Column(String, default="eq") # eq, neq, contains, startswith, regex + match_value = Column(String) # e.g. "imcgcd03" or "pay.success" + + actions = relationship("RuleAction", back_populates="rule", cascade="all, delete-orphan") + +class RuleAction(Base): + __tablename__ = 'rule_actions' + id = Column(Integer, primary_key=True) + rule_id = Column(Integer, ForeignKey('processing_rules.id')) + rule = relationship("ProcessingRule", back_populates="actions") + + action_type = Column(String) # "forward" or "notify" + + # Forward params + target_id = Column(Integer, ForeignKey('targets.id'), nullable=True) + target = relationship("Target") + + # Notify params + channel_id = Column(Integer, ForeignKey('notification_channels.id'), nullable=True) + channel = relationship("NotificationChannel") + template_id = Column(Integer, ForeignKey('message_templates.id'), nullable=True) + template = relationship("MessageTemplate") + + # Extra params for templating (e.g. {"pay_method": "微信"}) + template_vars = Column(JSON, nullable=True) + +class RequestLog(Base): + __tablename__ = 'request_logs' + id = Column(Integer, primary_key=True) + namespace = Column(String, index=True) + remark = Column(String, nullable=True) # 保留用于快速筛选,可选 + event_no = Column(String, nullable=True) # 保留用于快速筛选,可选 + raw_body = Column(JSON) + received_at = Column(DateTime, default=datetime.utcnow) + status = Column(String) # success, error + delivery_logs = relationship("DeliveryLog", back_populates="request_log", cascade="all, delete-orphan") + +class DeliveryLog(Base): + __tablename__ = 'delivery_logs' + id = Column(Integer, primary_key=True) + request_id = Column(Integer, ForeignKey('request_logs.id')) + target_name = Column(String) + type = Column(String) # relay, notify + status = Column(String) # success, failed + response_summary = Column(Text, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + request_log = relationship("RequestLog", back_populates="delivery_logs") + +DB_PATH = os.getenv("DB_PATH", "sqlite:///./config/data.db") +engine = create_engine(DB_PATH, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def init_db(): + Base.metadata.create_all(bind=engine) diff --git a/app/logging.py b/app/logging.py new file mode 100644 index 0000000..078bbbf --- /dev/null +++ b/app/logging.py @@ -0,0 +1,15 @@ +import logging +from pythonjsonlogger import jsonlogger +import os + +def get_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + level = os.getenv("LOG_LEVEL", "INFO").upper() + logger.setLevel(level) + if not logger.handlers: + handler = logging.StreamHandler() + formatter = jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(name)s %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.propagate = False + return logger diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..2bca5dc --- /dev/null +++ b/app/main.py @@ -0,0 +1,102 @@ +import asyncio +from fastapi import FastAPI, Request, BackgroundTasks +from fastapi.responses import JSONResponse +from app.logging import get_logger +from app.admin import router as admin_router +from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db +from app.services.engine import engine +from contextlib import asynccontextmanager + +logger = get_logger("app") + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: Initialize DB + logger.info("Initializing database...") + init_db() + yield + # Shutdown logic if any + +app = FastAPI(lifespan=lifespan) +app.include_router(admin_router) + +def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list): + try: + db = SessionLocal() + # Create RequestLog + req_log = RequestLog( + namespace=namespace, + remark=str(payload_dict.get("remark", "")), + event_no=str(payload_dict.get("event_define_no", "")), + raw_body=payload_dict, + status="success" + ) + db.add(req_log) + db.commit() + db.refresh(req_log) + + # Create DeliveryLogs + for r in routed: + db.add(DeliveryLog( + request_id=req_log.id, + target_name=r.get("target"), + type="relay", + status="success" if r.get("ok") else "failed", + response_summary=str(r.get("error") or "OK") + )) + + for n in notified: + db.add(DeliveryLog( + request_id=req_log.id, + target_name=n.get("channel"), + type="notify", + status="success" if n.get("ok") else "failed", + response_summary=str(n.get("error") or "OK") + )) + + db.commit() + db.close() + except Exception as e: + logger.error(f"Failed to save logs: {e}") + +@app.get("/health") +async def health(): + return {"status": "ok"} + +@app.post("/webhook/{namespace}") +async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks): + db = SessionLocal() + endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first() + + if not endpoint: + # Auto-create for convenience if needed, or reject. For now reject if not exists. + # Or better: log warning but don't process. + # But per requirements "add endpoint", so we expect it to exist. + db.close() + return JSONResponse({"error": "Endpoint not found"}, status_code=404) + + if not endpoint.is_active: + db.close() + return JSONResponse({"error": "Endpoint inactive"}, status_code=403) + + endpoint_id = endpoint.id + db.close() + + try: + body = await request.json() + except Exception: + return JSONResponse({"error": "Invalid JSON"}, status_code=400) + + # Use new engine + routed, notified = await engine.process(endpoint_id, body) + + # Async save logs + background_tasks.add_task(save_logs, namespace, body, routed, notified) + + result = { + "namespace": namespace, + "routed": routed, + "notified": notified + } + logger.info({"event": "webhook_processed", "namespace": namespace, "routed": len(routed), "notified": len(notified)}) + return JSONResponse(result) diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..cbf67a7 --- /dev/null +++ b/app/models.py @@ -0,0 +1,32 @@ +from typing import Optional, Dict, Any +from pydantic import BaseModel + +class IncomingOrderInfo(BaseModel): + cash_resp_desc: Optional[str] = None + ref_amt: Optional[float] = None + +class IncomingPayload(BaseModel): + remark: Optional[str] = None + event_define_no: Optional[str] = None + trans_amt: Optional[float] = None + settlement_amt: Optional[float] = None + out_trans_id: Optional[str] = None + hf_seq_id: Optional[str] = None + namespace: Optional[str] = None + trans_order_info: Optional[IncomingOrderInfo] = None + extra: Optional[Dict[str, Any]] = None + + def idempotent_key(self) -> Optional[str]: + return self.out_trans_id or self.hf_seq_id + + def cash_resp_desc(self) -> str: + v = None + if self.trans_order_info: + v = self.trans_order_info.cash_resp_desc + return v or "" + + def actual_ref_amt(self) -> Optional[float]: + extra_val = None + if self.extra and isinstance(self.extra, dict): + extra_val = self.extra.get("actual_ref_amt") + return extra_val or (self.trans_order_info.ref_amt if self.trans_order_info and self.trans_order_info.ref_amt is not None else self.settlement_amt) diff --git a/app/services/engine.py b/app/services/engine.py new file mode 100644 index 0000000..32d7669 --- /dev/null +++ b/app/services/engine.py @@ -0,0 +1,228 @@ +from typing import Any, Dict, List, Optional +import asyncio +import re +from app.db import SessionLocal, ProcessingRule, RuleAction, Target, NotificationChannel, MessageTemplate +from app.logging import get_logger + +logger = get_logger("engine") + +class RuleEngine: + def __init__(self): + pass + + def get_value_by_path(self, payload: Dict[str, Any], path: str) -> Optional[str]: + try: + keys = path.split('.') + value = payload + for key in keys: + if isinstance(value, dict): + value = value.get(key) + else: + return None + return str(value) if value is not None else None + except Exception: + return None + + def check_condition(self, actual_val: str, operator: str, match_val: str) -> bool: + if actual_val is None: + return False + + operator = operator or 'eq' + + if operator == 'eq': + return actual_val == match_val + elif operator == 'neq': + return actual_val != match_val + elif operator == 'contains': + return match_val in actual_val + elif operator == 'startswith': + return actual_val.startswith(match_val) + elif operator == 'regex': + try: + return re.search(match_val, actual_val) is not None + except: + return False + return False + + async def process(self, endpoint_id: int, payload: Dict[str, Any]): + db = SessionLocal() + tasks = [] + + try: + # Recursive processing function + # context stores accumulated template_vars AND active_template from parent rules + # context = { "vars": {...}, "template_content": "..." } + def process_rules(rules: List[ProcessingRule], context: Dict): + for rule in rules: + actual_val = self.get_value_by_path(payload, rule.match_field) + + if self.check_condition(actual_val, rule.operator, rule.match_value): + logger.info({"event": "rule_matched", "rule_id": rule.id, "match_field": rule.match_field}) + + # Prepare context for this level + # We use shallow copy for dict structure, but deep copy for internal vars is not strictly needed + # as long as we don't mutate active_template in place (it's a string). + current_context = { + "vars": context.get("vars", {}).copy(), + "template_content": context.get("template_content") + } + + # 1. First Pass: Collect Vars and Templates from all actions + # This allows a parent rule to set a template even if it doesn't send a notification itself + for action in rule.actions: + if action.template_vars: + current_context["vars"].update(action.template_vars) + + # If action has a template, it updates the current context's template + # This template will be used by subsequent actions in this rule OR children + if action.template: + current_context["template_content"] = action.template.template_content + + # 2. Second Pass: Execute Actions + for action in rule.actions: + if action.action_type == 'forward' and action.target: + t_dict = {"name": action.target.name, "url": action.target.url, "timeout_ms": action.target.timeout_ms} + tasks.append(self._exec_forward(t_dict, payload)) + + elif action.action_type == 'notify': + # Check if we have a valid channel + if action.channel: + # Determine template to use: Action's own template > Inherited template + template_content = None + if action.template: + template_content = action.template.template_content + else: + template_content = current_context.get("template_content") + + if template_content: + try: + # Flatten payload + merge current context vars + render_context = self._flatten_payload(payload) + render_context.update(current_context["vars"]) + + msg = template_content.format(**render_context) + + c_dict = {"channel": action.channel.channel_type, "url": action.channel.webhook_url} + tasks.append(self._exec_notify(c_dict, msg)) + except Exception as e: + logger.exception(f"Template render failed for action {action.id}: {e}") + tasks.append(self._return_error("notify", action.channel.name, str(e))) + else: + # Channel exists but no template found anywhere + logger.warning(f"Action {action.id} has channel but no template (own or inherited). Skipping.") + + # 3. Process children (DFS) + if rule.children: + process_rules(rule.children, current_context) + + # Start with root rules (parent_rule_id is NULL) + root_rules = db.query(ProcessingRule).filter( + ProcessingRule.endpoint_id == endpoint_id, + ProcessingRule.parent_rule_id == None + ).order_by(ProcessingRule.priority.desc()).all() + + process_rules(root_rules, {"vars": {}, "template_content": None}) + + # Wait for all actions + results = await asyncio.gather(*tasks) if tasks else [] + + # Aggregate results + routed_results = [] + notified_results = [] + for res in results: + if res['type'] == 'forward': + routed_results.append(res) + else: + notified_results.append(res) + + return routed_results, notified_results + + finally: + db.close() + + def _flatten_payload(self, y: dict) -> dict: + out = {} + + # Helper class to allow attribute access on dictionaries within templates + class AttrDict(dict): + def __getattr__(self, key): + if key in self: + v = self[key] + if isinstance(v, dict): + return AttrDict(v) + return v + # Return empty string or None to avoid AttributeError in templates + return "" + + def flatten(x, name=''): + if isinstance(x, dict): + for a in x: + flatten(x[a], name + a + '_') + if name == '': + # Wrap top-level nested dicts so {a.b} works in templates + out[a] = AttrDict(x[a]) if isinstance(x[a], dict) else x[a] + else: + if name: + out[name[:-1]] = x + + flatten(y) + + # Fallback aliases for common fields referenced by templates + # cash_resp_desc: prefer nested trans_order_info.cash_resp_desc + try: + if 'cash_resp_desc' not in out: + toi = y.get('trans_order_info') or {} + out['cash_resp_desc'] = (toi.get('cash_resp_desc') or "") + except Exception: + out['cash_resp_desc'] = "" + + # actual_ref_amt: extra.actual_ref_amt > trans_order_info.ref_amt > settlement_amt + try: + if 'actual_ref_amt' not in out: + extra = y.get('extra') or {} + toi = y.get('trans_order_info') or {} + val = extra.get('actual_ref_amt') + if val is None: + val = toi.get('ref_amt') + if val is None: + val = y.get('settlement_amt') + out['actual_ref_amt'] = val + except Exception: + out['actual_ref_amt'] = y.get('settlement_amt') + + # Ensure any dict values in context are AttrDict to support dot-notation + for k, v in list(out.items()): + if isinstance(v, dict) and not isinstance(v, AttrDict): + out[k] = AttrDict(v) + + return out + + async def _exec_forward(self, target: dict, payload: dict): + try: + import httpx + async with httpx.AsyncClient() as client: + resp = await client.post(target['url'], json=payload, timeout=target.get('timeout_ms', 5000)/1000) + resp.raise_for_status() + return {"type": "forward", "target": target['name'], "ok": True} + except Exception as e: + return {"type": "forward", "target": target['name'], "ok": False, "error": str(e)} + + async def _exec_notify(self, channel: dict, msg: str): + try: + from app.services.notify import send_feishu, send_wecom + channel_type = channel.get('channel') + url = channel.get('url') + + if channel_type == 'feishu': + await send_feishu(url, msg) + elif channel_type == 'wecom': + await send_wecom(url, msg) + return {"type": "notify", "channel": channel_type, "ok": True} + except Exception as e: + logger.exception(f"Notification failed for {channel.get('channel')}: {e}") + return {"type": "notify", "channel": channel.get('channel'), "ok": False, "error": str(e)} + + async def _return_error(self, type_str, name, err): + return {"type": type_str, "target" if type_str == 'forward' else "channel": name, "ok": False, "error": err} + +engine = RuleEngine() diff --git a/app/services/notify.py b/app/services/notify.py new file mode 100644 index 0000000..5f7893a --- /dev/null +++ b/app/services/notify.py @@ -0,0 +1,13 @@ +import httpx + +async def send_feishu(url: str, text: str): + async with httpx.AsyncClient(timeout=10) as client: + body = {"msg_type": "text", "content": {"text": text}} + resp = await client.post(url, json=body) + resp.raise_for_status() + +async def send_wecom(url: str, text: str): + async with httpx.AsyncClient(timeout=10) as client: + body = {"msgtype": "text", "text": {"content": text}} + resp = await client.post(url, json=body) + resp.raise_for_status() diff --git a/app/services/stats.py b/app/services/stats.py new file mode 100644 index 0000000..73675f2 --- /dev/null +++ b/app/services/stats.py @@ -0,0 +1,45 @@ +from datetime import datetime, timedelta +from app.db import SessionLocal, RequestLog +from sqlalchemy import func + +# 全局变量,记录启动时间 +START_TIME = datetime.utcnow() + +class SystemStats: + def __init__(self): + pass + + def get_uptime(self) -> str: + delta = datetime.utcnow() - START_TIME + days = delta.days + hours, remainder = divmod(delta.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + if days > 0: + return f"{days}天 {hours}小时" + elif hours > 0: + return f"{hours}小时 {minutes}分" + else: + return f"{minutes}分 {seconds}秒" + + def get_today_count(self) -> int: + session = SessionLocal() + try: + today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + count = session.query(func.count(RequestLog.id)).filter(RequestLog.received_at >= today_start).scalar() + return count or 0 + finally: + session.close() + + def get_latest_log_time(self) -> str: + session = SessionLocal() + try: + log = session.query(RequestLog).order_by(RequestLog.received_at.desc()).first() + if log: + # 简单转为本地时间显示(+8) + dt = log.received_at + timedelta(hours=8) + return dt.strftime("%H:%M:%S") + return "无" + finally: + session.close() + +stats_service = SystemStats() diff --git a/config/config.yml b/config/config.yml new file mode 100644 index 0000000..650fd60 --- /dev/null +++ b/config/config.yml @@ -0,0 +1,45 @@ +server: + port: 8080 + log_level: info +routing: + remark_map: + imcgcd03: ["target_3"] + imcgcd02: ["target_2"] + default_targets: [] +targets: + - name: target_1 + url: "https://httpbin.org/post" + timeout_ms: 5000 + - name: target_2 + url: "https://httpbin.org/post" + timeout_ms: 5000 + - name: target_3 + url: "https://httpbin.org/post" + timeout_ms: 5000 +notifications: + event_map: + wechat.complaint: + template: "⚠️请注意,您有新的微信投诉,请注意查看" + channels: ["feishu","wecom"] + refund.standard: + template: "退款成功,退款金额:{actual_ref_amt}" + channels: ["feishu","wecom"] + pay.ali_scaned: + template: "支付宝收款{trans_amt}元,状态:{cash_resp_desc}" + channels: ["feishu","wecom"] + pay.wx_scaned: + template: "微信收款{trans_amt}元,状态:{cash_resp_desc}" + channels: ["feishu","wecom"] +channels: + feishu: + webhooks: + - "https://open.feishu.cn/your-bot-webhook" + wecom: + webhooks: + - "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your-key" +retry: + max_attempts: 3 + initial_delay_ms: 500 + max_delay_ms: 3000 +security: + outbound_signature: false diff --git a/config/data.db b/config/data.db new file mode 100644 index 0000000..7abdafa Binary files /dev/null and b/config/data.db differ diff --git a/docs/webhook-relay/ACCEPTANCE_webhook-relay.md b/docs/webhook-relay/ACCEPTANCE_webhook-relay.md new file mode 100644 index 0000000..6268992 --- /dev/null +++ b/docs/webhook-relay/ACCEPTANCE_webhook-relay.md @@ -0,0 +1,21 @@ +# 验收记录 + +## 用例1:示例JSON +- 步骤:POST /webhook/opps-webhook,负载为示例 +- 期望: + - 路由至 target_3 + - 生成文本包含商户名、支付方式、金额、状态与日期,例如:`【某商户】支付宝 收款 5.50 元,状态:成功,日期:20251220` + - 飞书与企业微信推送成功 + +## 用例2:remark未匹配 +- 步骤:remark为未知值 +- 期望:使用 default_targets 或仅通知 + +## 用例3:退款事件 +- 步骤:事件为 `refund.standard` +- 期望:从 `actual_ref_amt` 或 `ref_amt/settlement_amt` 渲染文本 + +## 用例4:目标不可达 +- 步骤:模拟目标URL不可达 +- 期望:记录失败并按策略重试 + diff --git a/docs/webhook-relay/ALIGNMENT_webhook-relay.md b/docs/webhook-relay/ALIGNMENT_webhook-relay.md new file mode 100644 index 0000000..fd5f585 --- /dev/null +++ b/docs/webhook-relay/ALIGNMENT_webhook-relay.md @@ -0,0 +1,48 @@ +# 任务名称 + +Webhook中继与通知系统(Docker化) + +## 背景与目标 +- 统一接收外部Webhook,按规则路由到目标端点,并生成通知消息推送至飞书/企业微信。 +- 基于配置可扩展,支持新增目标、remark映射与事件模板。 + +## 业务范围 +- 接收JSON负载,至少包含 `remark`、`event_define_no`、`trans_amt`、`trans_order_info.cash_resp_desc`、可选 `actual_ref_amt`。 +- 路由策略:按 `remark` 映射至一个或多个目标Webhook;若无匹配使用 `default_targets`。 +- 通知策略:按 `event_define_no` 模板生成文本,支持多渠道并发推送。 + +## 技术范围 +- Python 3.12、FastAPI、httpx +- Docker 容器化;SQLite 持久化,模型定义见 `app/db.py` +- 管理后台 `/admin` 维护端点、规则、动作、模板与通知渠道 +- 结构化日志;失败重试(在转发逻辑中可扩展);请求与分发记录持久化 + +## 输入与输出 +- 输入:`POST /webhook/{namespace}` JSON +- 输出:200 返回路由与通知结果结构 + +## 验收标准 +- 能正确解析示例 JSON 并生成消息,包含商户名、支付方式、金额、状态与日期,例如:`【某商户】支付宝 收款 5.50 元,状态:成功,日期:20251220` +- `remark=imcgcd03` 时转发至配置第3个目标;`remark=imcgcd02` 时至第2个目标 +- 同时推送至飞书与企业微信(若配置存在多个机器人则全部发送) +- 失败记录与重试可配置(次数与退避) + +## 部署与运行方式(概要) +- 运行模式: + - 作为独立 FastAPI 服务运行,暴露 `POST /webhook/{namespace}` 与 `/admin` 管理界面 + - 通过 Docker 镜像部署到服务器 +- 最低运行环境: + - Python 3.12 或兼容的 Docker 运行时 + - 对外开放的 HTTP 端口(默认 8080) +- 配置与数据: + - 数据库:默认 `sqlite:///./config/data.db`,可通过环境变量 `DB_PATH` 调整 + - 规则、模板、渠道均通过 Web 管理后台写入数据库,无需修改代码或 YAML + +## 边界与不做事项 +- 不实现目标端点的认证协议,默认匿名POST;可通过配置扩展签名 +- 不提供长期持久幂等存储,默认进程内短期去重 + +## 风险与约束 +- 目标Webhook不可达导致延迟;通过重试与超时保护 +- 通知渠道限流;并发可控与错误日志 + diff --git a/docs/webhook-relay/CONSENSUS_webhook-relay.md b/docs/webhook-relay/CONSENSUS_webhook-relay.md new file mode 100644 index 0000000..8b2f549 --- /dev/null +++ b/docs/webhook-relay/CONSENSUS_webhook-relay.md @@ -0,0 +1,25 @@ +# 共识与决策 + +## 技术选型 +- 语言与框架:Python + FastAPI +- 网络库:httpx +- 配置:YAML(PyYAML),环境变量 `WEBHOOK_CONFIG_PATH` +- 重试:简单指数退避 +- 容器:python:3.12-slim,入口 `uvicorn app.main:app --host 0.0.0.0 --port 8080` + +## 接口规范 +- `POST /webhook/{namespace}` 接收上游负载;`GET /health` 健康检查 + +## 配置规范 +- `targets`:目标Webhook注册表 +- `routing.remark_map`:remark到target名称列表映射 +- `notifications.event_map`:事件模板与渠道列表 +- `channels`:各渠道机器人Webhook列表 + +## 路由与通知规则 +- 路由按 `remark` 精确匹配,多目标并发转发 +- 通知按 `event_define_no` 模板渲染,支持多渠道并发推送 + +## 验收口径 +- 示例JSON生成消息与路由正确;返回结构包含每个目标与渠道的状态 + diff --git a/docs/webhook-relay/DESIGN_webhook-relay.md b/docs/webhook-relay/DESIGN_webhook-relay.md new file mode 100644 index 0000000..5de2434 --- /dev/null +++ b/docs/webhook-relay/DESIGN_webhook-relay.md @@ -0,0 +1,51 @@ +# 架构设计 + +## 总体架构 +```mermaid +flowchart LR +A[Incoming Webhook] --> B[FastAPI /webhook/{namespace}] +B --> C[Router] +C --> D[Targets Forwarder] +B --> E[Event Template] +E --> F[Channels Sender] +D --> G[External Webhook Targets] +F --> H[Feishu] +F --> I[WeCom] +``` + +## 模块分层 +- 接入层:`app/main.py`(FastAPI 入口与 Webhook 接收) +- 管理后台:`app/admin.py`(端点、规则、模板与渠道的增删改查) +- 领域模型与持久化:`app/db.py`, `app/models.py` +- 规则引擎:`app/services/engine.py`(树形规则 + 动作编排) +- 通知发送:`app/services/notify.py` +- 统计与日志:`app/services/stats.py`, `app/logging.py` + +## 关键接口 +- `engine.process(endpoint_id, payload) -> (routed: List, notified: List)` +- `_exec_forward(target, payload) -> Result`(内部使用) +- `_exec_notify(channel, msg) -> Result`(内部使用) +- 管理接口:`/admin/*` 用于维护端点、规则树、动作、模板与渠道 + +## 数据契约 +- 输入:示例 JSON 结构,关键字段存在于顶层与 `trans_order_info` +- 输出:`{"namespace": str, "routed": [...], "notified": [...]}`,其中: + - `routed`:每个转发目标的执行摘要(目标名、是否成功、失败原因) + - `notified`:每个通知渠道的执行摘要(渠道名、是否成功、失败原因) + +## 异常策略 +- 超时与网络错误重试;失败记录结构化日志 +- 非致命错误不影响其他目标或渠道发送 + +## 部署与运行(概要) +- 本地直接运行: + - 创建并激活虚拟环境(可选) + - 执行 `pip install -r requirements.txt` + - 启动服务:`uvicorn app.main:app --host 0.0.0.0 --port 8080` +- Docker 部署: + - 构建镜像:`docker build -t webhook-relay .` + - 运行容器示例: + - `docker run -d --name webhook-relay -p 8080:8080 webhook-relay` + - 默认使用 SQLite 数据库,路径由环境变量 `DB_PATH` 控制,默认值为 `sqlite:///./config/data.db` + - 如需持久化,可将宿主机目录挂载到容器内 `/app/config` 目录 + diff --git a/docs/webhook-relay/FLOWCHART.md b/docs/webhook-relay/FLOWCHART.md new file mode 100644 index 0000000..586e242 --- /dev/null +++ b/docs/webhook-relay/FLOWCHART.md @@ -0,0 +1,191 @@ +# Webhook 中继系统流程图 + +## 1. 系统宏观架构图 + +```mermaid +graph TB + subgraph External["外部世界"] + Sender["发送方 (支付宝/微信/业务系统)"] + User["管理员"] + TargetSys["目标系统 (ERP/BI)"] + NotifyApp["IM工具 (飞书/企微)"] + end + + subgraph WebhookRelay["Webhook中继服务 (Docker)"] + direction TB + + subgraph Interface["接入层"] + API["FastAPI (端口:8080)"] + AdminUI["Web管理后台"] + end + + subgraph Core["核心逻辑层"] + Parser["数据解析 (Pydantic)"] + Router["路由引擎 (Routing)"] + Notifier["通知引擎 (Notification)"] + Logger["日志审计 (Logging)"] + end + + subgraph Data["数据存储层"] + DB[(SQLite 数据库)] + Config["Config Loader (DB优先 + YAML回退)"] + end + end + + Sender -->|POST JSON| API + User -->|浏览器访问| AdminUI + AdminUI -->|CRUD配置| DB + + API --> Parser + Parser --> Router + Parser --> Notifier + + Router <-->|查询规则| Config + Notifier <-->|查询模板| Config + Config <-->|读取| DB + + Router -->|转发请求| TargetSys + Notifier -->|推送消息| NotifyApp + + API -->|异步写入| Logger + Logger -->|持久化| DB +``` + +## 2. 详细数据处理流程 + +此图展示了一个 Webhook 请求从进入系统到完成分发与记录的完整生命周期。 + +```mermaid +sequenceDiagram + participant Ext as 外部系统 + participant API as FastAPI入口 + participant DB as SQLite数据库 + participant Router as 路由服务 + participant Target as 目标系统 + participant Notifier as 通知服务 + participant Channel as IM渠道(飞书/企微) + participant Log as 日志服务 + + Note over Ext, API: 1. 请求接入 + Ext->>API: POST /webhook/{namespace} (Payload) + + activate API + API->>DB: 校验 Namespace 是否有效/启用 + alt Namespace 无效/禁用 + API-->>Ext: 403 Forbidden + else Namespace 有效 + API->>API: 解析 Payload (remark, event_no, trans_amt...) + + par 2. 并行处理 - 转发 (Relay) + API->>Router: 使用规则引擎匹配路由规则 + Router->>DB: 查询 ProcessingRule/RuleAction/Target + Router-->>API: 返回目标列表 [Target A, Target B] + + loop 对每个目标 + API->>Target: POST /target_url (异步 + 重试机制) + Target-->>API: 响应 (200 OK / 500 Error) + end + + and 3. 并行处理 - 通知 (Notify) + API->>Notifier: notify(event_no, payload) + Notifier->>DB: 查询 ProcessingRule/RuleAction/MessageTemplate/NotificationChannel + Notifier->>Notifier: 渲染模板 ("【{biz_name}】{pay_method_name} 收款 {trans_amt} 元,状态:{cash_resp_desc},日期:{trans_date}") + Notifier-->>API: 返回消息文本 & 渠道列表 + + loop 对每个渠道 + API->>Channel: POST Webhook (发送消息) + Channel-->>API: 响应结果 + end + end + + Note over API, Log: 4. 收尾工作 + API->>Log: BackgroundTask: 保存 RequestLog & DeliveryLog + Log->>DB: INSERT request_logs, delivery_logs + + API-->>Ext: 200 OK (包含 routed/notified 摘要) + end + deactivate API +``` + +## 3. 数据库实体关系图 (ERD) + +展示了用于支撑上述流程的数据库模型结构。 + +```mermaid +erDiagram + WebhookEndpoint { + int id PK + string namespace "URL路径标识" + string description + bool is_active + datetime created_at + } + + ProcessingRule { + int id PK + int endpoint_id FK + int parent_rule_id FK "父规则ID,可为空" + int priority "优先级,高优先级先匹配" + string match_field "如 remark / event_define_no" + string operator "eq/neq/contains/regex" + string match_value "匹配值" + } + + RuleAction { + int id PK + int rule_id FK + string action_type "forward/notify" + int target_id FK "转发目标,可空" + int channel_id FK "通知渠道,可空" + int template_id FK "消息模板,可空" + json template_vars "模板变量,键值对" + } + + Target { + int id PK + string name + string url + int timeout_ms + } + + NotificationChannel { + int id PK + string name + string channel_type "feishu/wecom" + string webhook_url + } + + MessageTemplate { + int id PK + string name "模板名称" + text template_content "模板内容" + } + + RequestLog { + int id PK + string namespace + string remark "来源标识" + string event_no "事件类型" + json raw_body "原始数据" + datetime received_at + string status "success/error" + } + + DeliveryLog { + int id PK + int request_id FK + string target_name + string type "relay/notify" + string status "success/failed" + text response_summary + datetime created_at + } + + WebhookEndpoint ||--|{ ProcessingRule : "拥有多条规则" + ProcessingRule ||--|{ ProcessingRule : "树形子规则" + ProcessingRule ||--|{ RuleAction : "每条规则包含多个动作" + RuleAction }o--|| Target : "转发到目标" + RuleAction }o--|| NotificationChannel : "推送到渠道" + RuleAction }o--|| MessageTemplate : "使用消息模板" + RequestLog ||--|{ DeliveryLog : "包含多条分发记录" +``` diff --git a/docs/webhook-relay/TASK_[webhook-relay].md b/docs/webhook-relay/TASK_[webhook-relay].md new file mode 100644 index 0000000..f395fab --- /dev/null +++ b/docs/webhook-relay/TASK_[webhook-relay].md @@ -0,0 +1,27 @@ +# 原子任务清单 + +## T1 配置加载 +- 输入:`config/config.yml` +- 输出:内存配置对象与查询API +- 验收:能返回目标、remark与事件模板 + +## T2 模型解析 +- 输入:请求JSON +- 输出:`IncomingPayload` 对象 +- 验收:示例JSON解析字段正确 + +## T3 路由与转发 +- 输入:`remark` 与目标列表 +- 输出:并发POST结果数组 +- 验收:`imcgcd03` 路由到 `target_3` + +## T4 通知消息 +- 输入:`event_define_no` 与负载 +- 输出:消息文本与渠道推送结果 +- 验收:`pay.ali_scaned` 生成正确文本并推送 + +## T5 接口与容器 +- 输入:应用代码 +- 输出:FastAPI端点与Dockerfile +- 验收:健康检查与示例数据POST通过 + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c8b674b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +fastapi==0.115.5 +uvicorn[standard]==0.32.0 +httpx==0.27.2 +pydantic==2.9.2 +PyYAML==6.0.2 +tenacity==9.0.0 +python-json-logger==2.0.7 +sqlalchemy==2.0.25 +jinja2==3.1.3 +python-multipart==0.0.9 diff --git a/samples/incoming.json b/samples/incoming.json new file mode 100644 index 0000000..8157bec --- /dev/null +++ b/samples/incoming.json @@ -0,0 +1 @@ +{"trans_date":"20251220","batch_id":"251220","bank_seq_id":"923913","trans_amt":"5.50","remark":"imcgcd03","event_define_no":"pay.ali_scaned","acct_id":"F03291269","is_div":"1","bank_order_no":"672025122022001416951449996798","fee_flag":2,"trans_order_info":{"maze_resp_code":"","fee_real_acct_id":"F03291269","agent_id":"6666000153839851","bank_seq_id":"923913","subsidy_stat":"I","acct_id":"F03291269","ref_cnt":0,"product_id":"HSK","bank_mer_id":"A1035134718870142962","id":2628973110,"icc_data":"","atu_sub_mer_id":"2088550839201829","trans_stat":"S","region_id":"TOP4_B","credit_type":"","version":2,"org_auth_no":"","cash_resp_desc":"成功","bagent_id":"6666000153839851","real_cust_id":"6666000153923913","req_seq_id":"T20251220171836S6666000153923913","term_div_coupon_type":1,"iss_inst_id":"","channel_finish_time":1766222307000,"channel_stat":"S","unconfirm_fee_amt":0.01,"real_gate_id":"Dx","db_unit":"4","sys_trace_audit_num":"","trans_date":"20251220","batch_id":"251220","credit_fee_amt":0.00,"bank_resp_code":"TRADE_SUCCESS","real_acct_id":"F03291269","channel_message":"TRADE_SUCCESS","settle_trans_stat":"","mypaytsf_discount":0.00,"unconfirm_amt":5.49,"org_huifu_seq_id":"","cash_resp_code":"000","double_limit_amt":0.00,"fee_split_type":"","fee_real_cust_id":"6666000153923913","maze_resp_desc":"","fee_amt":0.01,"creator":"","ord_amt":5.50,"acct_stat":"I","debit_fee_amt":0.00,"cash_req_date":"20251220171826","ref_num":"171826923913","out_trans_id":"672025122022001416951449996798","fee_acct_id":"F03291269","is_acct_div_param":0,"subsidy_ref_amt":0.00,"mer_name":"高新区益选便利店(个体工商户)","pay_scene":"02","time_expire":"20251220172836","modifier":"","channel_code":"00","mcc":"","close_trans_stat":"","ref_fee_amt":0.01,"trans_finish_time":1766222309000,"org_trans_date":"","bank_resp_desc":"TRADE_SUCCESS","pay_channel_id":"00001249","create_time":1766222306000,"pay_amt":5.50,"org_acct_id":"F03291269","hf_seq_id":"002900TOP4B251220171826P546ac136a7d00000","goods_desc":"1","is_route":"","is_delay_acct":0,"settle_amt":5.50,"ref_amt":5.50,"gate_id":"SPIN022","pay_channel":"A","huifu_id":"6666000153923913","subsidy_amt":0.00,"fee_huifu_id":"6666000153923913","maze_bg_seq_id":"","maze_bg_date":"","modify_time":1766222308000,"check_cash_flag":"I","remark":"imcgcd03","is_acct_div":1,"real_pay_type":"1013","sys_id":"6666000132082499","card_channel_type":"","is_deleted":0,"fee_flag":2,"cash_trans_id":"2025122024rg0396","pa_mer_id":"SSP001","pay_type":"MICROPAY","sn_code":"","channel_type":"U","source_region_id":"TOP4_B","trans_type":"1000","mer_ord_id":"T20251220171836S6666000153923913","ord_id":"202512201718260TOP4_BL2797202899","card_sign":"","fee_formula":"","fee_source":"'SERVER'","party_order_id":"03242512206230681909127","bank_mer_name":"高新区益选便利店(个体工商户)","cashier_version":"V2","un_scene_info":"","maze_pnr_dev_id":"","req_date":"20251220","route_region_id":"C24_A","fee_rec_type":1},"settlement_amt":"5.50","acct_split_bunch":{"acct_infos":[{"acct_id":"F03291269","huifu_id":"6666000153923913","div_amt":"5.50"}],"fee_acct_id":"F03291269","fee_huifu_id":"6666000153923913","fee_amt":"0.01"},"trans_type":"A_MICROPAY","mer_ord_id":"T20251220171836S6666000153923913","trans_stat":"S","end_time":"20251220171827","hf_seq_id":"002900TOP4B251220171826P546ac136a7d00000","ref_no":"171826923913","trans_time":"171826","alipay_response":{"coupon_fee":"0.00","buyer_logon_id":"182****2451","buyer_id":"2088902514116953","app_id":"","fund_bill_list":[{"amount":"5.50","fund_channel":"ALIPAYACCOUNT"}]},"fee_amount":"0.01","out_trans_id":"672025122022001416951449996798","party_order_id":"03242512206230681909127","is_delay_acct":"0","fee_formula_infos":[{"fee_formula":"AMT*0.0025","fee_type":"TRANS_FEE"}],"namespace":"opps-webhook","huifu_id":"6666000153923913","mer_name":"高新区益选便利店(个体工商户)"} diff --git a/scripts/verify_logic.py b/scripts/verify_logic.py new file mode 100644 index 0000000..795c8b5 --- /dev/null +++ b/scripts/verify_logic.py @@ -0,0 +1,22 @@ +import os +import json +from app.config import load_config +from app.models import IncomingPayload, IncomingOrderInfo +from app.services.relay import route_targets +from app.services.notify import render_message + +def main(): + cfg = load_config() + path = os.path.abspath(r"e:\2025Code\python\webhock\123.josn") + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data.get("trans_order_info"), dict): + data["trans_order_info"] = IncomingOrderInfo(**data["trans_order_info"]) # type: ignore + payload = IncomingPayload(**data) + targets = route_targets(cfg, payload.remark or "") + msg = render_message(cfg, payload) + print("targets:", [t.get("name") for t in targets]) + print("message:", msg["text"]) + +if __name__ == "__main__": + main() diff --git a/scripts/verify_message_only.py b/scripts/verify_message_only.py new file mode 100644 index 0000000..daa140c --- /dev/null +++ b/scripts/verify_message_only.py @@ -0,0 +1,30 @@ +import json +import os + +def main(): + path = os.path.abspath(os.path.join(os.getcwd(), "samples", "incoming.json")) + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + remark = data.get("remark") + event = data.get("event_define_no") + trans_amt = data.get("trans_amt") + cash_resp_desc = "" + toi = data.get("trans_order_info") or {} + cash_resp_desc = toi.get("cash_resp_desc") or "" + if event == "pay.ali_scaned": + msg = f"支付宝收款{trans_amt}元,状态:{cash_resp_desc}" + elif event == "pay.wx_scaned": + msg = f"微信收款{trans_amt}元,状态:{cash_resp_desc}" + elif event == "wechat.complaint": + msg = "⚠️请注意,您有新的微信投诉,请注意查看" + elif event == "refund.standard": + actual_ref_amt = data.get("actual_ref_amt") or (toi.get("ref_amt") or data.get("settlement_amt")) + msg = f"退款成功,退款金额:{actual_ref_amt}" + else: + msg = "" + target = "target_3" if remark == "imcgcd03" else ("target_2" if remark == "imcgcd02" else "") + print("message:", msg) + print("route_target:", target) + +if __name__ == "__main__": + main() diff --git a/templates/admin/base.html b/templates/admin/base.html new file mode 100644 index 0000000..a79dbc7 --- /dev/null +++ b/templates/admin/base.html @@ -0,0 +1,51 @@ + + + + + + {% block title %}Webhook管理后台{% endblock %} + + + + +
+ + {% if system_stats %} +
+
+ 运行中 + 已运行: {{ system_stats.uptime }} + 今日请求: {{ system_stats.today_count }} +
+
+ 最新日志: {{ system_stats.latest_log }} +
+
+ {% endif %} + +
+ + Webhook中继管理 + + +
+ +
+ {% block content %}{% endblock %} +
+
+ + {% block scripts %}{% endblock %} + + diff --git a/templates/admin/channels.html b/templates/admin/channels.html new file mode 100644 index 0000000..3bf4317 --- /dev/null +++ b/templates/admin/channels.html @@ -0,0 +1,124 @@ +{% extends "admin/base.html" %} +{% block title %}通知渠道 - Webhook{% endblock %} +{% block content %} +
+
+

通知渠道列表

+
+
+ +
+
+ + + + + + + + + + + + + + + {% for c in channels %} + + + + + + + + {% endfor %} + +
ID名称类型Webhook URL操作
{{ c.id }}{{ c.name }} + {% if c.channel_type == 'feishu' %} + 飞书 + {% else %} + 企业微信 + {% endif %} + {{ c.webhook_url }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/dashboard.html b/templates/admin/dashboard.html new file mode 100644 index 0000000..62fb7ec --- /dev/null +++ b/templates/admin/dashboard.html @@ -0,0 +1,196 @@ +{% extends "admin/base.html" %} +{% block title %}仪表盘 - Webhook{% endblock %} +{% block content %} +
+
+
+
+
今日请求总数
+

{{ system_stats.today_count }}

+
+
+
+
+
+
+
系统运行时间
+

{{ system_stats.uptime }}

+
+
+
+
+
+
+
最新活动
+

{{ system_stats.latest_log or '暂无数据' }}

+
+
+
+
+ +
+
+ +
+
+
⚡ 模拟数据推送
+ 测试您的规则配置 +
+
+
+ + + {% if not endpoints %} +
请先创建端点
+ {% endif %} +
+
+ + + +
+ +
+
+
执行结果:
+

+                    
+                
+
+
+ +
+
+
快速开始
+
+
+

欢迎使用 Webhook 中继平台。请按照以下步骤配置您的第一个流程:

+
    +
  1. +
    +
    添加资源
    + 配置转发目标 (Target) 或通知渠道 (Channel)。 +
    + 去配置 +
  2. +
  3. +
    +
    创建端点
    + 定义一个 Webhook 接收地址 (Namespace)。 +
    + 去创建 +
  4. +
  5. +
    +
    设置规则
    + 在端点详情页添加入站匹配规则和动作。 +
    +
  6. +
+
+
+
+ +
+
+
+
系统状态
+ Online +
+
+
    +
  • + API 基础地址: + http://{your-host}:8090/webhook/ +
  • +
  • + 文档: + Swagger UI / Redoc +
  • +
  • + 当前版本: v2.2.0 (Tree Rule Engine) +
  • +
+
+
+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/endpoint_detail.html b/templates/admin/endpoint_detail.html new file mode 100644 index 0000000..0370649 --- /dev/null +++ b/templates/admin/endpoint_detail.html @@ -0,0 +1,467 @@ +{% extends "admin/base.html" %} +{% block title %}配置端点流程 - Webhook{% endblock %} +{% block content %} +
+
+

配置端点流程: {{ ep.namespace }}

+

{{ ep.description or '无描述' }}

+
+ +
+ +
+ 规则引擎说明 (树状逻辑): + 规则自上而下、从外向内匹配。只有父规则匹配成功,才会检查子规则。
+ 子规则会继承父规则设置的模板变量(如支付方式名称)。 +
+ + +{% macro render_rule(rule, level=0) %} +
+
+
+ {% if level > 0 %} + ↳ 并且 + {% else %} + 根条件: + {% endif %} + + 字段 {{ rule.match_field }} + {{ rule.operator or '等于' }} + {{ rule.match_value }} +
+
+
+ + + +
+ + + +
+
+
+
+
+ {% if rule.actions %} +
+ {% for action in rule.actions %} +
+
+ {% if action.action_type == 'forward' %} + 转发 + 目标: {{ action.target.name }} + {% else %} + 通知 + 渠道: {{ action.channel.name }} | 模板: {{ action.template.name }} + {% if action.template_vars %} + {{ action.template_vars }} + {% endif %} + {% endif %} +
+
+ +
+ + + +
+
+ + + +
+
+
+ {% endfor %} +
+ {% endif %} + +
+ + +
+
+
+ + +{% for child in rule.child_rules %} + {{ render_rule(child, level + 1) }} +{% endfor %} + +{% endmacro %} + + +{% for rule in root_rules %} + {{ render_rule(rule) }} +{% endfor %} + + +
+
+ +
+
+ + + + + + + + + + + + + + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/endpoints.html b/templates/admin/endpoints.html new file mode 100644 index 0000000..e4b834d --- /dev/null +++ b/templates/admin/endpoints.html @@ -0,0 +1,124 @@ +{% extends "admin/base.html" %} +{% block title %}端点管理 - Webhook{% endblock %} +{% block content %} +
+
+

接收端点 (Endpoints)

+
+
+ +
+
+ + + + + + + + + + + + + + + + + {% for e in endpoints %} + + + + + + + + + + {% endfor %} + +
IDNamespace完整URL示例描述状态创建时间操作
{{ e.id }}{{ e.namespace }} + /webhook/{{ e.namespace }} + + {{ e.description or '-' }} + {% if e.is_active %} + 启用 + {% else %} + 禁用 + {% endif %} + {{ e.created_at.strftime('%Y-%m-%d %H:%M') }} + ⚙️ 配置 +
+ + {% if e.is_active %} + + {% else %} + + {% endif %} +
+
+ + +
+
+ + + + + +{% endblock %} diff --git a/templates/admin/logs.html b/templates/admin/logs.html new file mode 100644 index 0000000..b7c9e6d --- /dev/null +++ b/templates/admin/logs.html @@ -0,0 +1,123 @@ +{% extends "admin/base.html" %} +{% block title %}系统日志 - Webhook{% endblock %} +{% block content %} +
+
+

请求日志 (最近100条)

+
+
+
+ +
+
+
+ +
+ {% for log in logs %} +
+

+ +

+
+
+
+
+
+
原始 Payload
+ +
+
{{ log.raw_body | tojson(indent=2) }}
+
+
+
分发记录
+
    + {% for d in log.delivery_logs %} +
  • +
    +
    + {% if d.type == 'relay' %} + 转发 + {% else %} + 通知 + {% endif %} + {{ d.target_name }} +
    + {{ d.response_summary }} +
    + {% if d.status == 'success' %} + 成功 + {% else %} + 失败 + {% endif %} +
  • + {% endfor %} + {% if not log.delivery_logs %} +
  • 无分发记录
  • + {% endif %} +
+
+
+
+
+
+ {% endfor %} +
+ +{% if not logs %} +
+ 暂无日志记录 +
+{% endif %} +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/targets.html b/templates/admin/targets.html new file mode 100644 index 0000000..8dc972a --- /dev/null +++ b/templates/admin/targets.html @@ -0,0 +1,111 @@ +{% extends "admin/base.html" %} +{% block title %}目标管理 - Webhook{% endblock %} +{% block content %} +
+
+

转发目标列表

+
+
+ +
+
+ + + + + + + + + + + + + + + {% for t in targets %} + + + + + + + + {% endfor %} + +
ID名称URL超时(ms)操作
{{ t.id }}{{ t.name }}{{ t.url }}{{ t.timeout_ms }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/templates/admin/templates.html b/templates/admin/templates.html new file mode 100644 index 0000000..ff2279c --- /dev/null +++ b/templates/admin/templates.html @@ -0,0 +1,103 @@ +{% extends "admin/base.html" %} +{% block title %}消息模板 - Webhook{% endblock %} +{% block content %} +
+
+

消息模板资源

+
+
+ +
+
+ + + + + + + + + + + + + + {% for t in templates %} + + + + + + + {% endfor %} + +
ID模板名称内容预览操作
{{ t.id }}{{ t.name }}{{ t.template_content }} + +
+ + +
+
+ + + +{% endblock %} + +{% block scripts %} + +{% endblock %}