# 日志系统 + 任务历史 + 文件管理 Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add persistent logging, task history, and file operation tracking to the web backend, with two new frontend pages for viewing logs and task history.

**Architecture:** Single SQLite DB (`data/web_data.db`) with three tables. FastAPI middleware captures HTTP logs. TaskManager modified to persist task lifecycle to DB. File operations record metadata. Frontend adds two sidebar pages (Tasks, Logs) and enhances Dashboard.

**Tech Stack:** FastAPI middleware, SQLite (via existing DBPool), Vue 3 + Element Plus, Pinia

---

## File Structure

### New files

| File | Responsibility |
|------|----------------|
| `web/backend/services/db_schema.py` | SQLite table creation, auto-cleanup of 30-day-old records |
| `web/backend/middleware/__init__.py` | Package init |
| `web/backend/middleware/logging.py` | HTTP request logging middleware |
| `web/backend/routers/logs.py` | Log query API endpoints |
| `web/backend/routers/tasks.py` | Task history query + retry API endpoints |
| `web/frontend/src/views/Tasks.vue` | Task history page |
| `web/frontend/src/views/Logs.vue` | Log viewer page |

### Modified files

| File | Changes |
|------|---------|
| `web/backend/main.py` | Import and mount middleware + new routers, call `init_db()` in lifespan |
| `web/backend/services/task_manager.py` | Add `db_pool` reference, persist task lifecycle to `task_history` table |
| `web/backend/routers/files.py` | Record upload/delete/clear operations to `file_metadata` table |
| `web/frontend/src/router/index.ts` | Add `/tasks` and `/logs` routes |
| `web/frontend/src/views/Layout.vue` | Add `Timer` and `Notebook` icons, add two nav items |
| `web/frontend/src/views/Dashboard.vue` | Dynamic storage stats, file history button |

---

### 
Task 1: Database Schema — `db_schema.py`

**Files:**
- Create: `web/backend/services/db_schema.py`

- [ ] **Step 1: Create `web/backend/services/db_schema.py`**

```python
"""SQLite schema initialization, cleanup, and query helpers for web_data.db"""
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path

_project_root = Path(__file__).resolve().parent.parent.parent.parent
_db_path = str(_project_root / "data" / "web_data.db")

# Columns update_task() is allowed to modify. Column names cannot be bound as
# SQL parameters, so they are checked against this allow-list before being
# interpolated into the UPDATE statement.
_TASK_UPDATABLE_COLUMNS = frozenset({
    "name",
    "status",
    "progress",
    "message",
    "result_files",
    "error",
    "log_lines",
    "updated_at",
    "completed_at",
})

CREATE_TABLES_SQL = """
CREATE TABLE IF NOT EXISTS http_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp TEXT NOT NULL,
    method TEXT NOT NULL,
    path TEXT NOT NULL,
    status_code INTEGER,
    duration_ms REAL,
    user TEXT,
    ip TEXT,
    detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_http_logs_timestamp ON http_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_http_logs_status ON http_logs(status_code);

CREATE TABLE IF NOT EXISTS task_history (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    status TEXT NOT NULL,
    progress INTEGER DEFAULT 0,
    message TEXT,
    result_files TEXT,
    error TEXT,
    log_lines TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    completed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status);
CREATE INDEX IF NOT EXISTS idx_task_history_created ON task_history(created_at);

CREATE TABLE IF NOT EXISTS file_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    filename TEXT NOT NULL,
    directory TEXT NOT NULL,
    size INTEGER,
    action TEXT NOT NULL,
    user TEXT,
    timestamp TEXT NOT NULL,
    task_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_file_metadata_timestamp ON file_metadata(timestamp);
"""


@contextmanager
def _connect(*, as_rows: bool = False):
    """Yield a connection to web_data.db and always close it afterwards.

    With ``as_rows=True`` rows come back as ``sqlite3.Row`` so they can be
    converted with ``dict(row)``.
    """
    conn = sqlite3.connect(_db_path)
    if as_rows:
        conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()


def _query_page(conn, table, conditions, params, order_by, page, page_size):
    """Return ``(rows, total)`` for one page of *table* matching *conditions*.

    *table* and *order_by* are module-internal constants, never user input;
    every user-supplied value travels through *params*.
    """
    where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""
    total = conn.execute(
        f"SELECT COUNT(*) FROM {table}{where_clause}", params
    ).fetchone()[0]
    offset = (max(page, 1) - 1) * page_size
    rows = conn.execute(
        f"SELECT * FROM {table}{where_clause} ORDER BY {order_by} LIMIT ? OFFSET ?",
        [*params, page_size, offset],
    ).fetchall()
    return rows, total


def _decode_task_row(row) -> dict:
    """Convert a task_history row to a dict with its JSON columns decoded."""
    task = dict(row)
    task["result_files"] = json.loads(task.get("result_files") or "[]")
    task["log_lines"] = json.loads(task.get("log_lines") or "[]")
    return task


def init_db():
    """Create tables if they don't exist. Called once on startup."""
    # sqlite3.connect() creates the file but not its parent directory.
    Path(_db_path).parent.mkdir(parents=True, exist_ok=True)
    with _connect() as conn:
        conn.executescript(CREATE_TABLES_SQL)
        conn.commit()


def cleanup_old_records(days: int = 30):
    """Delete records older than *days* days (default 30) from all tables."""
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    with _connect() as conn:
        conn.execute("DELETE FROM http_logs WHERE timestamp < ?", (cutoff,))
        conn.execute("DELETE FROM task_history WHERE created_at < ?", (cutoff,))
        conn.execute("DELETE FROM file_metadata WHERE timestamp < ?", (cutoff,))
        conn.commit()


def insert_http_log(method: str, path: str, status_code: int, duration_ms: float,
                    user: str = None, ip: str = None, detail: str = None):
    """Insert an HTTP log record stamped with the current local time."""
    with _connect() as conn:
        conn.execute(
            "INSERT INTO http_logs (timestamp, method, path, status_code, duration_ms, user, ip, detail) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (datetime.now().isoformat(), method, path, status_code, duration_ms, user, ip, detail),
        )
        conn.commit()


def insert_task(task_id: str, name: str, status: str, created_at: str):
    """Insert a new task record.

    ``result_files`` and ``log_lines`` start as empty JSON lists; ``error`` is
    a plain-text column and starts as NULL (it is not JSON).
    """
    with _connect() as conn:
        conn.execute(
            "INSERT INTO task_history "
            "(id, name, status, progress, message, result_files, error, log_lines, created_at, updated_at) "
            "VALUES (?, ?, ?, 0, '', '[]', NULL, '[]', ?, ?)",
            (task_id, name, status, created_at, created_at),
        )
        conn.commit()


def update_task(task_id: str, **fields):
    """Update specific columns of a task record.

    Raises:
        ValueError: if a keyword is not an updatable task_history column.
    """
    if not fields:
        return
    unknown = set(fields) - _TASK_UPDATABLE_COLUMNS
    if unknown:
        raise ValueError(f"Unknown task_history column(s): {sorted(unknown)}")
    assignments = ", ".join(f"{column}=?" for column in fields)
    with _connect() as conn:
        conn.execute(
            f"UPDATE task_history SET {assignments} WHERE id=?",
            [*fields.values(), task_id],
        )
        conn.commit()


def insert_file_metadata(filename: str, directory: str, size: int, action: str,
                         user: str = None, task_id: str = None):
    """Insert a file operation record stamped with the current local time."""
    with _connect() as conn:
        conn.execute(
            "INSERT INTO file_metadata (filename, directory, size, action, user, timestamp, task_id) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (filename, directory, size, action, user, datetime.now().isoformat(), task_id),
        )
        conn.commit()


def query_http_logs(page: int = 1, page_size: int = 50, method: str = None,
                    status_code: int = None, path: str = None,
                    start_date: str = None, end_date: str = None):
    """Query HTTP logs with filters and pagination.

    Returns ``{"items": [...], "total": n}``, newest records first.
    """
    conditions = []
    params = []
    if method:
        conditions.append("method = ?")
        params.append(method)
    if status_code is not None:
        conditions.append("status_code = ?")
        params.append(status_code)
    if path:
        conditions.append("path LIKE ?")
        params.append(f"%{path}%")
    if start_date:
        conditions.append("timestamp >= ?")
        params.append(start_date)
    if end_date:
        conditions.append("timestamp <= ?")
        params.append(end_date)

    with _connect(as_rows=True) as conn:
        rows, total = _query_page(
            conn, "http_logs", conditions, params, "id DESC", page, page_size
        )
    return {"items": [dict(row) for row in rows], "total": total}


def query_http_log_stats():
    """Get HTTP log statistics for today (local time)."""
    # ISO timestamps sort lexicographically, so ">= 'YYYY-MM-DD'" selects today.
    today = datetime.now().strftime("%Y-%m-%d")
    with _connect() as conn:
        total, errors, avg_ms = conn.execute(
            "SELECT COUNT(*) as total, "
            "SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors, "
            "AVG(duration_ms) as avg_ms "
            "FROM http_logs WHERE timestamp >= ?",
            (today,),
        ).fetchone()
    total = total or 0
    errors = errors or 0
    return {
        "today_count": total,
        "error_count": errors,
        "avg_duration_ms": round(avg_ms or 0, 1),
        "error_rate": round((errors / total * 100) if total > 0 else 0, 1),
    }


def query_task_history(page: int = 1, page_size: int = 50, status: str = None,
                       name: str = None, search: str = None):
    """Query task history with filters and pagination.

    Returns ``{"items": [...], "total": n}``, newest tasks first, with
    ``result_files`` and ``log_lines`` decoded from JSON.
    """
    conditions = []
    params = []
    if status:
        conditions.append("status = ?")
        params.append(status)
    if name:
        conditions.append("name = ?")
        params.append(name)
    if search:
        conditions.append("(name LIKE ? OR id LIKE ?)")
        params.extend([f"%{search}%", f"%{search}%"])

    with _connect(as_rows=True) as conn:
        rows, total = _query_page(
            conn, "task_history", conditions, params, "created_at DESC", page, page_size
        )
    return {"items": [_decode_task_row(row) for row in rows], "total": total}


def query_task_by_id(task_id: str):
    """Get a single task by ID, or None if it does not exist."""
    with _connect(as_rows=True) as conn:
        row = conn.execute(
            "SELECT * FROM task_history WHERE id=?", (task_id,)
        ).fetchone()
    return _decode_task_row(row) if row else None


def query_task_stats():
    """Get task counts: total plus completed / failed / running."""
    with _connect() as conn:
        rows = conn.execute(
            "SELECT status, COUNT(*) FROM task_history GROUP BY status"
        ).fetchall()
    stats = dict(rows)
    return {
        "total": sum(stats.values()),
        "completed": stats.get("completed", 0),
        "failed": stats.get("failed", 0),
        "running": stats.get("running", 0),
    }


def query_file_history(page: int = 1, page_size: int = 50,
                       directory: str = None, action: str = None):
    """Query file operation history with filters and pagination."""
    conditions = []
    params = []
    if directory:
        conditions.append("directory = ?")
        params.append(directory)
    if action:
        conditions.append("action = ?")
        params.append(action)

    with _connect(as_rows=True) as conn:
        rows, total = _query_page(
            conn, "file_metadata", conditions, params, "id DESC", page, page_size
        )
    return {"items": [dict(row) for row in rows], "total": total}


def query_file_stats():
    """Get file storage statistics per directory (read from disk, not the DB)."""
    dir_map = {
        "input": _project_root / "data" / "input",
        "output": _project_root / "data" / "output",
        "result": _project_root / "data" / "result",
    }
    stats = []
    for name, dir_path in dir_map.items():
        if dir_path.is_dir():
            files = [f for f in dir_path.iterdir() if f.is_file()]
            total_size = sum(f.stat().st_size for f in files)
            stats.append({"name": name, "file_count": len(files), "total_size": total_size})
        else:
            stats.append({"name": name, "file_count": 0, "total_size": 0})
    return {"directories": stats}
```

- [ ] **Step 2: Verify the module imports cleanly**

Run: `cd "f:\vebing coding\xiaoaitext" && python -c "from web.backend.services.db_schema import init_db; print('OK')"`
Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add web/backend/services/db_schema.py
git commit -m "feat: add db_schema for http_logs, task_history, file_metadata tables"
```

---

### Task 2: HTTP Logging Middleware

**Files:**
- Create: `web/backend/middleware/__init__.py`
- Create: `web/backend/middleware/logging.py`

- [ ] **Step 1: Create `web/backend/middleware/__init__.py`**

```python
```

(Empty init file to make it a package.) 
- [ ] **Step 2: Create `web/backend/middleware/logging.py`**

```python
"""HTTP request logging middleware"""
import asyncio
import functools
import logging
import time

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from ..services.db_schema import insert_http_log

logger = logging.getLogger(__name__)

# Requests under these prefixes (static assets, WebSocket) are not logged.
_SKIP_PREFIXES = ("/assets", "/ws")


def _report_write_failure(future):
    """Surface a failed background log write instead of dropping it silently."""
    if future.cancelled():
        return
    exc = future.exception()
    if exc is not None:
        logger.warning("Failed to persist HTTP log: %r", exc)


class LoggingMiddleware(BaseHTTPMiddleware):
    """Record method, path, status, duration, user and client IP per request."""

    async def dispatch(self, request: Request, call_next):
        path = request.url.path
        if path.startswith(_SKIP_PREFIXES) or path == "/favicon.ico":
            return await call_next(request)

        start = time.perf_counter()
        # If the downstream handler raises, the request is still logged (as 500).
        status_code = 500
        try:
            response = await call_next(request)
            status_code = response.status_code
            return response
        finally:
            duration_ms = (time.perf_counter() - start) * 1000
            self._schedule_write(request, path, status_code, duration_ms)

    @staticmethod
    def _schedule_write(request: Request, path: str, status_code: int, duration_ms: float):
        """Queue the DB insert on the default executor without blocking the response."""
        try:
            # Username is read from request.state.user when something upstream
            # populated it (NOTE(review): confirm what sets request.state.user).
            user = getattr(request.state, "user", None)
            username = user.get("username") if isinstance(user, dict) else None
            write = functools.partial(
                insert_http_log,
                method=request.method,
                path=path,
                status_code=status_code,
                duration_ms=round(duration_ms, 2),
                user=username,
                ip=request.client.host if request.client else None,
            )
            # run_in_executor() already schedules the call and returns a Future;
            # it must not be wrapped in asyncio.create_task(), which only
            # accepts coroutines and raises TypeError for a Future.
            future = asyncio.get_running_loop().run_in_executor(None, write)
            future.add_done_callback(_report_write_failure)
        except Exception:
            # Never let logging failures break requests, but leave a trace.
            logger.exception("Could not schedule HTTP log write")
```

- [ ] **Step 3: Verify imports**

Run: `cd "f:\vebing coding\xiaoaitext" && python -c "from web.backend.middleware.logging import LoggingMiddleware; print('OK')"`
Expected: `OK`

- [ ] **Step 4: Commit**

```bash
git add web/backend/middleware/__init__.py web/backend/middleware/logging.py
git commit -m "feat: add HTTP request logging middleware"
```

---

### Task 3: TaskManager DB Persistence

**Files:**
- Modify: `web/backend/services/task_manager.py`

- [ ] **Step 1: Rewrite `web/backend/services/task_manager.py`**

The full file with DB persistence added. Key changes: `__init__` accepts `db_pool`, `create_task` writes to DB, `update_progress`/`add_log`/`set_completed`/`set_failed` update DB. 
```python
"""Background task tracking + WebSocket broadcast + DB persistence"""
import uuid
import json
import asyncio
import logging
from enum import Enum
from datetime import datetime
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field

from .db_schema import insert_task, update_task

logger = logging.getLogger(__name__)


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    id: str
    name: str
    status: TaskStatus = TaskStatus.PENDING
    progress: int = 0
    message: str = ""
    result_files: List[str] = field(default_factory=list)
    error: Optional[str] = None
    log_lines: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize for WebSocket clients; only the last 100 log lines are sent."""
        return {
            "task_id": self.id,
            "name": self.name,
            "status": self.status.value,
            "progress": self.progress,
            "message": self.message,
            "result_files": self.result_files,
            "error": self.error,
            "log_lines": self.log_lines[-100:],
        }


class TaskManager:
    """In-memory task registry that broadcasts updates and mirrors them to the DB."""

    def __init__(self, db_pool=None):
        self._tasks: Dict[str, Task] = {}
        self._connections: Dict[str, Set] = {}
        self._db = db_pool
        # The event loop only keeps weak references to tasks, so fire-and-forget
        # tasks are held here until they finish to keep them from being
        # garbage-collected mid-flight.
        self._background: Set[asyncio.Task] = set()

    def set_db_pool(self, db_pool):
        """Set the DB pool reference (called during startup)."""
        self._db = db_pool

    def _spawn(self, coro):
        """Run *coro* in the background, keeping a reference and logging failures.

        Like asyncio.create_task(), this requires a running event loop.
        """
        task = asyncio.create_task(coro)
        self._background.add(task)
        task.add_done_callback(self._on_background_done)

    def _on_background_done(self, task):
        """Drop the finished task and report its exception, if any."""
        self._background.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.warning("Background task failed: %r", task.exception())

    def _persist(self, db_func, *args, **kwargs):
        """Queue a DB write through the pool; a no-op when no pool is configured."""
        if self._db:
            self._spawn(self._db.execute_write(db_func, *args, **kwargs))

    def create_task(self, name: str) -> Task:
        """Register a new pending task and persist it."""
        task_id = str(uuid.uuid4())[:8]
        task = Task(id=task_id, name=name)
        self._tasks[task_id] = task
        self._connections[task_id] = set()
        self._persist(
            insert_task, task_id, name, TaskStatus.PENDING.value,
            datetime.now().isoformat(),
        )
        return task

    def get_task(self, task_id: str) -> Optional[Task]:
        return self._tasks.get(task_id)

    def update_progress(self, task_id: str, progress: int, message: str = ""):
        """Record progress; the first update moves a pending task to running."""
        task = self._tasks.get(task_id)
        if not task:
            return
        task.progress = progress
        task.message = message
        if task.status == TaskStatus.PENDING:
            task.status = TaskStatus.RUNNING
        self._persist(
            update_task, task_id,
            status=task.status.value,
            progress=progress,
            message=message,
            updated_at=datetime.now().isoformat(),
        )
        self._spawn(self._broadcast(task_id))

    def add_log(self, task_id: str, line: str):
        """Append a log line; the DB keeps only the last 200 lines."""
        task = self._tasks.get(task_id)
        if not task:
            return
        task.log_lines.append(line)
        self._persist(
            update_task, task_id,
            log_lines=json.dumps(task.log_lines[-200:]),
            updated_at=datetime.now().isoformat(),
        )
        self._spawn(self._broadcast(task_id))

    def set_completed(self, task_id: str, result_files: List[str] = None, message: str = ""):
        """Mark a task completed at 100% and persist its result files."""
        task = self._tasks.get(task_id)
        if not task:
            return
        task.status = TaskStatus.COMPLETED
        task.progress = 100
        task.message = message or "处理完成"
        if result_files:
            task.result_files = result_files
        now = datetime.now().isoformat()
        self._persist(
            update_task, task_id,
            status=TaskStatus.COMPLETED.value,
            progress=100,
            message=task.message,
            # Persist what the in-memory task holds so DB and WebSocket agree.
            result_files=json.dumps(task.result_files),
            completed_at=now,
            updated_at=now,
        )
        self._spawn(self._broadcast(task_id))

    def set_failed(self, task_id: str, error: str):
        """Mark a task failed and persist the error text."""
        task = self._tasks.get(task_id)
        if not task:
            return
        task.status = TaskStatus.FAILED
        task.error = error
        task.message = f"处理失败: {error}"
        now = datetime.now().isoformat()
        self._persist(
            update_task, task_id,
            status=TaskStatus.FAILED.value,
            error=error,
            message=task.message,
            completed_at=now,
            updated_at=now,
        )
        self._spawn(self._broadcast(task_id))

    def subscribe(self, task_id: str, websocket):
        if task_id in self._connections:
            self._connections[task_id].add(websocket)

    def unsubscribe(self, task_id: str, websocket):
        if task_id in self._connections:
            self._connections[task_id].discard(websocket)

    async def _broadcast(self, task_id: str):
        """Send the task state to every subscriber, pruning dead sockets."""
        task = self._tasks.get(task_id)
        if not task:
            return
        data = task.to_dict()
        dead = set()
        # Iterate a snapshot: subscribe()/unsubscribe() may mutate the set
        # while this coroutine is suspended in send_json().
        for ws in list(self._connections.get(task_id, ())):
            try:
                await ws.send_json(data)
            except Exception:
                dead.add(ws)
        for ws in dead:
            self._connections[task_id].discard(ws)
```

- [ ] **Step 2: Verify the module imports cleanly**

Run: `cd "f:\vebing coding\xiaoaitext" && python -c "from web.backend.services.task_manager import TaskManager; print('OK')"`
Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add web/backend/services/task_manager.py
git commit -m "feat: persist task lifecycle to SQLite via TaskManager"
```

---

### Task 4: File Metadata Tracking

**Files:**
- Modify: `web/backend/routers/files.py`

- [ ] **Step 1: Add file metadata recording to `files.py`**

Add `insert_file_metadata` import and call it in upload, delete, and clear endpoints. The full modified file:

```python
"""File upload, download, and listing endpoints."""
import logging
import os
import re
import shutil
import uuid
from pathlib import Path
from typing import List, Optional

from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, Query, Request
from fastapi.responses import FileResponse
from pydantic import BaseModel

from ..auth.dependencies import get_current_user, get_current_user_flexible
from ..config import MAX_UPLOAD_SIZE, ALLOWED_EXTENSIONS

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/files", tags=["files"])

# Resolve data directories relative to project root
_project_root = Path(__file__).resolve().parent.parent.parent.parent
_input_dir = _project_root / "data" / "input"
_output_dir = _project_root / "data" / "output"
_result_dir = _project_root / "data" / "result"

_DIR_MAP = {"input": _input_dir, "output": _output_dir, "result": _result_dir}

# Characters that are path separators or invalid in Windows file names.
_UNSAFE_NAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')


class FileItem(BaseModel):
    name: str
    size: int
    modified: float
    directory: str


class UploadResponse(BaseModel):
    filename: str
    size: int
    path: str


def _ensure_dirs():
    for d in [_input_dir, _output_dir, _result_dir]:
        d.mkdir(parents=True, exist_ok=True)


def _resolve_dir(directory: str) -> Path:
    """Map a directory key to its path, or raise 404 for an unknown key."""
    target_dir = _DIR_MAP.get(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")
    return target_dir


def _resolve_file(directory: str, filename: str) -> Path:
    """Return the existing file *filename* directly inside *directory*.

    Raises 404 when the file is missing or when *filename* resolves anywhere
    other than directly inside the target directory (``..``, backslash
    separators on Windows, absolute paths), so callers cannot escape the
    data directories.
    """
    root = _resolve_dir(directory).resolve()
    file_path = (root / filename).resolve()
    if file_path.parent != root or not file_path.is_file():
        raise HTTPException(404, "文件不存在")
    return file_path


def _safe_upload_name(original: str, ext: str) -> str:
    """Derive a safe on-disk name for an upload whose validated extension is *ext*."""
    from werkzeug.utils import secure_filename

    candidate = secure_filename(original)
    if Path(candidate).suffix.lower() == ext:
        return candidate
    # secure_filename() drops non-ASCII characters, so "订单.xlsx" collapses to
    # "xlsx" and loses its extension. Fall back to the bare file name with
    # separator / reserved characters replaced, never to the raw client name.
    basename = original.replace("\\", "/").rsplit("/", 1)[-1]
    candidate = _UNSAFE_NAME_CHARS.sub("_", basename).strip(" .")
    if Path(candidate).suffix.lower() == ext:
        return candidate
    return f"upload_{uuid.uuid4().hex[:8]}{ext}"


def _record_file_action(filename: str, directory: str, size: int, action: str, user: str = None):
    """Record a file operation to the metadata DB.

    Best-effort: a failure is logged and never propagates to the request.
    The insert is a short synchronous SQLite write.
    """
    try:
        from ..services.db_schema import insert_file_metadata
        insert_file_metadata(filename=filename, directory=directory, size=size, action=action, user=user)
    except Exception:
        logger.exception("Failed to record file action %s for %s/%s", action, directory, filename)


@router.post("/upload", response_model=UploadResponse)
async def upload_file(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user),
):
    _ensure_dirs()

    # Validate extension
    original_name = file.filename or ""
    ext = Path(original_name).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"不支持的文件类型: {ext}")

    # Validate size: read at most one byte past the limit so an oversized
    # upload is rejected without loading all of it into memory.
    content = await file.read(MAX_UPLOAD_SIZE + 1)
    if len(content) > MAX_UPLOAD_SIZE:
        raise HTTPException(400, f"文件过大,最大 {MAX_UPLOAD_SIZE // 1024 // 1024}MB")

    # Save with secure name
    safe_name = _safe_upload_name(original_name, ext)
    dest = _input_dir / safe_name

    # Avoid overwrite: add suffix if exists
    counter = 0
    stem = Path(safe_name).stem
    suffix = Path(safe_name).suffix
    while dest.exists():
        counter += 1
        dest = _input_dir / f"{stem}_{counter}{suffix}"

    dest.write_bytes(content)

    # Record metadata
    _record_file_action(dest.name, "input", len(content), "upload", current_user.get("username"))

    return UploadResponse(
        filename=dest.name,
        size=len(content),
        path=str(dest.relative_to(_project_root)),
    )


@router.get("/list")
async def list_files(
    directory: str = "input",
    current_user: dict = Depends(get_current_user),
) -> List[FileItem]:
    target_dir = _DIR_MAP.get(directory)
    if not target_dir or not target_dir.is_dir():
        return []

    files = []
    for f in sorted(target_dir.iterdir()):
        if f.is_file():
            stat = f.stat()
            files.append(FileItem(
                name=f.name,
                size=stat.st_size,
                modified=stat.st_mtime,
                directory=directory,
            ))
    return files


@router.get("/download/{directory}/{filename}")
async def download_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user_flexible),
):
    file_path = _resolve_file(directory, filename)
    return FileResponse(
        str(file_path),
        filename=file_path.name,
        media_type="application/octet-stream",
    )


@router.delete("/{directory}/{filename}")
async def delete_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user),
):
    file_path = _resolve_file(directory, filename)
    size = file_path.stat().st_size
    file_path.unlink()

    # Record metadata
    _record_file_action(file_path.name, directory, size, "delete", current_user.get("username"))

    return {"message": f"已删除 {filename}"}


@router.post("/clear/{directory}")
async def clear_directory(
    directory: str,
    current_user: dict = Depends(get_current_user),
):
    target_dir = _resolve_dir(directory)

    count = 0
    freed = 0
    # A directory that was never created simply has nothing to clear.
    if target_dir.is_dir():
        for f in target_dir.iterdir():
            if f.is_file():
                freed += f.stat().st_size
                f.unlink()
                count += 1

    # Record metadata: one "*" row carrying the total number of bytes removed
    _record_file_action("*", directory, freed, "clear", current_user.get("username"))

    return {"message": f"已清除 {count} 个文件", "count": count}


# Plain `def`: FastAPI runs these in its threadpool, so the blocking SQLite /
# filesystem work does not stall the event loop.
@router.get("/history")
def file_history(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    directory: str = None,
    action: str = None,
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_file_history
    return query_file_history(page=page, page_size=page_size, directory=directory, action=action)


@router.get("/stats")
def file_stats(
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_file_stats
    return query_file_stats()
```

- [ ] **Step 2: Commit**

```bash
git add web/backend/routers/files.py
git commit -m "feat: record 
file upload/delete/clear operations to metadata table"
```

---

### Task 5: Logs API Router

**Files:**
- Create: `web/backend/routers/logs.py`

- [ ] **Step 1: Create `web/backend/routers/logs.py`**

```python
"""HTTP log query endpoints."""
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from typing import List, Optional

from ..auth.dependencies import get_current_user

router = APIRouter(prefix="/api/logs", tags=["logs"])


class LogItem(BaseModel):
    id: int
    timestamp: str
    method: str
    path: str
    status_code: Optional[int] = None
    duration_ms: Optional[float] = None
    user: Optional[str] = None
    ip: Optional[str] = None
    detail: Optional[str] = None


class LogListResponse(BaseModel):
    items: List[LogItem]
    total: int


class LogStatsResponse(BaseModel):
    today_count: int
    error_count: int
    avg_duration_ms: float
    error_rate: float


# Plain `def` handlers: FastAPI runs them in its threadpool, so the blocking
# SQLite queries do not stall the event loop.
@router.get("", response_model=LogListResponse)
def list_logs(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    method: Optional[str] = None,
    status_code: Optional[int] = None,
    path: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_http_logs
    return query_http_logs(
        page=page, page_size=page_size, method=method,
        status_code=status_code, path=path,
        start_date=start_date, end_date=end_date,
    )


@router.get("/stats", response_model=LogStatsResponse)
def get_log_stats(
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_http_log_stats
    return query_http_log_stats()
```

- [ ] **Step 2: Commit**

```bash
git add web/backend/routers/logs.py
git commit -m "feat: add HTTP log query API endpoints"
```

---

### Task 6: Tasks API Router

**Files:**
- Create: `web/backend/routers/tasks.py`

- [ ] **Step 1: Create `web/backend/routers/tasks.py`**

```python
"""Task history query and retry endpoints."""
import asyncio
from fastapi import APIRouter, HTTPException, Depends, Query, Request
from pydantic import BaseModel
from typing import Optional

from ..auth.dependencies import get_current_user

router = APIRouter(prefix="/api/tasks", tags=["tasks"])

# Task display name -> processing endpoint used to re-run it
_RETRY_ENDPOINTS = {
    "批量OCR识别": "/api/processing/ocr-batch",
    "Excel标准化处理": "/api/processing/excel",
    "合并采购单": "/api/processing/merge",
    "一键全流程处理": "/api/processing/pipeline",
}


class TaskListResponse(BaseModel):
    items: list
    total: int


class TaskStatsResponse(BaseModel):
    total: int
    completed: int
    failed: int
    running: int


def _error_detail(resp, default: str):
    """Extract ``detail`` from an error response that may not be JSON."""
    try:
        body = resp.json()
    except ValueError:
        return default
    if isinstance(body, dict):
        return body.get("detail", default)
    return default


# Plain `def` handlers: FastAPI runs them in its threadpool, so the blocking
# SQLite queries do not stall the event loop.
@router.get("", response_model=TaskListResponse)
def list_tasks(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    status: Optional[str] = None,
    name: Optional[str] = None,
    search: Optional[str] = None,
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_task_history
    return query_task_history(
        page=page, page_size=page_size,
        status=status, name=name, search=search,
    )


@router.get("/stats", response_model=TaskStatsResponse)
def get_task_stats(
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_task_stats
    return query_task_stats()


@router.get("/{task_id}")
def get_task_detail(
    task_id: str,
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_task_by_id
    task = query_task_by_id(task_id)
    if not task:
        raise HTTPException(404, "任务不存在")
    return task


@router.post("/{task_id}/retry")
async def retry_task(
    task_id: str,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    from ..services.db_schema import query_task_by_id
    task = query_task_by_id(task_id)
    if not task:
        raise HTTPException(404, "任务不存在")
    if task["status"] != "failed":
        raise HTTPException(400, "只能重试失败的任务")

    # Map task name to processing endpoint
    name = task["name"]
    endpoint = _RETRY_ENDPOINTS.get(name)
    if not endpoint:
        raise HTTPException(400, f"未知的任务类型: {name}")

    # Call the processing endpoint in-process through the ASGI app instead of
    # over a hard-coded http://127.0.0.1:8000, so the retry works regardless
    # of the host/port the server is bound to.
    import httpx

    # Forward auth header only when the caller sent one
    auth_header = request.headers.get("authorization", "")
    headers = {"authorization": auth_header} if auth_header else {}

    transport = httpx.ASGITransport(app=request.app)
    try:
        async with httpx.AsyncClient(transport=transport, base_url="http://internal") as client:
            # NOTE(review): the retry posts an empty JSON body — confirm the
            # processing endpoints accept that.
            resp = await client.post(endpoint, headers=headers, json={})
    except httpx.HTTPError as exc:
        raise HTTPException(502, f"重试失败: {exc}") from exc

    if resp.status_code >= 400:
        raise HTTPException(resp.status_code, _error_detail(resp, "重试失败"))
    return resp.json()
```

- [ ] **Step 2: Commit**

```bash
git add web/backend/routers/tasks.py
git commit -m "feat: add task history query and retry API endpoints"
```

---

### Task 7: Register Everything in `main.py`

**Files:**
- Modify: `web/backend/main.py`

- [ ] **Step 1: Update `web/backend/main.py`**

Changes:
1. Import `db_schema` functions and `LoggingMiddleware`
2. In lifespan: call `init_db()`, `cleanup_old_records()`, set `db_pool` on `task_manager`
3. Add `LoggingMiddleware` after CORS
4. Register `logs_router` and `tasks_router`

Full file:

```python
"""FastAPI application entry point for the web-based OCR order processing system."""
import logging
import sys
import os
from contextlib import asynccontextmanager
from pathlib import Path

# Ensure app/ is importable
_web_dir = Path(__file__).resolve().parent.parent  # web/
_project_root = _web_dir.parent  # project root
if str(_project_root) not in sys.path:
    sys.path.insert(0, str(_project_root))

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

from .config import get_or_generate_secret  # noqa: trigger secret generation
from .services.task_manager import TaskManager
from .services.db_pool import DBPool
from .auth.router import router as auth_router
from .routers.files import router as files_router
from .routers.processing import router as processing_router
from .routers.memory import router as memory_router
from .routers.config_api import router as config_router
from .routers.barcodes import router as barcodes_router
from .routers.sync import router as sync_router
from .routers.websocket import router as ws_router
from .routers.logs import router as logs_router
from .routers.tasks import router as tasks_router
from .middleware.logging import LoggingMiddleware

logger = logging.getLogger(__name__)

# Shared singletons
task_manager = TaskManager()
db_pool = DBPool()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize shared resources on startup."""
    from app.config.settings import ConfigManager
    ConfigManager()

    # Initialize DB and cleanup old records
    from .services.db_schema import init_db, cleanup_old_records
    init_db()
    try:
        cleanup_old_records()
    except Exception:
        # Cleanup is housekeeping; a failure must not prevent startup.
        logger.exception("Cleanup of old records failed")

    # Wire up DB pool to task manager
    task_manager.set_db_pool(db_pool)

    app.state.task_manager = task_manager
    app.state.db_pool = db_pool
    yield


app = FastAPI(
    title="益选 OCR 订单处理系统",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://127.0.0.1:5173", "http://localhost:8000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# HTTP logging middleware (after CORS, before routes)
app.add_middleware(LoggingMiddleware)


# Make task_manager and db_pool accessible via request.state
@app.middleware("http")
async def inject_services(request, call_next):
    request.state.task_manager = task_manager
    request.state.db_pool = db_pool
    return await call_next(request)


# Mount routers
app.include_router(auth_router)
app.include_router(files_router)
app.include_router(processing_router)
app.include_router(memory_router)
app.include_router(config_router)
app.include_router(barcodes_router)
app.include_router(sync_router)
app.include_router(ws_router)
app.include_router(logs_router)
app.include_router(tasks_router)

# Serve Vue SPA static files
_static_dir = Path(__file__).resolve().parent / "static"
if _static_dir.is_dir():
    app.mount("/assets", StaticFiles(directory=str(_static_dir / "assets")), name="assets")

    @app.get("/{full_path:path}")
    async def serve_spa(full_path: str):
        """Catch-all: serve index.html for Vue Router history mode."""
        # Resolve the requested path and serve it only when it stays inside
        # the static directory, so ".." segments cannot read arbitrary files.
        file_path = (_static_dir / full_path).resolve()
        if _static_dir in file_path.parents and file_path.is_file():
            return FileResponse(str(file_path))
        return FileResponse(str(_static_dir / "index.html"))
```

- [ ] **Step 2: Test server startup**

Run: `cd "f:\vebing coding\xiaoaitext" && python -c "from web.backend.main import app; print('OK')"`
Expected: `OK`

- [ ] **Step 3: Commit**

```bash
git add web/backend/main.py
git commit -m "feat: register logging middleware, logs/tasks routers, init DB on startup"
```

---

### Task 8: Frontend — Tasks.vue (Task History Page)

**Files:**
- Create: `web/frontend/src/views/Tasks.vue`

- [ ] **Step 1: Create `web/frontend/src/views/Tasks.vue`**

```vue
{{ taskStats.total }} 总任务数 {{ taskStats.completed }} 成功 {{ taskStats.failed }} 失败 {{ taskStats.running }} 运行中 任务历史 刷新 {{ row.id }} {{ statusLabel(row.status) }} {{ formatTime(row.created_at) }} 详情 重试 共 {{ total }} 条记录 任务ID{{ detailTask.id }} 类型{{ detailTask.name }} 状态{{ statusLabel(detailTask.status) }} 进度{{ detailTask.progress }}% 结果文件 {{ f }} 执行日志 暂无日志 {{ line }} 关闭
```

- [ ] **Step 2: Commit**

```bash
git add web/frontend/src/views/Tasks.vue
git commit -m "feat: add Tasks page with stats, filters, detail dialog, retry"
```

---

### Task 9: Frontend — Logs.vue (Log Viewer Page)

**Files:**
- Create: `web/frontend/src/views/Logs.vue`

- [ ] **Step 1: Create `web/frontend/src/views/Logs.vue`**

```vue
{{ logStats.today_count }} 今日请求 {{ logStats.error_count }} 错误数 {{ logStats.avg_duration_ms }}ms 平均耗时 {{ logStats.error_rate }}% 错误率 请求日志 刷新 {{ formatTime(row.timestamp) }} {{ row.method }} {{ row.status_code }} {{ row.duration_ms?.toFixed(0) }}ms 共 {{ total }} 条记录
```

- [ ] **Step 2: Commit**

```bash
git add web/frontend/src/views/Logs.vue
git commit -m "feat: add Logs page with stats, filters, color-coded table"
```

---

### Task 10: Frontend — Router + Layout Registration

**Files:**
- Modify: `web/frontend/src/router/index.ts`
- Modify: `web/frontend/src/views/Layout.vue`

- [ ] **Step 1: Update `web/frontend/src/router/index.ts`**

Add two child routes:

```typescript
import { createRouter, 
createWebHistory } from 'vue-router' import { useAuthStore } from '../stores/auth' const router = createRouter({ history: createWebHistory(), routes: [ { path: '/login', name: 'Login', component: () => import('../views/Login.vue'), meta: { requiresAuth: false }, }, { path: '/', component: () => import('../views/Layout.vue'), meta: { requiresAuth: true }, children: [ { path: '', name: 'Dashboard', component: () => import('../views/Dashboard.vue'), }, { path: 'memory', name: 'Memory', component: () => import('../views/Memory.vue'), }, { path: 'barcodes', name: 'Barcodes', component: () => import('../views/Barcodes.vue'), }, { path: 'config', name: 'Config', component: () => import('../views/Config.vue'), }, { path: 'sync', name: 'Sync', component: () => import('../views/Sync.vue'), }, { path: 'tasks', name: 'Tasks', component: () => import('../views/Tasks.vue'), }, { path: 'logs', name: 'Logs', component: () => import('../views/Logs.vue'), }, ], }, ], }) router.beforeEach((to, from, next) => { const authStore = useAuthStore() if (to.meta.requiresAuth !== false && !authStore.isAuthenticated) { next('/login') } else { next() } }) export default router ``` - [ ] **Step 2: Update `web/frontend/src/views/Layout.vue` imports and navItems** In the `