"""File upload, download, and listing endpoints.""" import logging import os import shutil from pathlib import Path from typing import List, Optional from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, Query, Request from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel from ..auth.dependencies import get_current_user, get_current_user_flexible from ..config import MAX_UPLOAD_SIZE, ALLOWED_EXTENSIONS from ..services.db_schema import ( insert_file_metadata, query_file_history, query_file_stats, query_file_relations, delete_file_relations, sync_file_relations, query_file_relations_stats, reset_file_cache, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/files", tags=["files"]) # Resolve data directories relative to project root _project_root = Path(__file__).resolve().parent.parent.parent.parent _input_dir = _project_root / "data" / "input" _output_dir = _project_root / "data" / "output" _result_dir = _project_root / "data" / "result" class FileItem(BaseModel): name: str size: int modified: float directory: str class UploadResponse(BaseModel): filename: str size: int path: str def _ensure_dirs(): for d in [_input_dir, _output_dir, _result_dir]: d.mkdir(parents=True, exist_ok=True) def _record_file_action(filename: str, directory: str, size: int, action: str, user: str = None): """Record a file operation to the metadata table. 
Best-effort, non-blocking.""" try: insert_file_metadata(filename=filename, directory=directory, size=size, action=action, user=user) except Exception: logger.debug("Failed to record file metadata for %s/%s action=%s", directory, filename, action, exc_info=True) @router.post("/upload", response_model=UploadResponse) async def upload_file( file: UploadFile = File(...), target: str = Query("input", pattern="^(input|output)$"), current_user: dict = Depends(get_current_user), ): _ensure_dirs() # Validate extension ext = Path(file.filename).suffix.lower() if ext not in ALLOWED_EXTENSIONS: raise HTTPException(400, f"不支持的文件类型: {ext}") # Validate size content = await file.read() if len(content) > MAX_UPLOAD_SIZE: raise HTTPException(400, f"文件过大,最大 {MAX_UPLOAD_SIZE // 1024 // 1024}MB") # Choose target directory target_dir = _output_dir if target == "output" else _input_dir # Save with secure name from werkzeug.utils import secure_filename safe_name = secure_filename(file.filename) or file.filename dest = target_dir / safe_name # Avoid overwrite: add suffix if exists counter = 0 stem = Path(safe_name).stem suffix = Path(safe_name).suffix while dest.exists(): counter += 1 dest = target_dir / f"{stem}_{counter}{suffix}" dest.write_bytes(content) _record_file_action(dest.name, target, len(content), "upload", current_user.get("username")) return UploadResponse( filename=dest.name, size=len(content), path=str(dest.relative_to(_project_root)), ) @router.get("/list") async def list_files( directory: str = "input", current_user: dict = Depends(get_current_user), ) -> List[FileItem]: dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir} target_dir = dir_map.get(directory) if not target_dir or not target_dir.is_dir(): return [] files = [] for f in sorted(target_dir.iterdir()): if f.is_file(): stat = f.stat() files.append(FileItem( name=f.name, size=stat.st_size, modified=stat.st_mtime, directory=directory, )) return files 
def _resolve_target_dir(directory: str) -> Optional[Path]:
    """Map a public directory key (input/output/result) to its path, or None."""
    if not isinstance(directory, str):
        return None
    return {
        "input": _input_dir,
        "output": _output_dir,
        "result": _result_dir,
    }.get(directory)


def _safe_child(base_dir: Path, filename: str) -> Optional[Path]:
    """Return ``base_dir / filename`` only if it stays inside ``base_dir``.

    Returns None for empty or non-string names, absolute paths, and names
    that escape ``base_dir`` once ``..`` components and symlinks are
    resolved. The returned path is the unresolved join, so callers operate
    on the entry itself rather than on a symlink target.
    """
    if not isinstance(filename, str) or not filename:
        return None
    candidate = base_dir / filename
    try:
        resolved_base = base_dir.resolve()
        resolved = candidate.resolve()
        resolved.relative_to(resolved_base)
    except (OSError, ValueError, RuntimeError):
        # ValueError: outside base_dir (or embedded NUL); RuntimeError:
        # symlink loop on older Python versions.
        return None
    if resolved == resolved_base:
        return None
    return candidate


@router.get("/download/{directory}/{filename}")
async def download_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user_flexible),
):
    """Send a stored file as an ``application/octet-stream`` attachment.

    Raises:
        HTTPException(404): unknown directory key, or the file does not
            exist / the name points outside the data directory.
    """
    target_dir = _resolve_target_dir(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    file_path = _safe_child(target_dir, filename)
    if file_path is None or not file_path.is_file():
        raise HTTPException(404, "文件不存在")

    return FileResponse(
        str(file_path),
        filename=filename,
        media_type="application/octet-stream",
    )


@router.delete("/{directory}/{filename}")
async def delete_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user),
):
    """Delete one stored file, log the action, and clean up its relation row.

    Raises:
        HTTPException(404): unknown directory key, or the file does not
            exist / the name points outside the data directory.
    """
    target_dir = _resolve_target_dir(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    file_path = _safe_child(target_dir, filename)
    if file_path is None or not file_path.is_file():
        raise HTTPException(404, "文件不存在")

    # Capture the size before unlinking; it is unavailable afterwards.
    size = file_path.stat().st_size
    file_path.unlink()
    _record_file_action(filename, directory, size, "delete", current_user.get("username"))
    # Cascade: clean up relation table
    _cleanup_relation_for_deleted_file(directory, filename)
    return {"message": f"已删除 {filename}"}


class BatchDeleteRequest(BaseModel):
    """Request body of ``/batch-delete``."""

    # Each item is read with the keys "directory" and "filename".
    files: list[dict]


@router.post("/batch-delete")
async def batch_delete_files(
    req: BatchDeleteRequest,
    current_user: dict = Depends(get_current_user),
):
    """Batch delete files from disk and clean up relation records.

    Items with an unknown directory, an empty filename, or a filename that
    would resolve outside the data directory are reported in ``errors`` and
    skipped. Files that do not exist are silently skipped. Returns
    ``{"deleted": <count>, "errors": [<message>, ...]}``.
    """
    username = current_user.get("username")
    deleted = 0
    errors = []
    for item in req.files:
        d = item.get("directory", "")
        fname = item.get("filename", "")
        target_dir = _resolve_target_dir(d)
        # The filename comes straight from the request body, so it must be
        # confined to the data directory before anything is unlinked.
        file_path = _safe_child(target_dir, fname) if target_dir else None
        if file_path is None:
            errors.append(f"无效参数: {d}/{fname}")
            continue
        try:
            if file_path.exists():
                size = file_path.stat().st_size
                file_path.unlink()
                deleted += 1
                _record_file_action(fname, d, size, "delete", username)
                _cleanup_relation_for_deleted_file(d, fname)
        except Exception as e:
            errors.append(f"{fname}: {str(e)}")
    return {"deleted": deleted, "errors": errors}


@router.post("/clear/{directory}")
async def clear_directory(
    directory: str,
    current_user: dict = Depends(get_current_user),
):
    """Delete every regular file directly inside one data directory.

    Subdirectories are left untouched. A single ``clear`` action with the
    filename ``*`` and size 0 is recorded for the whole operation.

    Raises:
        HTTPException(404): unknown directory key.
    """
    target_dir = _resolve_target_dir(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    count = 0
    for f in target_dir.iterdir():
        if f.is_file():
            f.unlink()
            count += 1
    _record_file_action("*", directory, 0, "clear", current_user.get("username"))
    return {"message": f"已清除 {count} 个文件", "count": count}


@router.get("/history")
async def get_file_history(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    directory: Optional[str] = None,
    action: Optional[str] = None,
    current_user: dict = Depends(get_current_user),
):
    """Query file operation history with pagination and optional filters."""
    offset = (page - 1) * page_size
    rows = query_file_history(
        directory=directory,
        action=action,
        limit=page_size,
        offset=offset,
    )
    return {"page": page, "page_size": page_size, "items": rows}


@router.get("/stats")
async def get_file_stats(
    current_user: dict = Depends(get_current_user),
):
    """Return file storage statistics per directory."""
    return {"directories": query_file_stats()}


# ---------------------------------------------------------------------------
# File relations
# ---------------------------------------------------------------------------

class RelationDeleteRequest(BaseModel):
    """Request body of ``DELETE /relations``."""

    ids: List[int]  # ids passed to delete_file_relations


# Directory key -> file_relations column holding that directory's file name.
# The column names are interpolated into SQL below, so this mapping must only
# ever contain trusted, hard-coded identifiers.
_RELATION_COLUMNS = {
    "input": "input_image",
    "output": "output_excel",
    "result": "result_purchase",
}


def _cleanup_relation_for_deleted_file(directory: str, filename: str):
    """Clean up relation table when a file is deleted.

    For every ``file_relations`` row whose column for ``directory`` equals
    ``filename``, the column is set to NULL; the row is then deleted if both
    remaining file columns are empty as well. Best-effort: database errors
    are logged at debug level and swallowed, and nothing is committed in
    that case.
    """
    import sqlite3
    from contextlib import closing

    from ..services.db_schema import _db_path

    column = _RELATION_COLUMNS.get(directory)
    if column is None:
        return
    other_columns = [c for c in _RELATION_COLUMNS.values() if c != column]

    try:
        with closing(sqlite3.connect(_db_path)) as conn:
            conn.row_factory = sqlite3.Row
            # Handle every row that references the file, not just the first,
            # so no stale reference to the deleted file is left behind.
            matches = conn.execute(
                f"SELECT id FROM file_relations WHERE {column} = ?",
                (filename,),
            ).fetchall()
            for match in matches:
                rel_id = match["id"]
                conn.execute(
                    f"UPDATE file_relations SET {column} = NULL, "
                    "updated_at = datetime('now') WHERE id = ?",
                    (rel_id,),
                )
                # Delete if no other fields
                remaining = conn.execute(
                    "SELECT * FROM file_relations WHERE id = ?", (rel_id,)
                ).fetchone()
                if remaining and not any(remaining[c] for c in other_columns):
                    conn.execute("DELETE FROM file_relations WHERE id = ?", (rel_id,))
            conn.commit()
    except Exception:
        logger.debug("Failed to cleanup relation for %s/%s", directory, filename, exc_info=True)


@router.get("/relations")
async def get_file_relations(
    view: Optional[str] = Query(None, pattern="^(orders|tables|images)$"),
    status: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = None,
    sort_order: str = "desc",
    sync: bool = Query(True, description="Auto-sync file relations before querying"),
    current_user: dict = Depends(get_current_user),
):
    """Query file relations with optional view filter.

    When ``sync`` is true, ``sync_file_relations()`` runs before the query.
    """
    if sync:
        sync_file_relations()
    # NOTE(review): sort_by / sort_order are forwarded unvalidated; confirm
    # that query_file_relations whitelists them before building SQL.
    items, total = query_file_relations(
        view=view,
        status=status,
        page=page,
        page_size=page_size,
        sort_by=sort_by,
        sort_order=sort_order,
    )
    return {"items": items, "total": total}


@router.get("/stats/detailed")
async def get_detailed_stats(
    current_user: dict = Depends(get_current_user),
):
    """Get detailed file statistics for Dashboard."""
    return query_file_relations_stats()


@router.post("/relations/sync")
async def sync_relations(
    current_user: dict = Depends(get_current_user),
):
    """Scan directories and rebuild file_relations table."""
    sync_file_relations()
    return {"message": "文件关系表已重建"}


class ResetCacheRequest(BaseModel):
    """Request body of ``/reset-cache``."""

    files: list[dict]  # [{input_image, output_excel, result_purchase}, ...]


@router.post("/reset-cache")
async def reset_cache(
    req: ResetCacheRequest,
    current_user: dict = Depends(get_current_user),
):
    """Delete output/result files and reset status to pending for reprocessing.

    Each item in files should have: {input_image?, output_excel?, result_purchase?}
    The corresponding files on disk are deleted, and the relation status is reset.
    The work is delegated to ``reset_file_cache``, whose result is returned as-is.
    """
    return reset_file_cache(req.files)


@router.delete("/relations")
async def delete_relations(
    body: RelationDeleteRequest,
    current_user: dict = Depends(get_current_user),
):
    """Delete file relation records by IDs."""
    delete_file_relations(body.ids)
    return {"message": f"已删除 {len(body.ids)} 条关系记录"}


# ---------------------------------------------------------------------------
# File preview
# ---------------------------------------------------------------------------

# Extensions served as-is by the preview endpoint.
_PREVIEW_IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')
# Extensions rendered as a JSON grid by the preview endpoint.
_PREVIEW_EXCEL_EXTS = ('.xls', '.xlsx')
# Maximum number of spreadsheet rows included in a preview response.
_PREVIEW_MAX_ROWS = 200


@router.get("/preview/{directory}/{filename:path}")
async def preview_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user),
):
    """Preview file content: images served directly, Excel returned as JSON grid.

    Excel previews contain at most the first 200 rows as lists of strings,
    plus ``total_rows`` with the full row count.

    Raises:
        HTTPException(403): directory not allowed, or the path escapes it.
        HTTPException(404): file does not exist.
        HTTPException(400): extension has no preview support.
        HTTPException(500): the spreadsheet could not be read.
    """
    # Security: only allow specific directories
    target_dir = _resolve_target_dir(directory)
    if target_dir is None:
        raise HTTPException(403, "不允许访问该目录")

    # The ``:path`` converter lets ``filename`` contain slashes and ``..``,
    # so confine it to the selected data directory before touching disk.
    file_path = _safe_child(target_dir, filename)
    if file_path is None:
        raise HTTPException(403, "不允许访问该目录")
    if not file_path.is_file():
        raise HTTPException(404, f"文件不存在: {filename}")

    ext = file_path.suffix.lower()

    # Images: serve directly
    if ext in _PREVIEW_IMAGE_EXTS:
        return FileResponse(str(file_path))

    # Excel: read and return as JSON grid
    if ext in _PREVIEW_EXCEL_EXTS:
        try:
            # Imported lazily so pandas is only loaded when a preview needs it.
            import pandas as pd

            df = pd.read_excel(str(file_path), header=None)
            # Fill NaN with empty string
            df = df.fillna('')
            total_rows = len(df)
            # Limit to first 200 rows; only those rows are stringified.
            rows = [
                [str(v) for v in row]
                for _, row in df.head(_PREVIEW_MAX_ROWS).iterrows()
            ]
            return JSONResponse({"type": "excel", "rows": rows, "total_rows": total_rows})
        except Exception as e:
            logger.warning("Failed to read %s for preview", file_path, exc_info=True)
            raise HTTPException(500, f"读取文件失败: {e}") from e

    raise HTTPException(400, f"不支持预览的文件类型: {ext}")