WebhockTransfer/app/main.py
houhuan 2bc7460f1f feat: 初始化Webhook中继系统项目
- 添加FastAPI应用基础结构,包括主入口、路由和模型定义
- 实现Webhook接收端点(/webhook/{namespace})和健康检查(/health)
- 添加管理后台路由和模板,支持端点、目标、渠道和模板管理
- 包含SQLite数据库模型定义和初始化逻辑
- 添加日志记录和统计服务
- 包含Dockerfile和配置示例文件
- 添加项目文档,包括设计、流程图和验收标准
2025-12-21 18:43:12 +08:00

103 lines
3.2 KiB
Python

import asyncio
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from app.logging import get_logger
from app.admin import router as admin_router
from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db
from app.services.engine import engine
from contextlib import asynccontextmanager
logger = get_logger("app")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Initialize DB
logger.info("Initializing database...")
init_db()
yield
# Shutdown logic if any
app = FastAPI(lifespan=lifespan)
app.include_router(admin_router)
def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
try:
db = SessionLocal()
# Create RequestLog
req_log = RequestLog(
namespace=namespace,
remark=str(payload_dict.get("remark", "")),
event_no=str(payload_dict.get("event_define_no", "")),
raw_body=payload_dict,
status="success"
)
db.add(req_log)
db.commit()
db.refresh(req_log)
# Create DeliveryLogs
for r in routed:
db.add(DeliveryLog(
request_id=req_log.id,
target_name=r.get("target"),
type="relay",
status="success" if r.get("ok") else "failed",
response_summary=str(r.get("error") or "OK")
))
for n in notified:
db.add(DeliveryLog(
request_id=req_log.id,
target_name=n.get("channel"),
type="notify",
status="success" if n.get("ok") else "failed",
response_summary=str(n.get("error") or "OK")
))
db.commit()
db.close()
except Exception as e:
logger.error(f"Failed to save logs: {e}")
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/webhook/{namespace}")
async def webhook(namespace: str, request: Request, background_tasks: BackgroundTasks):
db = SessionLocal()
endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first()
if not endpoint:
# Auto-create for convenience if needed, or reject. For now reject if not exists.
# Or better: log warning but don't process.
# But per requirements "add endpoint", so we expect it to exist.
db.close()
return JSONResponse({"error": "Endpoint not found"}, status_code=404)
if not endpoint.is_active:
db.close()
return JSONResponse({"error": "Endpoint inactive"}, status_code=403)
endpoint_id = endpoint.id
db.close()
try:
body = await request.json()
except Exception:
return JSONResponse({"error": "Invalid JSON"}, status_code=400)
# Use new engine
routed, notified = await engine.process(endpoint_id, body)
# Async save logs
background_tasks.add_task(save_logs, namespace, body, routed, notified)
result = {
"namespace": namespace,
"routed": routed,
"notified": notified
}
logger.info({"event": "webhook_processed", "namespace": namespace, "routed": len(routed), "notified": len(notified)})
return JSONResponse(result)