"""Cloud sync endpoints (Gitea-based).""" from concurrent.futures import ThreadPoolExecutor from pathlib import Path from fastapi import APIRouter, HTTPException, Depends, Request from pydantic import BaseModel from ..auth.dependencies import get_current_user from ..services.task_manager import TaskManager router = APIRouter(prefix="/api/sync", tags=["sync"]) _project_root = Path(__file__).resolve().parent.parent.parent.parent class SyncResponse(BaseModel): task_id: str status: str message: str def _get_sync(): from app.core.utils.cloud_sync import GiteaSync from app.config.settings import ConfigManager cfg = ConfigManager() return GiteaSync.from_config(cfg) def _run_sync_in_thread(tm, task_id, action_name, sync_method): """Run a blocking sync operation in a thread.""" def _run(): try: tm.update_progress(task_id, 10, "正在初始化同步...") sync = _get_sync() if sync is None: tm.set_failed(task_id, "Gitea 配置不完整,请先在系统配置中设置 base_url/owner/repo/token") return tm.update_progress(task_id, 30, f"正在{action_name}文件...") tm.add_log(task_id, f"[{action_name}] 开始{action_name}") result = sync_method(sync) tm.add_log(task_id, f"[{action_name}] 完成: {result}") tm.set_completed(task_id, message=f"{action_name}完成") except Exception as e: tm.set_failed(task_id, str(e)) pool = ThreadPoolExecutor(max_workers=1) pool.submit(_run) pool.shutdown(wait=False) @router.post("/push", response_model=SyncResponse) async def sync_push( request: Request, current_user: dict = Depends(get_current_user), ): tm = request.state.task_manager task = tm.create_task("推送到云端") _run_sync_in_thread(tm, task.id, "Push", lambda s: s.push()) return SyncResponse(task_id=task.id, status="accepted", message="推送任务已创建") @router.post("/pull", response_model=SyncResponse) async def sync_pull( request: Request, current_user: dict = Depends(get_current_user), ): tm = request.state.task_manager task = tm.create_task("从云端拉取") _run_sync_in_thread(tm, task.id, "Pull", lambda s: s.pull()) return SyncResponse(task_id=task.id, status="accepted", message="拉取任务已创建") @router.get("/status") async def sync_status( current_user: dict = Depends(get_current_user), ): try: from app.config.settings import ConfigManager import httpx as _httpx cfg = ConfigManager() base_url = cfg.get("Gitea", "base_url", fallback="").strip() owner = cfg.get("Gitea", "owner", fallback="").strip() repo = cfg.get("Gitea", "repo", fallback="").strip() token = cfg.get("Gitea", "token", fallback="").strip() enabled = bool(base_url and owner and repo and token) repo_url = f"{base_url}/{owner}/{repo}" if enabled else "" connected = False error = "" if enabled: try: async with _httpx.AsyncClient(timeout=10) as client: resp = await client.get( f"{base_url}/api/v1/repos/{owner}/{repo}", headers={"Authorization": f"token {token}"}, ) connected = resp.status_code == 200 if not connected: error = f"Gitea 返回 {resp.status_code}" except Exception as e: error = str(e) return {"enabled": enabled, "connected": connected, "repo_url": repo_url, "error": error} except Exception as e: return {"enabled": False, "connected": False, "repo_url": "", "error": str(e)}