105 lines
3.4 KiB
Python
105 lines
3.4 KiB
Python
import asyncio
|
|
from fastapi import FastAPI, Request, BackgroundTasks, Body
|
|
from fastapi.responses import JSONResponse
|
|
from app.logging import get_logger
|
|
from app.admin import router as admin_router
|
|
from app.db import SessionLocal, WebhookEndpoint, RequestLog, DeliveryLog, init_db
|
|
from app.services.engine import engine
|
|
from app.models import IncomingPayload
|
|
from contextlib import asynccontextmanager
|
|
|
|
logger = get_logger("app")
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
# Startup: Initialize DB
|
|
logger.info("Initializing database...")
|
|
init_db()
|
|
yield
|
|
# Shutdown logic if any
|
|
|
|
app = FastAPI(lifespan=lifespan)
|
|
app.include_router(admin_router)
|
|
|
|
def save_logs(namespace: str, payload_dict: dict, routed: list, notified: list):
|
|
try:
|
|
db = SessionLocal()
|
|
# Create RequestLog
|
|
req_log = RequestLog(
|
|
namespace=namespace,
|
|
remark=str(payload_dict.get("remark", "")),
|
|
event_no=str(payload_dict.get("event_define_no", "")),
|
|
raw_body=payload_dict,
|
|
status="success"
|
|
)
|
|
db.add(req_log)
|
|
db.commit()
|
|
db.refresh(req_log)
|
|
|
|
# Create DeliveryLogs
|
|
for r in routed:
|
|
db.add(DeliveryLog(
|
|
request_id=req_log.id,
|
|
target_name=r.get("target"),
|
|
type="relay",
|
|
status="success" if r.get("ok") else "failed",
|
|
response_summary=str(r.get("error") or "OK")
|
|
))
|
|
|
|
for n in notified:
|
|
db.add(DeliveryLog(
|
|
request_id=req_log.id,
|
|
target_name=n.get("channel"),
|
|
type="notify",
|
|
status="success" if n.get("ok") else "failed",
|
|
response_summary=str(n.get("error") or "OK")
|
|
))
|
|
|
|
db.commit()
|
|
db.close()
|
|
except Exception as e:
|
|
logger.error(f"Failed to save logs: {e}")
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "ok"}
|
|
|
|
@app.post("/webhook/{namespace}")
|
|
async def webhook(namespace: str, payload: IncomingPayload = Body(...), background_tasks: BackgroundTasks = BackgroundTasks()):
|
|
db = SessionLocal()
|
|
endpoint = db.query(WebhookEndpoint).filter(WebhookEndpoint.namespace == namespace).first()
|
|
|
|
if not endpoint:
|
|
# Auto-create for convenience if needed, or reject. For now reject if not exists.
|
|
# Or better: log warning but don't process.
|
|
# But per requirements "add endpoint", so we expect it to exist.
|
|
db.close()
|
|
return JSONResponse({"error": "Endpoint not found"}, status_code=404)
|
|
|
|
if not endpoint.is_active:
|
|
db.close()
|
|
return JSONResponse({"error": "Endpoint inactive"}, status_code=403)
|
|
|
|
endpoint_id = endpoint.id
|
|
db.close()
|
|
|
|
# payload is validated by Pydantic; convert to plain dict for engine
|
|
try:
|
|
body_dict = payload.model_dump()
|
|
except Exception:
|
|
return JSONResponse({"error": "Invalid payload"}, status_code=400)
|
|
|
|
# Use new engine
|
|
routed, notified = await engine.process(endpoint_id, body_dict)
|
|
|
|
# Async save logs
|
|
background_tasks.add_task(save_logs, namespace, body_dict, routed, notified)
|
|
|
|
result = {
|
|
"namespace": namespace,
|
|
"routed": routed,
|
|
"notified": notified
|
|
}
|
|
logger.info({"event": "webhook_processed", "namespace": namespace, "routed": len(routed), "notified": len(notified)})
|
|
return JSONResponse(result)
|