- 添加FastAPI应用基础结构,包括主入口、路由和模型定义
- 实现Webhook接收端点(/webhook/{namespace})和健康检查(/health)
- 添加管理后台路由和模板,支持端点、目标、渠道和模板管理
- 包含SQLite数据库模型定义和初始化逻辑
- 添加日志记录和统计服务
- 包含Dockerfile和配置示例文件
- 添加项目文档,包括设计、流程图和验收标准
486 lines
19 KiB
Python
486 lines
19 KiB
Python
from fastapi import APIRouter, Request, Form, Depends, HTTPException
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy.orm import Session, joinedload, aliased
|
|
from typing import List, Optional, Union
|
|
import json
|
|
from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction
|
|
from app.services.stats import stats_service
|
|
from app.services.engine import engine
|
|
|
|
# Every route declared in this module is registered under the "/admin" prefix.
router = APIRouter(
    prefix="/admin",
    tags=["admin"],
)

# Jinja2 template loader pointed at the relative "templates" directory.
templates = Jinja2Templates(directory="templates")
|
|
|
|
def get_db():
    """Yield a fresh database session and close it once the consumer is done.

    Used throughout this module as a FastAPI dependency (``Depends(get_db)``).
    The ``finally`` clause guarantees the session is closed even when the
    request handler raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
async def global_stats_processor(request: Request):
    """Build the ``system_stats`` context entry shared by all admin pages.

    Args:
        request: The current request. It is accepted for signature
            compatibility but is not read by this function.

    Returns:
        A dict with a single ``"system_stats"`` key holding the uptime,
        today's count and the latest log time reported by ``stats_service``.
    """
    system_stats = dict(
        uptime=stats_service.get_uptime(),
        today_count=stats_service.get_today_count(),
        latest_log=stats_service.get_latest_log_time(),
    )
    return {"system_stats": system_stats}
|
|
|
|
async def render(template_name: str, context: dict):
    """Render a template after merging the global statistics into its context.

    Args:
        template_name: Template path passed to ``templates.TemplateResponse``.
        context: Template context; must contain a ``"request"`` entry. It is
            updated in place with the values from ``global_stats_processor``.

    Returns:
        The response object produced by ``templates.TemplateResponse``.
    """
    current_request = context["request"]
    context.update(await global_stats_processor(current_request))
    return templates.TemplateResponse(template_name, context)
|
|
|
|
@router.get("/", response_class=HTMLResponse)
async def admin_index(request: Request, db: Session = Depends(get_db)):
    """Render the dashboard page listing the currently active webhook endpoints."""
    active_query = db.query(WebhookEndpoint).filter(
        WebhookEndpoint.is_active == True  # noqa: E712 - SQL column comparison, not a Python bool test
    )
    page_context = {
        "request": request,
        "active_page": "dashboard",
        "endpoints": active_query.all(),
    }
    return await render("admin/dashboard.html", page_context)
|
|
|
|
# --- Endpoints ---
|
|
@router.get("/endpoints", response_class=HTMLResponse)
async def list_endpoints(request: Request, db: Session = Depends(get_db)):
    """Render the endpoint list page, newest endpoints first."""
    newest_first = WebhookEndpoint.created_at.desc()
    endpoints = db.query(WebhookEndpoint).order_by(newest_first).all()
    page_context = {
        "request": request,
        "endpoints": endpoints,
        "active_page": "endpoints",
    }
    return await render("admin/endpoints.html", page_context)
|
|
|
|
@router.post("/endpoints")
async def add_endpoint(namespace: str = Form(...), description: str = Form(None), db: Session = Depends(get_db)):
    """Create a webhook endpoint from the submitted form.

    Args:
        namespace: Endpoint namespace. After removing ``-`` and ``_`` the
            remainder must be non-empty and alphanumeric (``str.isalnum``).
        description: Optional free-text description.
        db: Database session.

    Returns:
        A 303 redirect to the endpoint list page.

    Raises:
        HTTPException: 400 when the namespace fails validation. The original
            check ended in ``pass`` and therefore never rejected anything.
    """
    # Strip the two permitted separators; whatever is left must be alphanumeric.
    bare_namespace = namespace.replace("-", "").replace("_", "")
    if not bare_namespace.isalnum():
        raise HTTPException(
            status_code=400,
            detail="Namespace may only contain letters, digits, '-' and '_'",
        )

    ep = WebhookEndpoint(namespace=namespace, description=description)
    db.add(ep)
    db.commit()
    return RedirectResponse(url="/admin/endpoints", status_code=303)
|
|
|
|
@router.post("/endpoints/toggle")
async def toggle_endpoint(id: int = Form(...), db: Session = Depends(get_db)):
    """Flip the ``is_active`` flag of an endpoint, then return to the list page.

    An unknown ``id`` is ignored: nothing is changed and the redirect is
    still returned.
    """
    back_to_list = RedirectResponse(url="/admin/endpoints", status_code=303)

    endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).first()
    if not endpoint:
        return back_to_list

    endpoint.is_active = not endpoint.is_active
    db.commit()
    return back_to_list
|
|
|
|
@router.post("/endpoints/delete")
async def delete_endpoint(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the endpoint with the given id and redirect to the list page."""
    matching_endpoints = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id)
    matching_endpoints.delete()
    db.commit()
    return RedirectResponse(url="/admin/endpoints", status_code=303)
|
|
|
|
# --- Endpoint Details & Rules ---
|
|
@router.get("/endpoints/{id}", response_class=HTMLResponse)
async def endpoint_detail(id: int, request: Request, db: Session = Depends(get_db)):
    """Render the detail page of one endpoint together with its rule tree.

    All rules of the endpoint are loaded in a single query (actions eagerly
    joined) and the parent/child hierarchy is assembled in Python by attaching
    a ``child_rules`` list to every rule object.

    Returns:
        The rendered detail page, or a redirect to the endpoint list when no
        endpoint with this id exists.
    """
    ep = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).first()
    if not ep:
        return RedirectResponse(url="/admin/endpoints")

    rules_query = (
        db.query(ProcessingRule)
        .options(joinedload(ProcessingRule.actions))
        .filter(ProcessingRule.endpoint_id == id)
        .order_by(ProcessingRule.priority.desc())
    )
    all_rules = rules_query.all()

    # Index the rules by id and give each one an empty, dynamically attached
    # children list before any linking happens.
    rules_by_id = {}
    for rule in all_rules:
        rule.child_rules = []
        rules_by_id[rule.id] = rule

    # Link every rule to its parent. Rules without a parent id are roots; a
    # rule whose parent id is not among this endpoint's rules is placed nowhere.
    root_rules = []
    for rule in all_rules:
        if not rule.parent_rule_id:
            root_rules.append(rule)
            continue
        parent = rules_by_id.get(rule.parent_rule_id)
        if parent:
            parent.child_rules.append(rule)

    # Lookup data for the modals on the page.
    page_context = {
        "request": request,
        "ep": ep,
        "root_rules": root_rules,
        "targets": db.query(Target).all(),
        "channels": db.query(NotificationChannel).all(),
        "templates": db.query(MessageTemplate).all(),
        "active_page": "endpoints",
    }
    return await render("admin/endpoint_detail.html", page_context)
|
|
|
|
@router.post("/endpoints/{id}/rules")
async def add_rule(
    id: int,
    match_field: str = Form(...),
    match_value: str = Form(...),
    operator: str = Form("eq"),
    parent_rule_id: Optional[Union[int, str]] = Form(None),
    priority: int = Form(0),
    db: Session = Depends(get_db)
):
    """Create a processing rule for the endpoint given in the path.

    ``parent_rule_id`` may arrive as an empty or non-numeric string from the
    HTML form; in both cases the rule is created without a parent.

    Returns:
        A 303 redirect to the endpoint's detail page.
    """
    raw_parent = parent_rule_id
    resolved_parent = None
    if raw_parent and str(raw_parent).strip():
        try:
            resolved_parent = int(raw_parent)
        except ValueError:
            # Non-numeric input: fall back to "no parent".
            resolved_parent = None

    rule_fields = {
        "endpoint_id": id,
        "match_field": match_field,
        "match_value": match_value,
        "operator": operator,
        "parent_rule_id": resolved_parent,
        "priority": priority,
    }
    db.add(ProcessingRule(**rule_fields))
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{id}", status_code=303)
|
|
|
|
@router.post("/rules/delete")
async def delete_rule(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the rule with the given id and return to its endpoint's detail page.

    ``endpoint_id`` is only used to build the redirect URL.
    """
    matching_rules = db.query(ProcessingRule).filter(ProcessingRule.id == id)
    matching_rules.delete()
    db.commit()
    detail_url = f"/admin/endpoints/{endpoint_id}"
    return RedirectResponse(url=detail_url, status_code=303)
|
|
|
|
@router.post("/rules/update")
async def update_rule(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    match_field: str = Form(...),
    match_value: str = Form(...),
    operator: str = Form("eq"),
    parent_rule_id: Optional[str] = Form(None),
    priority: int = Form(0),
    db: Session = Depends(get_db)
):
    """Update the match settings, priority and parent of an existing rule.

    Args:
        id: Id of the rule to update.
        endpoint_id: Only used to build the redirect URL.
        match_field, match_value, operator, priority: New rule settings.
        parent_rule_id: New parent id as submitted by the form. An empty or
            non-numeric value clears the parent.
        db: Database session.

    Returns:
        A 303 redirect to the endpoint's detail page.

    Raises:
        HTTPException: 404 when the rule does not exist; 400 when the
            requested parent is the rule itself or one of its descendants,
            which would turn the rule tree into a cycle.
    """
    rule = db.query(ProcessingRule).filter(ProcessingRule.id == id).first()
    if not rule:
        raise HTTPException(status_code=404, detail="Rule not found")

    new_parent_id = None
    if parent_rule_id is not None and str(parent_rule_id).strip() != "":
        try:
            new_parent_id = int(parent_rule_id)
        except ValueError:
            new_parent_id = None

    # Walk up from the proposed parent. Reaching this rule means the new
    # parent is the rule itself or lives in its own subtree. ``visited`` stops
    # the walk if the stored data already contains an unrelated cycle.
    visited = set()
    ancestor_id = new_parent_id
    while ancestor_id is not None and ancestor_id not in visited:
        if ancestor_id == rule.id:
            raise HTTPException(
                status_code=400,
                detail="A rule cannot be nested under itself or one of its descendants",
            )
        visited.add(ancestor_id)
        ancestor = db.query(ProcessingRule).filter(ProcessingRule.id == ancestor_id).first()
        ancestor_id = ancestor.parent_rule_id if ancestor else None

    rule.match_field = match_field
    rule.match_value = match_value
    rule.operator = operator
    rule.priority = priority
    rule.parent_rule_id = new_parent_id
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
|
|
|
|
@router.post("/actions/update")
async def update_action(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    action_type: str = Form(...),
    target_id: Optional[str] = Form(None),
    channel_id: Optional[str] = Form(None),
    template_id: Optional[str] = Form(None),
    template_vars_str: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Overwrite an existing rule action with the submitted form values.

    Only the references that belong to the chosen ``action_type`` are kept:
    ``target_id`` for ``'forward'``; ``channel_id`` and ``template_id`` for
    ``'notify'``. All other references are cleared. ``template_vars_str`` is
    parsed as JSON; a blank or unparsable value stores ``None``.

    Returns:
        A 303 redirect to the endpoint's detail page.

    Raises:
        HTTPException: 404 when the action does not exist.
    """
    action = db.query(RuleAction).filter(RuleAction.id == id).first()
    if not action:
        raise HTTPException(status_code=404, detail="Action not found")

    def to_int_or_none(raw):
        # Blank, missing or non-numeric form values all map to None.
        if not raw or not str(raw).strip():
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    parsed_vars = None
    if template_vars_str and template_vars_str.strip():
        try:
            parsed_vars = json.loads(template_vars_str)
        except Exception:
            parsed_vars = None

    forwards = action_type == 'forward'
    notifies = action_type == 'notify'

    action.action_type = action_type
    action.target_id = to_int_or_none(target_id) if forwards else None
    action.channel_id = to_int_or_none(channel_id) if notifies else None
    action.template_id = to_int_or_none(template_id) if notifies else None
    action.template_vars = parsed_vars
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
|
|
|
|
def _duplicate_rule_tree(db: Session, src_rule: ProcessingRule, endpoint_id: int, new_parent_id: Optional[int]) -> int:
    """Copy a rule, its actions and all of its descendants, then commit once.

    Two defects of the earlier version are addressed:

    * Children were queried after the copy had been inserted. When the copy
      was placed under the source rule or one of its descendants, the copies
      themselves were picked up as children and copied again without end.
      Ids of handled sources and of created copies are now tracked and
      skipped. This also stops the recursion on cycles already in the data.
    * A commit was issued at every level, so a failure half-way left a
      partial tree behind. Each level now only flushes and the whole copy is
      committed in one transaction.

    Args:
        db: Database session.
        src_rule: Root of the subtree to copy.
        endpoint_id: Endpoint that will own every copied rule.
        new_parent_id: Parent id for the copied root, or ``None`` for a
            top-level rule.

    Returns:
        The id of the copied root rule.
    """
    # Ids that must never be copied (again): sources already handled plus
    # every copy created during this call.
    skip_ids = set()
    new_root_id = _copy_rule_subtree(db, src_rule, endpoint_id, new_parent_id, skip_ids)
    db.commit()
    return new_root_id


def _copy_rule_subtree(db: Session, src_rule: ProcessingRule, endpoint_id: int, new_parent_id: Optional[int], skip_ids: set) -> int:
    """Recursively copy one rule and its descendants without committing."""
    skip_ids.add(src_rule.id)

    new_rule = ProcessingRule(
        endpoint_id=endpoint_id,
        match_field=src_rule.match_field,
        operator=src_rule.operator,
        match_value=src_rule.match_value,
        priority=src_rule.priority,
        parent_rule_id=new_parent_id
    )
    db.add(new_rule)
    # Flush so the database assigns the primary key while the transaction
    # stays open.
    db.flush()
    new_rule_id = new_rule.id
    skip_ids.add(new_rule_id)

    for a in src_rule.actions:
        db.add(RuleAction(
            rule_id=new_rule_id,
            action_type=a.action_type,
            target_id=a.target_id if a.action_type == 'forward' else None,
            channel_id=a.channel_id if a.action_type == 'notify' else None,
            template_id=a.template_id if a.action_type == 'notify' else None,
            template_vars=a.template_vars
        ))

    children = db.query(ProcessingRule).filter(ProcessingRule.parent_rule_id == src_rule.id).all()
    for child in children:
        if child.id in skip_ids:
            # Either a copy made during this call or a source already handled.
            continue
        _copy_rule_subtree(db, child, endpoint_id, new_rule_id, skip_ids)

    return new_rule_id
|
|
|
|
@router.post("/rules/duplicate")
async def duplicate_rule(
    rule_id: int = Form(...),
    endpoint_id: int = Form(...),
    parent_rule_id: Optional[str] = Form(None),
    include_children: Optional[str] = Form("true"),
    db: Session = Depends(get_db)
):
    """Duplicate a rule, optionally together with its whole subtree.

    Args:
        rule_id: Id of the rule to copy.
        endpoint_id: Endpoint that will own the copy; also used for the
            redirect URL.
        parent_rule_id: Parent id for the copy as submitted by the form. An
            empty or non-numeric value creates a top-level rule.
        include_children: Textual flag. ``"true"``, ``"1"``, ``"on"`` and
            ``"yes"`` (case-insensitive) copy the descendants as well; any
            other value, including ``"false"`` and an empty string, copies
            only the rule and its actions.
        db: Database session.

    Returns:
        A 303 redirect to the endpoint's detail page.

    Raises:
        HTTPException: 404 when the source rule does not exist.
    """
    src = db.query(ProcessingRule).filter(ProcessingRule.id == rule_id).first()
    if not src:
        raise HTTPException(status_code=404, detail="Rule not found")

    new_parent = None
    if parent_rule_id and str(parent_rule_id).strip():
        try:
            new_parent = int(parent_rule_id)
        except ValueError:
            new_parent = None

    # The earlier test `include_children or include_children.lower() == "true"`
    # was true for every non-empty string (so "false" still copied children)
    # and raised AttributeError for None. Parse the flag explicitly instead.
    copy_children = (include_children or "").strip().lower() in ("true", "1", "on", "yes")

    if copy_children:
        _duplicate_rule_tree(db, src, endpoint_id, new_parent)
    else:
        new_rule = ProcessingRule(
            endpoint_id=endpoint_id,
            match_field=src.match_field,
            operator=src.operator,
            match_value=src.match_value,
            priority=src.priority,
            parent_rule_id=new_parent
        )
        db.add(new_rule)
        # Flush to obtain the new primary key; the rule and its actions are
        # then committed together.
        db.flush()
        new_rule_id = new_rule.id
        for a in src.actions:
            db.add(RuleAction(
                rule_id=new_rule_id,
                action_type=a.action_type,
                target_id=a.target_id if a.action_type == 'forward' else None,
                channel_id=a.channel_id if a.action_type == 'notify' else None,
                template_id=a.template_id if a.action_type == 'notify' else None,
                template_vars=a.template_vars
            ))
        db.commit()

    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
|
|
|
|
@router.post("/actions/duplicate")
async def duplicate_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)):
    """Create a copy of an action under the same rule as the original.

    Only the references that belong to the action's type are carried over:
    the target for ``'forward'``; the channel and template for ``'notify'``.

    Returns:
        A 303 redirect to the endpoint's detail page.

    Raises:
        HTTPException: 404 when the action does not exist.
    """
    src = db.query(RuleAction).filter(RuleAction.id == id).first()
    if not src:
        raise HTTPException(status_code=404, detail="Action not found")

    forwards = src.action_type == 'forward'
    notifies = src.action_type == 'notify'
    clone = RuleAction(
        rule_id=src.rule_id,
        action_type=src.action_type,
        target_id=src.target_id if forwards else None,
        channel_id=src.channel_id if notifies else None,
        template_id=src.template_id if notifies else None,
        template_vars=src.template_vars,
    )
    db.add(clone)
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
|
|
@router.post("/rules/{rule_id}/actions")
async def add_action(
    rule_id: int,
    endpoint_id: int = Form(...),
    action_type: str = Form(...),
    target_id: Optional[Union[int, str]] = Form(None),
    channel_id: Optional[Union[int, str]] = Form(None),
    template_id: Optional[Union[int, str]] = Form(None),
    template_vars_str: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Attach a new action to the rule given in the path.

    Only the references that belong to ``action_type`` are stored:
    ``target_id`` for ``'forward'``; ``channel_id`` and ``template_id`` for
    ``'notify'``. ``template_vars_str`` is parsed as JSON; a blank or
    unparsable value stores ``None``.

    Returns:
        A 303 redirect to the endpoint's detail page.
    """
    def to_int_or_none(raw):
        # Blank, missing or non-numeric form values all map to None.
        if not raw or not str(raw).strip():
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    parsed_vars = None
    if template_vars_str and template_vars_str.strip():
        try:
            parsed_vars = json.loads(template_vars_str)
        except Exception:
            # Invalid JSON is ignored; the action is stored without variables.
            pass

    forwards = action_type == 'forward'
    notifies = action_type == 'notify'
    new_action = RuleAction(
        rule_id=rule_id,
        action_type=action_type,
        target_id=to_int_or_none(target_id) if forwards else None,
        channel_id=to_int_or_none(channel_id) if notifies else None,
        template_id=to_int_or_none(template_id) if notifies else None,
        template_vars=parsed_vars,
    )
    db.add(new_action)
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
|
|
|
|
@router.post("/actions/delete")
async def delete_action(id: int = Form(...), endpoint_id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the action with the given id and return to the endpoint's detail page.

    ``endpoint_id`` is only used to build the redirect URL.
    """
    matching_actions = db.query(RuleAction).filter(RuleAction.id == id)
    matching_actions.delete()
    db.commit()
    detail_url = f"/admin/endpoints/{endpoint_id}"
    return RedirectResponse(url=detail_url, status_code=303)
|
|
|
|
|
|
# --- Targets ---
|
|
@router.get("/targets", response_class=HTMLResponse)
async def list_targets(request: Request, db: Session = Depends(get_db)):
    """Render the page listing every forwarding target."""
    page_context = {
        "request": request,
        "targets": db.query(Target).all(),
        "active_page": "targets",
    }
    return await render("admin/targets.html", page_context)
|
|
|
|
@router.post("/targets")
async def add_target(name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)):
    """Create a forwarding target from the form and redirect to the target list."""
    new_target = Target(
        name=name,
        url=url,
        timeout_ms=timeout_ms,
    )
    db.add(new_target)
    db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
|
|
|
|
@router.post("/targets/update")
async def update_target(id: int = Form(...), name: str = Form(...), url: str = Form(...), timeout_ms: int = Form(5000), db: Session = Depends(get_db)):
    """Update name, URL and timeout of a target, then redirect to the target list.

    An unknown ``id`` is ignored; the redirect is returned either way.
    """
    target = db.query(Target).filter(Target.id == id).first()
    if target:
        new_values = (("name", name), ("url", url), ("timeout_ms", timeout_ms))
        for attribute, value in new_values:
            setattr(target, attribute, value)
        db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
|
|
|
|
@router.post("/targets/delete")
async def delete_target(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the target with the given id and redirect to the target list."""
    matching_targets = db.query(Target).filter(Target.id == id)
    matching_targets.delete()
    db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
|
|
|
|
# --- Channels ---
|
|
@router.get("/channels", response_class=HTMLResponse)
async def list_channels(request: Request, db: Session = Depends(get_db)):
    """Render the page listing every notification channel."""
    page_context = {
        "request": request,
        "channels": db.query(NotificationChannel).all(),
        "active_page": "channels",
    }
    return await render("admin/channels.html", page_context)
|
|
|
|
@router.post("/channels")
async def add_channel(name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)):
    """Create a notification channel from the form and redirect to the channel list."""
    new_channel = NotificationChannel(
        name=name,
        channel_type=channel_type,
        webhook_url=webhook_url,
    )
    db.add(new_channel)
    db.commit()
    return RedirectResponse(url="/admin/channels", status_code=303)
|
|
|
|
@router.post("/channels/update")
async def update_channel(id: int = Form(...), name: str = Form(...), channel_type: str = Form(...), webhook_url: str = Form(...), db: Session = Depends(get_db)):
    """Update name, type and webhook URL of a channel, then redirect to the channel list.

    An unknown ``id`` is ignored; the redirect is returned either way.
    """
    back_to_list = RedirectResponse(url="/admin/channels", status_code=303)

    channel = db.query(NotificationChannel).filter(NotificationChannel.id == id).first()
    if not channel:
        return back_to_list

    channel.name = name
    channel.channel_type = channel_type
    channel.webhook_url = webhook_url
    db.commit()
    return back_to_list
|
|
|
|
@router.post("/channels/delete")
async def delete_channel(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the channel with the given id and redirect to the channel list."""
    matching_channels = db.query(NotificationChannel).filter(NotificationChannel.id == id)
    matching_channels.delete()
    db.commit()
    return RedirectResponse(url="/admin/channels", status_code=303)
|
|
|
|
# --- Templates ---
|
|
@router.get("/templates", response_class=HTMLResponse)
async def list_templates(request: Request, db: Session = Depends(get_db)):
    """Render the page listing every message template."""
    page_context = {
        "request": request,
        "templates": db.query(MessageTemplate).all(),
        "active_page": "templates",
    }
    return await render("admin/templates.html", page_context)
|
|
|
|
@router.post("/templates")
async def add_template(name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)):
    """Create a message template from the form and redirect to the template list."""
    new_template = MessageTemplate(
        name=name,
        template_content=template_content,
    )
    db.add(new_template)
    db.commit()
    return RedirectResponse(url="/admin/templates", status_code=303)
|
|
|
|
@router.post("/templates/update")
async def update_template(id: int = Form(...), name: str = Form(...), template_content: str = Form(...), db: Session = Depends(get_db)):
    """Update name and content of a message template, then redirect to the template list.

    An unknown ``id`` is ignored; the redirect is returned either way.
    """
    back_to_list = RedirectResponse(url="/admin/templates", status_code=303)

    template = db.query(MessageTemplate).filter(MessageTemplate.id == id).first()
    if not template:
        return back_to_list

    template.name = name
    template.template_content = template_content
    db.commit()
    return back_to_list
|
|
|
|
@router.post("/templates/delete")
async def delete_template(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the message template with the given id and redirect to the template list."""
    matching_templates = db.query(MessageTemplate).filter(MessageTemplate.id == id)
    matching_templates.delete()
    db.commit()
    return RedirectResponse(url="/admin/templates", status_code=303)
|
|
|
|
# --- Logs ---
|
|
@router.get("/logs", response_class=HTMLResponse)
async def list_logs(request: Request, db: Session = Depends(get_db)):
    """Render the 100 most recently received request logs with their delivery logs."""
    recent_logs_query = (
        db.query(RequestLog)
        .options(joinedload(RequestLog.delivery_logs))
        .order_by(RequestLog.received_at.desc())
        .limit(100)
    )
    page_context = {
        "request": request,
        "logs": recent_logs_query.all(),
        "active_page": "logs",
    }
    return await render("admin/logs.html", page_context)
|
|
|
|
@router.post("/logs/clear")
async def clear_logs(db: Session = Depends(get_db)):
    """Delete all delivery logs and request logs, then redirect to the log page."""
    # Delivery logs are removed before the request logs they refer to.
    for log_model in (DeliveryLog, RequestLog):
        db.query(log_model).delete()
    db.commit()
    return RedirectResponse(url="/admin/logs", status_code=303)
|
|
|
|
@router.post("/logs/{id}/replay")
async def replay_log(id: int, db: Session = Depends(get_db)):
    """Re-run a stored request through the engine and record the outcome.

    The stored raw body is processed again for the endpoint matching the
    log's namespace. A new request log with status ``"replay"`` is written,
    followed by one delivery log per routing and notification result.

    Returns:
        JSON ``{"status": "ok", "new_log_id": ...}`` on success, a 404 JSON
        error when the log does not exist, or a 400 JSON error when the
        endpoint is missing or inactive.
    """
    log = db.query(RequestLog).filter(RequestLog.id == id).first()
    if not log:
        return JSONResponse({"error": "Log not found"}, status_code=404)

    # Resolve the endpoint the original request was received on.
    endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == log.namespace).first()
    if not (endpoint and endpoint.is_active):
        return JSONResponse({"error": "Endpoint inactive or missing"}, status_code=400)

    # Run the stored payload through the engine again.
    routed, notified = await engine.process(endpoint.id, log.raw_body)

    new_log = RequestLog(
        namespace=log.namespace,
        remark=log.remark,
        event_no=log.event_no,
        raw_body=log.raw_body,
        status="replay",
    )
    db.add(new_log)
    db.commit()
    db.refresh(new_log)

    # (results, key holding the display name, delivery type); routing results
    # are recorded before notification results.
    result_groups = (
        (routed, "target", "relay"),
        (notified, "channel", "notify"),
    )
    for results, name_key, delivery_type in result_groups:
        for result in results:
            db.add(DeliveryLog(
                request_id=new_log.id,
                target_name=result.get(name_key),
                type=delivery_type,
                status="success" if result.get("ok") else "failed",
                response_summary=str(result.get("error") or "OK"),
            ))

    db.commit()
    return JSONResponse({"status": "ok", "new_log_id": new_log.id})
|