WebhockTransfer/app/admin.py

512 lines
20 KiB
Python

from fastapi import APIRouter, Request, Form, Depends, HTTPException, Body
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload, aliased
from typing import List, Optional, Union
import json
from app.db import SessionLocal, Target, NotificationChannel, MessageTemplate, WebhookEndpoint, RequestLog, DeliveryLog, ProcessingRule, RuleAction
from app.services.stats import stats_service
from app.services.engine import engine
from app.templates import safe_render
# Every route declared in this module is registered under the "/admin" prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
# Template loader used by render(); templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
def get_db():
    """Yield a database session and close it when the consumer is done.

    Used as a ``Depends`` dependency by the handlers in this module. The
    session is closed in ``finally`` so it is released even if the handler
    raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
async def global_stats_processor(request: Request):
    """Return the system statistics that are merged into every template context.

    ``request`` is accepted but not read by this function.
    """
    system_stats = {
        "uptime": stats_service.get_uptime(),
        "today_count": stats_service.get_today_count(),
        "latest_log": stats_service.get_latest_log_time(),
    }
    return {"system_stats": system_stats}
async def render(template_name: str, context: dict):
    """Render ``template_name`` with the global statistics added to ``context``.

    ``context`` must contain a ``"request"`` entry; it is updated in place
    with the values returned by ``global_stats_processor``.
    """
    request = context["request"]
    context.update(await global_stats_processor(request))
    return templates.TemplateResponse(template_name, context)
@router.get("/", response_class=HTMLResponse)
async def admin_index(request: Request, db: Session = Depends(get_db)):
    """Render the dashboard listing every endpoint whose ``is_active`` flag is set."""
    # ``== True`` builds a SQL comparison here; it is not a Python truth test.
    active_endpoints = (
        db.query(WebhookEndpoint)
        .filter(WebhookEndpoint.is_active == True)  # noqa: E712
        .all()
    )
    page_context = {
        "request": request,
        "active_page": "dashboard",
        "endpoints": active_endpoints,
    }
    return await render("admin/dashboard.html", page_context)
# --- Endpoints ---
@router.get("/endpoints", response_class=HTMLResponse)
async def list_endpoints(request: Request, db: Session = Depends(get_db)):
    """Render the endpoint list, newest ``created_at`` first."""
    newest_first = WebhookEndpoint.created_at.desc()
    endpoints = db.query(WebhookEndpoint).order_by(newest_first).all()
    page_context = {
        "request": request,
        "endpoints": endpoints,
        "active_page": "endpoints",
    }
    return await render("admin/endpoints.html", page_context)
@router.post("/endpoints")
async def add_endpoint(namespace: str = Form(...), description: str = Form(None), db: Session = Depends(get_db)):
    """Create a webhook endpoint and redirect back to the endpoint list.

    Args:
        namespace: Endpoint namespace; may contain only alphanumeric
            characters, ``-`` and ``_``, and at least one alphanumeric
            character.
        description: Optional free-text description.
        db: Database session supplied by ``get_db``.

    Raises:
        HTTPException: 400 when ``namespace`` fails the character check.
    """
    # The check used to end in ``pass``, so invalid namespaces were stored
    # anyway. Reject them before touching the database.
    if not namespace.replace("-", "").replace("_", "").isalnum():
        raise HTTPException(
            status_code=400,
            detail="namespace may only contain letters, digits, '-' and '_'",
        )
    ep = WebhookEndpoint(namespace=namespace, description=description)
    db.add(ep)
    db.commit()
    return RedirectResponse(url="/admin/endpoints", status_code=303)
@router.post("/endpoints/toggle")
async def toggle_endpoint(id: int = Form(...), db: Session = Depends(get_db)):
    """Flip the ``is_active`` flag of the endpoint with the given ``id``.

    An unknown ``id`` changes nothing; the response is always a redirect to
    the endpoint list.
    """
    by_id = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id)
    endpoint = by_id.first()
    if endpoint:
        endpoint.is_active = not endpoint.is_active
        db.commit()
    return RedirectResponse(url="/admin/endpoints", status_code=303)
@router.post("/endpoints/delete")
async def delete_endpoint(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the endpoint with the given ``id`` and redirect to the endpoint list."""
    # Query-level delete: issued as a single DELETE without loading the row.
    matching = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url="/admin/endpoints", status_code=303)
# --- Endpoint Details & Rules ---
@router.get("/endpoints/{id}", response_class=HTMLResponse)
async def endpoint_detail(id: int, request: Request, db: Session = Depends(get_db)):
    """Render one endpoint with its rule tree and the resources used by the edit modals.

    Redirects to the endpoint list when no endpoint has the given ``id``.
    """
    ep = db.query(WebhookEndpoint).filter(WebhookEndpoint.id == id).first()
    if not ep:
        return RedirectResponse(url="/admin/endpoints")

    # Workaround: load every rule of the endpoint in one query (actions are
    # joined eagerly) and assemble the parent/child tree in Python.
    rules_query = (
        db.query(ProcessingRule)
        .options(joinedload(ProcessingRule.actions))
        .filter(ProcessingRule.endpoint_id == id)
        .order_by(ProcessingRule.priority.desc())
    )
    all_rules = rules_query.all()

    # ``child_rules`` is attached dynamically to each loaded rule object.
    rules_by_id = {}
    for rule in all_rules:
        rule.child_rules = []
        rules_by_id[rule.id] = rule

    root_rules = []
    for rule in all_rules:
        if not rule.parent_rule_id:
            root_rules.append(rule)
            continue
        parent = rules_by_id.get(rule.parent_rule_id)
        if parent:
            parent.child_rules.append(rule)
        # NOTE: a rule whose parent is not among this endpoint's rules ends
        # up in neither list and is therefore absent from the rendered tree.

    # Resources offered in the modals.
    targets = db.query(Target).all()
    channels = db.query(NotificationChannel).all()
    tmpls = db.query(MessageTemplate).all()

    page_context = {
        "request": request,
        "ep": ep,
        "root_rules": root_rules,
        "targets": targets,
        "channels": channels,
        "templates": tmpls,
        "active_page": "endpoints",
    }
    return await render("admin/endpoint_detail.html", page_context)
@router.post("/endpoints/{id}/rules")
async def add_rule(
    id: int,
    match_field: str = Form(...),
    match_value: str = Form(...),
    operator: str = Form("eq"),
    parent_rule_id: Optional[Union[int, str]] = Form(None),
    priority: int = Form(0),
    db: Session = Depends(get_db)
):
    """Create a processing rule on endpoint ``id`` and redirect to its detail page.

    ``parent_rule_id`` may arrive as an empty string from the form; empty,
    falsy or non-numeric values all result in a rule without a parent.
    """
    resolved_parent = None
    has_parent_value = bool(parent_rule_id) and bool(str(parent_rule_id).strip())
    if has_parent_value:
        try:
            resolved_parent = int(parent_rule_id)
        except ValueError:
            # Non-numeric input is treated as "no parent".
            resolved_parent = None

    new_rule = ProcessingRule(
        endpoint_id=id,
        match_field=match_field,
        match_value=match_value,
        operator=operator,
        parent_rule_id=resolved_parent,
        priority=priority,
    )
    db.add(new_rule)
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{id}", status_code=303)
@router.post("/rules/delete")
async def delete_rule(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    db: Session = Depends(get_db),
):
    """Delete the rule with the given ``id`` and redirect to the endpoint detail page.

    ``endpoint_id`` is only used to build the redirect URL.
    """
    matching = db.query(ProcessingRule).filter(ProcessingRule.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
@router.post("/rules/update")
async def update_rule(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    match_field: str = Form(...),
    match_value: str = Form(...),
    operator: str = Form("eq"),
    parent_rule_id: Optional[str] = Form(None),
    priority: int = Form(0),
    db: Session = Depends(get_db)
):
    """Update a processing rule and redirect to the endpoint detail page.

    Args:
        id: Primary key of the rule to update.
        endpoint_id: Used only to build the redirect URL.
        match_field, match_value, operator, priority: New rule values.
        parent_rule_id: New parent id as submitted by the form; empty or
            non-numeric input clears the parent.
        db: Database session supplied by ``get_db``.

    Raises:
        HTTPException: 404 when the rule does not exist; 400 when the
            requested parent is the rule itself or one of its descendants.
    """
    rule = db.query(ProcessingRule).filter(ProcessingRule.id == id).first()
    if not rule:
        raise HTTPException(status_code=404, detail="Rule not found")

    new_parent_id = None
    if parent_rule_id is not None and str(parent_rule_id).strip() != "":
        try:
            new_parent_id = int(parent_rule_id)
        except ValueError:
            new_parent_id = None

    # Walk up from the requested parent. Reaching this rule means the change
    # would create a cycle, and a rule inside a cycle is never reachable from
    # a root rule. ``visited`` guards against cycles already in the data.
    visited = set()
    ancestor_id = new_parent_id
    while ancestor_id is not None and ancestor_id not in visited:
        if ancestor_id == rule.id:
            raise HTTPException(
                status_code=400,
                detail="A rule cannot be its own parent or ancestor",
            )
        visited.add(ancestor_id)
        ancestor = db.query(ProcessingRule).filter(ProcessingRule.id == ancestor_id).first()
        ancestor_id = ancestor.parent_rule_id if ancestor else None

    # Mutate only after validation so a rejected request changes nothing.
    rule.match_field = match_field
    rule.match_value = match_value
    rule.operator = operator
    rule.priority = priority
    rule.parent_rule_id = new_parent_id
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
@router.post("/actions/update")
async def update_action(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    action_type: str = Form(...),
    target_id: Optional[str] = Form(None),
    channel_id: Optional[str] = Form(None),
    template_id: Optional[str] = Form(None),
    template_vars_str: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Update a rule action and redirect to the endpoint detail page.

    ``target_id`` is kept only for ``'forward'`` actions; ``channel_id`` and
    ``template_id`` only for ``'notify'`` actions. ``template_vars_str`` is
    parsed as JSON; blank or unparsable input stores ``None``.

    Raises:
        HTTPException: 404 when the action does not exist.
    """
    action = db.query(RuleAction).filter(RuleAction.id == id).first()
    if not action:
        raise HTTPException(status_code=404, detail="Action not found")

    parsed_vars = None
    if template_vars_str and template_vars_str.strip():
        try:
            parsed_vars = json.loads(template_vars_str)
        except Exception:
            parsed_vars = None

    def _as_int(raw):
        """Return ``raw`` as an int, or None when it is blank or not numeric."""
        if not raw or not str(raw).strip():
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    is_forward = action_type == 'forward'
    is_notify = action_type == 'notify'

    action.action_type = action_type
    action.target_id = _as_int(target_id) if is_forward else None
    action.channel_id = _as_int(channel_id) if is_notify else None
    action.template_id = _as_int(template_id) if is_notify else None
    action.template_vars = parsed_vars
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
def _duplicate_rule_tree(db: Session, src_rule: ProcessingRule, endpoint_id: int, new_parent_id: Optional[int]) -> int:
    """Copy ``src_rule``, its actions and all of its descendants.

    The source subtree is read completely before anything is inserted.
    Looking children up after inserting a copy would pick up the copy itself
    whenever ``new_parent_id`` points at the source rule or one of its
    descendants, and the recursion would never end.

    All copies are written in one transaction: either the whole tree is
    committed, or the session is rolled back and the error re-raised.

    Args:
        db: Session used for reading and writing.
        src_rule: Root of the subtree to copy.
        endpoint_id: Endpoint the copies are assigned to.
        new_parent_id: Parent id for the copied root, or None for a root rule.

    Returns:
        The id of the copied root rule.
    """
    # Phase 1: snapshot the subtree (read-only). ``children_of`` doubles as
    # the visited set, so cyclic parent links in the data cannot loop.
    children_of = {}
    pending = [src_rule]
    while pending:
        current = pending.pop()
        if current.id in children_of:
            continue
        kids = db.query(ProcessingRule).filter(ProcessingRule.parent_rule_id == current.id).all()
        children_of[current.id] = kids
        pending.extend(kids)

    copied = set()

    def _copy(rule, parent_id):
        """Insert a copy of ``rule`` under ``parent_id`` and recurse into the snapshot."""
        copied.add(rule.id)
        clone = ProcessingRule(
            endpoint_id=endpoint_id,
            match_field=rule.match_field,
            operator=rule.operator,
            match_value=rule.match_value,
            priority=rule.priority,
            parent_rule_id=parent_id,
        )
        db.add(clone)
        db.flush()  # assigns clone.id without ending the transaction
        clone_id = clone.id
        for a in rule.actions:
            db.add(RuleAction(
                rule_id=clone_id,
                action_type=a.action_type,
                target_id=a.target_id if a.action_type == 'forward' else None,
                channel_id=a.channel_id if a.action_type == 'notify' else None,
                template_id=a.template_id if a.action_type == 'notify' else None,
                template_vars=a.template_vars,
            ))
        for child in children_of[rule.id]:
            if child.id in copied:
                continue
            _copy(child, clone_id)
        return clone_id

    # Phase 2: write the copies atomically.
    try:
        new_root_id = _copy(src_rule, new_parent_id)
        db.commit()
    except Exception:
        db.rollback()
        raise
    return new_root_id
@router.post("/rules/duplicate")
async def duplicate_rule(
    rule_id: int = Form(...),
    endpoint_id: int = Form(...),
    parent_rule_id: Optional[str] = Form(None),
    include_children: Optional[str] = Form("true"),
    db: Session = Depends(get_db)
):
    """Duplicate a rule, optionally with its whole subtree, and redirect to the endpoint.

    Args:
        rule_id: Primary key of the rule to copy.
        endpoint_id: Endpoint the copy is assigned to; also the redirect target.
        parent_rule_id: Parent id for the copy as submitted by the form;
            empty or non-numeric input yields a root rule.
        include_children: Child rules are copied when this is ``"true"``,
            ``"1"``, ``"on"`` or ``"yes"`` (case-insensitive). Any other
            value copies only the rule and its actions.
        db: Database session supplied by ``get_db``.

    Raises:
        HTTPException: 404 when the source rule does not exist.
    """
    src = db.query(ProcessingRule).filter(ProcessingRule.id == rule_id).first()
    if not src:
        raise HTTPException(status_code=404, detail="Rule not found")

    new_parent = None
    if parent_rule_id and str(parent_rule_id).strip():
        try:
            new_parent = int(parent_rule_id)
        except ValueError:
            new_parent = None

    # The old test was ``include_children or include_children.lower() == "true"``.
    # Any non-empty string, including "false", made it true, and None raised
    # AttributeError. Compare the normalised value explicitly instead.
    wants_children = (
        include_children is not None
        and include_children.strip().lower() in ("true", "1", "on", "yes")
    )

    if wants_children:
        _duplicate_rule_tree(db, src, endpoint_id, new_parent)
    else:
        new_rule = ProcessingRule(
            endpoint_id=endpoint_id,
            match_field=src.match_field,
            operator=src.operator,
            match_value=src.match_value,
            priority=src.priority,
            parent_rule_id=new_parent,
        )
        db.add(new_rule)
        db.flush()  # assigns new_rule.id so the actions can reference it
        for a in src.actions:
            db.add(RuleAction(
                rule_id=new_rule.id,
                action_type=a.action_type,
                target_id=a.target_id if a.action_type == 'forward' else None,
                channel_id=a.channel_id if a.action_type == 'notify' else None,
                template_id=a.template_id if a.action_type == 'notify' else None,
                template_vars=a.template_vars,
            ))
        # One commit, so the rule and its actions are stored together.
        db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
@router.post("/actions/duplicate")
async def duplicate_action(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    db: Session = Depends(get_db),
):
    """Copy an action onto the same rule and redirect to the endpoint detail page.

    Raises:
        HTTPException: 404 when the source action does not exist.
    """
    src = db.query(RuleAction).filter(RuleAction.id == id).first()
    if not src:
        raise HTTPException(status_code=404, detail="Action not found")

    # Only the ids that belong to the action's type are carried over.
    is_forward = src.action_type == 'forward'
    is_notify = src.action_type == 'notify'
    clone = RuleAction(
        rule_id=src.rule_id,
        action_type=src.action_type,
        target_id=src.target_id if is_forward else None,
        channel_id=src.channel_id if is_notify else None,
        template_id=src.template_id if is_notify else None,
        template_vars=src.template_vars,
    )
    db.add(clone)
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
@router.post("/rules/{rule_id}/actions")
async def add_action(
    rule_id: int,
    endpoint_id: int = Form(...),
    action_type: str = Form(...),
    target_id: Optional[Union[int, str]] = Form(None),
    channel_id: Optional[Union[int, str]] = Form(None),
    template_id: Optional[Union[int, str]] = Form(None),
    template_vars_str: Optional[str] = Form(None),
    db: Session = Depends(get_db)
):
    """Attach a new action to rule ``rule_id`` and redirect to the endpoint detail page.

    ``target_id`` is stored only for ``'forward'`` actions; ``channel_id``
    and ``template_id`` only for ``'notify'`` actions. ``template_vars_str``
    is parsed as JSON; blank or unparsable input stores ``None``.
    """
    def _as_int(raw):
        """Return ``raw`` as an int, or None when it is blank or not numeric."""
        if not raw or not str(raw).strip():
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    parsed_vars = None
    if template_vars_str and template_vars_str.strip():
        try:
            parsed_vars = json.loads(template_vars_str)
        except Exception:
            # Unparsable JSON is ignored; the action is stored without vars.
            parsed_vars = None

    is_forward = action_type == 'forward'
    is_notify = action_type == 'notify'
    new_action = RuleAction(
        rule_id=rule_id,
        action_type=action_type,
        target_id=_as_int(target_id) if is_forward else None,
        channel_id=_as_int(channel_id) if is_notify else None,
        template_id=_as_int(template_id) if is_notify else None,
        template_vars=parsed_vars,
    )
    db.add(new_action)
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
@router.post("/actions/delete")
async def delete_action(
    id: int = Form(...),
    endpoint_id: int = Form(...),
    db: Session = Depends(get_db),
):
    """Delete the action with the given ``id`` and redirect to the endpoint detail page.

    ``endpoint_id`` is only used to build the redirect URL.
    """
    matching = db.query(RuleAction).filter(RuleAction.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url=f"/admin/endpoints/{endpoint_id}", status_code=303)
# --- Targets ---
@router.get("/targets", response_class=HTMLResponse)
async def list_targets(request: Request, db: Session = Depends(get_db)):
    """Render the page listing all forwarding targets."""
    page_context = {
        "request": request,
        "targets": db.query(Target).all(),
        "active_page": "targets",
    }
    return await render("admin/targets.html", page_context)
@router.post("/targets")
async def add_target(
    name: str = Form(...),
    url: str = Form(...),
    timeout_ms: int = Form(5000),
    db: Session = Depends(get_db),
):
    """Create a forwarding target and redirect to the target list."""
    new_target = Target(name=name, url=url, timeout_ms=timeout_ms)
    db.add(new_target)
    db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
@router.post("/targets/update")
async def update_target(
    id: int = Form(...),
    name: str = Form(...),
    url: str = Form(...),
    timeout_ms: int = Form(5000),
    db: Session = Depends(get_db),
):
    """Overwrite name, URL and timeout of a target, then redirect to the target list.

    An unknown ``id`` changes nothing.
    """
    target = db.query(Target).filter(Target.id == id).first()
    if target:
        target.name, target.url, target.timeout_ms = name, url, timeout_ms
        db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
@router.post("/targets/delete")
async def delete_target(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the target with the given ``id`` and redirect to the target list."""
    matching = db.query(Target).filter(Target.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url="/admin/targets", status_code=303)
# --- Channels ---
@router.get("/channels", response_class=HTMLResponse)
async def list_channels(request: Request, db: Session = Depends(get_db)):
    """Render the page listing all notification channels."""
    page_context = {
        "request": request,
        "channels": db.query(NotificationChannel).all(),
        "active_page": "channels",
    }
    return await render("admin/channels.html", page_context)
@router.post("/channels")
async def add_channel(
    name: str = Form(...),
    channel_type: str = Form(...),
    webhook_url: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create a notification channel and redirect to the channel list."""
    new_channel = NotificationChannel(
        name=name,
        channel_type=channel_type,
        webhook_url=webhook_url,
    )
    db.add(new_channel)
    db.commit()
    return RedirectResponse(url="/admin/channels", status_code=303)
@router.post("/channels/update")
async def update_channel(
    id: int = Form(...),
    name: str = Form(...),
    channel_type: str = Form(...),
    webhook_url: str = Form(...),
    db: Session = Depends(get_db),
):
    """Overwrite name, type and webhook URL of a channel, then redirect to the channel list.

    An unknown ``id`` changes nothing.
    """
    channel = db.query(NotificationChannel).filter(NotificationChannel.id == id).first()
    if channel:
        channel.name, channel.channel_type, channel.webhook_url = name, channel_type, webhook_url
        db.commit()
    return RedirectResponse(url="/admin/channels", status_code=303)
@router.post("/channels/delete")
async def delete_channel(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the channel with the given ``id`` and redirect to the channel list."""
    matching = db.query(NotificationChannel).filter(NotificationChannel.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url="/admin/channels", status_code=303)
# --- Templates ---
@router.get("/templates", response_class=HTMLResponse)
async def list_templates(request: Request, db: Session = Depends(get_db)):
    """Render the page listing all message templates."""
    page_context = {
        "request": request,
        "templates": db.query(MessageTemplate).all(),
        "active_page": "templates",
    }
    return await render("admin/templates.html", page_context)
@router.post("/templates")
async def add_template(
    name: str = Form(...),
    template_content: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create a message template and redirect to the template list."""
    new_template = MessageTemplate(name=name, template_content=template_content)
    db.add(new_template)
    db.commit()
    return RedirectResponse(url="/admin/templates", status_code=303)
@router.post("/templates/update")
async def update_template(
    id: int = Form(...),
    name: str = Form(...),
    template_content: str = Form(...),
    db: Session = Depends(get_db),
):
    """Overwrite name and content of a template, then redirect to the template list.

    An unknown ``id`` changes nothing.
    """
    template = db.query(MessageTemplate).filter(MessageTemplate.id == id).first()
    if template:
        template.name, template.template_content = name, template_content
        db.commit()
    return RedirectResponse(url="/admin/templates", status_code=303)
@router.post("/templates/delete")
async def delete_template(id: int = Form(...), db: Session = Depends(get_db)):
    """Delete the template with the given ``id`` and redirect to the template list."""
    matching = db.query(MessageTemplate).filter(MessageTemplate.id == id)
    matching.delete()
    db.commit()
    return RedirectResponse(url="/admin/templates", status_code=303)
@router.post("/templates/preview")
async def preview_template(data: dict = Body(...), db: Session = Depends(get_db)):
    """
    Render a template against an optional sample payload and return the result as JSON.

    Request JSON: { "template_content": "...", "sample_payload": {...}, "vars": {...} }

    Responds with ``{"rendered": ...}`` on success. Responds with
    ``{"error": ...}`` and status 400 when ``template_content`` is missing
    or when building the context or rendering raises.
    """
    template_content = data.get("template_content")
    if not template_content:
        return JSONResponse({"error": "template_content is required"}, status_code=400)

    sample_payload = data.get("sample_payload") or {}
    try:
        # Prefer the engine's flatten helper when it exists; otherwise use a
        # plain copy of the payload as the render context.
        if hasattr(engine, "_flatten_payload"):
            render_context = engine._flatten_payload(sample_payload)
        else:
            render_context = dict(sample_payload)

        # Caller-supplied vars are merged on top of the payload values.
        extra_vars = data.get("vars") or {}
        if isinstance(extra_vars, dict):
            render_context.update(extra_vars)

        rendered = safe_render(template_content, render_context)
        return JSONResponse({"rendered": rendered})
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=400)
# --- Logs ---
@router.get("/logs", response_class=HTMLResponse)
async def list_logs(request: Request, db: Session = Depends(get_db)):
    """Render the 100 most recently received request logs with their delivery logs."""
    recent_logs_query = (
        db.query(RequestLog)
        .options(joinedload(RequestLog.delivery_logs))
        .order_by(RequestLog.received_at.desc())
        .limit(100)
    )
    page_context = {
        "request": request,
        "logs": recent_logs_query.all(),
        "active_page": "logs",
    }
    return await render("admin/logs.html", page_context)
@router.post("/logs/clear")
async def clear_logs(db: Session = Depends(get_db)):
    """Delete every delivery log and request log, then redirect to the log list."""
    # Delivery logs are removed before the request logs they reference.
    for model in (DeliveryLog, RequestLog):
        db.query(model).delete()
    db.commit()
    return RedirectResponse(url="/admin/logs", status_code=303)
@router.post("/logs/{id}/replay")
async def replay_log(id: int, db: Session = Depends(get_db)):
    """Re-process the raw body of a stored request and record the outcome as a new log.

    Responds with 404 when the log does not exist and with 400 when the
    endpoint for the log's namespace is missing or inactive. On success the
    response carries the id of the newly created request log.
    """
    log = db.query(RequestLog).filter(RequestLog.id == id).first()
    if not log:
        return JSONResponse({"error": "Log not found"}, status_code=404)

    # The endpoint is resolved through the namespace stored on the log.
    endpoint = (
        db.query(WebhookEndpoint)
        .filter(WebhookEndpoint.namespace == log.namespace)
        .first()
    )
    if not endpoint or not endpoint.is_active:
        return JSONResponse({"error": "Endpoint inactive or missing"}, status_code=400)

    # Run the engine first; its results are attached to the replay log below.
    routed, notified = await engine.process(endpoint.id, log.raw_body)

    new_log = RequestLog(
        namespace=log.namespace,
        remark=log.remark,
        event_no=log.event_no,
        raw_body=log.raw_body,
        status="replay",
    )
    db.add(new_log)
    db.commit()
    db.refresh(new_log)

    # (results, key holding the destination name, delivery type)
    result_groups = (
        (routed, "target", "relay"),
        (notified, "channel", "notify"),
    )
    for results, name_key, delivery_type in result_groups:
        for outcome in results:
            db.add(DeliveryLog(
                request_id=new_log.id,
                target_name=outcome.get(name_key),
                type=delivery_type,
                status="success" if outcome.get("ok") else "failed",
                response_summary=str(outcome.get("error") or "OK"),
            ))
    db.commit()
    return JSONResponse({"status": "ok", "new_log_id": new_log.id})