Files
orc-order-v2/web/backend/routers/files.py
T
2026-05-12 21:45:28 +08:00

381 lines
13 KiB
Python

"""File upload, download, and listing endpoints."""
import logging
import os
import shutil
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, Query, Request
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from ..auth.dependencies import get_current_user, get_current_user_flexible
from ..config import MAX_UPLOAD_SIZE, ALLOWED_EXTENSIONS
from ..services.db_schema import (
insert_file_metadata, query_file_history, query_file_stats,
query_file_relations, delete_file_relations, sync_file_relations,
query_file_relations_stats,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared in this file is served under the /api/files prefix.
router = APIRouter(prefix="/api/files", tags=["files"])
# Resolve data directories relative to project root
# (the project root is the fourth ancestor of this file's resolved path).
_project_root = Path(__file__).resolve().parent.parent.parent.parent
_input_dir = _project_root / "data" / "input"
_output_dir = _project_root / "data" / "output"
_result_dir = _project_root / "data" / "result"
class FileItem(BaseModel):
    """One file entry as produced by the directory listing endpoint."""

    # File name without any directory component.
    name: str
    # Size in bytes (filled from the file's stat result).
    size: int
    # Last-modification time as reported by st_mtime.
    modified: float
    # Directory key ("input", "output" or "result") the file was listed from.
    directory: str
class UploadResponse(BaseModel):
    """Response body returned after a successful upload."""

    # Name the file was actually stored under (may differ from the client's name).
    filename: str
    # Number of bytes written.
    size: int
    # Stored location, expressed relative to the project root.
    path: str
def _ensure_dirs():
    """Create the input, output and result data directories when missing."""
    for data_dir in (_input_dir, _output_dir, _result_dir):
        # parents=True also creates the shared "data" parent directory;
        # exist_ok=True makes repeated calls harmless.
        data_dir.mkdir(parents=True, exist_ok=True)
def _record_file_action(filename: str, directory: str, size: int, action: str, user: str = None):
    """Write one file-operation row to the metadata table.

    Best-effort: any failure is logged at debug level and swallowed, so a
    bookkeeping problem never fails the request that triggered it.

    Args:
        filename: Name of the affected file.
        directory: Directory key the file belongs to.
        size: File size in bytes.
        action: Operation label (callers in this module pass "upload",
            "delete" and "clear").
        user: Name of the acting user, if known.
    """
    record = {
        "filename": filename,
        "directory": directory,
        "size": size,
        "action": action,
        "user": user,
    }
    try:
        insert_file_metadata(**record)
    except Exception:
        logger.debug(
            "Failed to record file metadata for %s/%s action=%s",
            directory,
            filename,
            action,
            exc_info=True,
        )
@router.post("/upload", response_model=UploadResponse)
async def upload_file(
    file: UploadFile = File(...),
    target: str = Query("input", pattern="^(input|output)$"),
    current_user: dict = Depends(get_current_user),
):
    """Store an uploaded file in the input or output data directory.

    Args:
        file: The multipart upload.
        target: Destination directory key, "input" (default) or "output".
        current_user: Authenticated user; its "username" is recorded with the
            upload action.

    Returns:
        UploadResponse with the stored file name, its size in bytes and its
        path relative to the project root.

    Raises:
        HTTPException(400): the extension is not in ALLOWED_EXTENSIONS, or
            the payload is larger than MAX_UPLOAD_SIZE.
    """
    _ensure_dirs()

    # UploadFile.filename may be None; treat that like an empty name so the
    # extension check below rejects it with a 400 instead of crashing.
    original_name = file.filename or ""

    # Validate extension
    ext = Path(original_name).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(400, f"不支持的文件类型: {ext}")

    # Validate size. Reading one byte past the limit is enough to detect an
    # oversized upload without pulling the whole payload into memory.
    content = await file.read(MAX_UPLOAD_SIZE + 1)
    if len(content) > MAX_UPLOAD_SIZE:
        raise HTTPException(400, f"文件过大,最大 {MAX_UPLOAD_SIZE // 1024 // 1024}MB")

    # Choose target directory
    target_dir = _output_dir if target == "output" else _input_dir

    # Save with secure name
    from werkzeug.utils import secure_filename
    safe_name = secure_filename(original_name)
    if not safe_name or Path(safe_name).suffix.lower() != ext:
        # secure_filename() drops non-ASCII characters, so a name such as
        # "订单.xlsx" collapses to "xlsx" and loses its extension. Rebuild the
        # name from a sanitized stem plus the already-validated extension
        # rather than falling back to the raw client-supplied name.
        safe_stem = secure_filename(Path(original_name).stem) or "upload"
        safe_name = f"{safe_stem}{ext}"
    dest = target_dir / safe_name

    # Avoid overwrite: add suffix if exists
    counter = 0
    stem = Path(safe_name).stem
    suffix = Path(safe_name).suffix
    while dest.exists():
        counter += 1
        dest = target_dir / f"{stem}_{counter}{suffix}"

    dest.write_bytes(content)
    _record_file_action(dest.name, target, len(content), "upload", current_user.get("username"))
    return UploadResponse(
        filename=dest.name,
        size=len(content),
        path=str(dest.relative_to(_project_root)),
    )
@router.get("/list")
async def list_files(
    directory: str = "input",
    current_user: dict = Depends(get_current_user),
) -> List[FileItem]:
    """List the regular files in one of the data directories.

    Args:
        directory: Directory key: "input" (default), "output" or "result".
        current_user: Authenticated user (required by the dependency only).

    Returns:
        FileItem entries sorted by path; an empty list when the key is
        unknown or the directory does not exist.
    """
    known_dirs = {"input": _input_dir, "output": _output_dir, "result": _result_dir}
    base = known_dirs.get(directory)
    if base is None or not base.is_dir():
        return []

    listing = []
    for entry in sorted(base.iterdir()):
        # Sub-directories and other non-regular entries are not listed.
        if not entry.is_file():
            continue
        info = entry.stat()
        listing.append(
            FileItem(
                name=entry.name,
                size=info.st_size,
                modified=info.st_mtime,
                directory=directory,
            )
        )
    return listing
@router.get("/download/{directory}/{filename}")
async def download_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user_flexible),
):
    """Send a stored file to the client as an attachment.

    Args:
        directory: Directory key: "input", "output" or "result".
        filename: Name of the file inside that directory.
        current_user: Authenticated user (required by the dependency only).

    Returns:
        FileResponse streaming the file as application/octet-stream.

    Raises:
        HTTPException(404): unknown directory key, the file does not exist,
            or the name resolves to a location outside the directory.
    """
    dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir}
    target_dir = dir_map.get(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    # Resolve the requested path and make sure it is still inside the data
    # directory: names containing ".." (or backslash separators on Windows)
    # must not be able to reach files elsewhere on disk.
    base_dir = target_dir.resolve()
    try:
        file_path = (base_dir / filename).resolve()
    except (OSError, RuntimeError, ValueError):
        # Unresolvable names (e.g. embedded NUL bytes) are treated as missing.
        raise HTTPException(404, "文件不存在") from None
    if base_dir not in file_path.parents or not file_path.is_file():
        raise HTTPException(404, "文件不存在")

    return FileResponse(
        str(file_path),
        filename=filename,
        media_type="application/octet-stream",
    )
@router.delete("/{directory}/{filename}")
async def delete_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user),
):
    """Delete one stored file and cascade the removal to the relation table.

    Args:
        directory: Directory key: "input", "output" or "result".
        filename: Name of the file inside that directory.
        current_user: Authenticated user; its "username" is recorded with the
            delete action.

    Returns:
        A dict with a confirmation "message".

    Raises:
        HTTPException(404): unknown directory key, the file does not exist,
            or the name resolves to a location outside the directory.
    """
    dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir}
    target_dir = dir_map.get(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    # Resolve the requested path and refuse anything that escapes the data
    # directory, so a crafted name cannot delete files elsewhere on disk.
    base_dir = target_dir.resolve()
    try:
        file_path = (base_dir / filename).resolve()
    except (OSError, RuntimeError, ValueError):
        # Unresolvable names (e.g. embedded NUL bytes) are treated as missing.
        raise HTTPException(404, "文件不存在") from None
    if base_dir not in file_path.parents or not file_path.is_file():
        raise HTTPException(404, "文件不存在")

    # Capture the size before unlinking; it is needed for the history record.
    size = file_path.stat().st_size
    file_path.unlink()
    _record_file_action(filename, directory, size, "delete", current_user.get("username"))
    # Cascade: clean up relation table
    _cleanup_relation_for_deleted_file(directory, filename)
    return {"message": f"已删除 {filename}"}
# Request body for the batch-delete endpoint. It is declared ahead of the
# route because the parameter annotation below is evaluated when the function
# is defined, so the name must already exist at that point.
# NOTE: the module declares a model with the same name again further down;
# that later declaration is redundant once this one is in place.
class BatchDeleteRequest(BaseModel):
    files: list[dict]


@router.post("/batch-delete")
async def batch_delete_files(
    req: BatchDeleteRequest,
    current_user: dict = Depends(get_current_user),
):
    """Batch delete files from disk and clean up relation records.

    Each entry of ``req.files`` is a dict with a "directory" key ("input",
    "output" or "result") and a "filename" key. Entries are processed
    independently: a bad entry is reported in "errors" and does not stop the
    rest of the batch.

    Args:
        req: Parsed request body.
        current_user: Authenticated user; its "username" is recorded with
            every delete action.

    Returns:
        A dict with "deleted" (number of files removed) and "errors" (list of
        human-readable messages for entries that could not be processed).
    """
    dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir}
    deleted = 0
    errors = []
    for item in req.files:
        d = item.get("directory", "")
        fname = item.get("filename", "")
        # The dict values are unvalidated JSON: reject non-strings up front so
        # they cannot raise outside the per-item error handling.
        if not isinstance(d, str) or not isinstance(fname, str) or d not in dir_map or not fname:
            errors.append(f"无效参数: {d}/{fname}")
            continue
        try:
            # Resolve the path and refuse names that escape the data
            # directory ("../", absolute paths), which would otherwise allow
            # deleting arbitrary files.
            base_dir = dir_map[d].resolve()
            file_path = (base_dir / fname).resolve()
            if base_dir not in file_path.parents:
                errors.append(f"无效参数: {d}/{fname}")
                continue
            if file_path.exists():
                size = file_path.stat().st_size
                file_path.unlink()
                deleted += 1
                _record_file_action(fname, d, size, "delete", current_user.get("username"))
            # Relation cleanup runs even when the file was already gone, so
            # stale relation rows are removed as well.
            _cleanup_relation_for_deleted_file(d, fname)
        except Exception as e:
            errors.append(f"{fname}: {str(e)}")
    return {"deleted": deleted, "errors": errors}
@router.post("/clear/{directory}")
async def clear_directory(
    directory: str,
    current_user: dict = Depends(get_current_user),
):
    """Delete every regular file directly inside one data directory.

    Sub-directories are left alone. Relation records of the removed files are
    cleaned up the same way the single-file and batch delete endpoints do.

    Args:
        directory: Directory key: "input", "output" or "result".
        current_user: Authenticated user; its "username" is recorded with the
            clear action.

    Returns:
        A dict with a confirmation "message" and the "count" of removed files.

    Raises:
        HTTPException(404): unknown directory key.
    """
    dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir}
    target_dir = dir_map.get(directory)
    if not target_dir:
        raise HTTPException(404, "目录不存在")

    # A data directory that has not been created yet is simply empty.
    # Collecting the entries first also avoids deleting while iterating.
    if target_dir.is_dir():
        doomed = [entry for entry in target_dir.iterdir() if entry.is_file()]
    else:
        doomed = []

    count = 0
    for f in doomed:
        f.unlink()
        count += 1
        # Cascade: clean up relation table (best-effort, never raises).
        _cleanup_relation_for_deleted_file(directory, f.name)

    _record_file_action("*", directory, 0, "clear", current_user.get("username"))
    return {"message": f"已清除 {count} 个文件", "count": count}
@router.get("/history")
async def get_file_history(
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    directory: Optional[str] = None,
    action: Optional[str] = None,
    current_user: dict = Depends(get_current_user),
):
    """Return one page of the file-operation history.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-200).
        directory: Optional directory filter, passed through to the query.
        action: Optional action filter, passed through to the query.
        current_user: Authenticated user (required by the dependency only).

    Returns:
        A dict echoing "page" and "page_size" plus the matching "items".
    """
    # Translate the 1-based page number into a row offset.
    skip = page_size * (page - 1)
    history = query_file_history(
        directory=directory,
        action=action,
        limit=page_size,
        offset=skip,
    )
    response = {"page": page, "page_size": page_size}
    response["items"] = history
    return response
@router.get("/stats")
async def get_file_stats(
    current_user: dict = Depends(get_current_user),
):
    """Return the result of query_file_stats() under the "directories" key."""
    directory_stats = query_file_stats()
    return {"directories": directory_stats}
# ---------------------------------------------------------------------------
# File relations
# ---------------------------------------------------------------------------
class RelationDeleteRequest(BaseModel):
    """Request body for deleting file-relation records."""

    # IDs of the relation records to delete.
    ids: List[int]
class BatchDeleteRequest(BaseModel):
    """Request body for the batch-delete endpoint."""

    # One dict per file; the endpoint reads the "directory" and "filename" keys.
    files: list[dict]
def _cleanup_relation_for_deleted_file(directory: str, filename: str):
    """Detach a deleted file from the file_relations table.

    The first relation row that references the file in the column belonging
    to ``directory`` gets that column set to NULL. If the row's two other
    file columns are empty as well, the row is deleted. Best-effort: every
    failure is logged at debug level and swallowed.

    Args:
        directory: Directory key: "input", "output" or "result". Any other
            value leaves the table untouched.
        filename: Name of the file that was deleted.
    """
    import sqlite3
    from ..services.db_schema import _db_path

    # Directory key -> relation column holding files of that directory.
    column_by_directory = {
        "input": "input_image",
        "output": "output_excel",
        "result": "result_purchase",
    }
    try:
        conn = sqlite3.connect(_db_path)
        conn.row_factory = sqlite3.Row
        try:
            column = column_by_directory.get(directory)
            if column is not None:
                # Column names come from the fixed mapping above, never from
                # user input, so formatting them into the SQL text is safe.
                match = conn.execute(
                    f"SELECT id FROM file_relations WHERE {column} = ?", (filename,)
                ).fetchone()
                if match:
                    relation_id = match['id']
                    conn.execute(
                        f"UPDATE file_relations SET {column} = NULL, updated_at = datetime('now') WHERE id = ?",
                        (relation_id,),
                    )
                    # Delete if no other fields
                    remaining = conn.execute(
                        "SELECT * FROM file_relations WHERE id = ?", (relation_id,)
                    ).fetchone()
                    other_columns = [name for name in column_by_directory.values() if name != column]
                    if remaining and not any(remaining[name] for name in other_columns):
                        conn.execute("DELETE FROM file_relations WHERE id = ?", (relation_id,))
            conn.commit()
        finally:
            conn.close()
    except Exception:
        logger.debug("Failed to cleanup relation for %s/%s", directory, filename, exc_info=True)
@router.get("/relations")
async def get_file_relations(
    view: Optional[str] = Query(None, pattern="^(orders|tables|images)$"),
    status: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = None,
    sort_order: str = "desc",
    current_user: dict = Depends(get_current_user),
):
    """Return one page of file relations, optionally filtered and sorted.

    All query parameters are forwarded unchanged to query_file_relations().

    Returns:
        A dict with the page "items" and the "total" reported by the query.
    """
    query_args = {
        "view": view,
        "status": status,
        "page": page,
        "page_size": page_size,
        "sort_by": sort_by,
        "sort_order": sort_order,
    }
    items, total = query_file_relations(**query_args)
    return {"items": items, "total": total}
@router.get("/stats/detailed")
async def get_detailed_stats(
    current_user: dict = Depends(get_current_user),
):
    """Return the detailed file statistics used by the Dashboard."""
    detailed = query_file_relations_stats()
    return detailed
@router.post("/relations/sync")
async def sync_relations(
    current_user: dict = Depends(get_current_user),
):
    """Trigger sync_file_relations() and confirm with a message."""
    sync_file_relations()
    confirmation = {"message": "文件关系表已重建"}
    return confirmation
@router.delete("/relations")
async def delete_relations(
    body: RelationDeleteRequest,
    current_user: dict = Depends(get_current_user),
):
    """Delete the relation records whose IDs are listed in the request body.

    Returns:
        A dict whose "message" reports how many IDs were submitted.
    """
    relation_ids = body.ids
    delete_file_relations(relation_ids)
    submitted = len(body.ids)
    return {"message": f"已删除 {submitted} 条关系记录"}
# ---------------------------------------------------------------------------
# File preview
# ---------------------------------------------------------------------------
@router.get("/preview/{directory}/{filename:path}")
async def preview_file(
    directory: str,
    filename: str,
    current_user: dict = Depends(get_current_user),
):
    """Preview file content: images served directly, Excel returned as JSON grid.

    Args:
        directory: Directory key: "input", "output" or "result".
        filename: Path of the file inside that directory (may contain "/").
        current_user: Authenticated user (required by the dependency only).

    Returns:
        A FileResponse for image files, or a JSONResponse of the form
        {"type": "excel", "rows": [...], "total_rows": N} for .xls/.xlsx
        files, where "rows" holds at most the first 200 rows as strings.

    Raises:
        HTTPException(403): unknown directory key, or the path escapes the
            data directory.
        HTTPException(404): the file does not exist.
        HTTPException(500): the Excel file could not be read.
        HTTPException(400): the file type has no preview support.
    """
    # Security: only allow specific directories
    if directory not in ("input", "output", "result"):
        raise HTTPException(403, "不允许访问该目录")
    dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir}

    # Security: the ":path" converter lets the name contain "/" and "..", so
    # resolve it and verify the result is still inside the data directory.
    base_dir = dir_map[directory].resolve()
    try:
        file_path = (base_dir / filename).resolve()
    except (OSError, RuntimeError, ValueError):
        # Unresolvable names (e.g. embedded NUL bytes) are treated as missing.
        raise HTTPException(404, f"文件不存在: {filename}") from None
    if base_dir not in file_path.parents:
        raise HTTPException(403, "不允许访问该目录")
    if not file_path.is_file():
        raise HTTPException(404, f"文件不存在: {filename}")

    ext = file_path.suffix.lower()

    # Images: serve directly
    if ext in ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'):
        return FileResponse(str(file_path))

    # Excel: read and return as JSON grid
    if ext in ('.xls', '.xlsx'):
        try:
            import pandas as pd
            df = pd.read_excel(str(file_path), header=None)
            # Fill NaN with empty string
            df = df.fillna('')
            # Limit to first 200 rows; only those rows are converted, while
            # total_rows still reports the size of the whole sheet.
            rows = [
                [str(v) if v != '' else '' for v in row]
                for _, row in df.head(200).iterrows()
            ]
            return JSONResponse({"type": "excel", "rows": rows, "total_rows": len(df)})
        except Exception as e:
            raise HTTPException(500, f"读取文件失败: {e}") from e

    raise HTTPException(400, f"不支持预览的文件类型: {ext}")