feat: shadcn主题 + 文件关系追踪 + 处理流程修复

前端:
- 全站应用 shadcn/ui 主题 (zinc灰调, Inter字体, 1px细边框, 无硬阴影)
- 重写 global.css / Dashboard.vue / Login.vue / Layout.vue 样式
- 新增文件处理子页面: 采购单(Orders), 表格处理(Tables), 图片处理(Images)
- 侧边栏使用 el-sub-menu 组织文件处理导航

后端:
- 新增 file_relations 表追踪 input→output→result 链路
- 新增 /files/relations, /files/stats/detailed 等关系查询API
- 新增 ocr-single, excel-single, pipeline-single, merge-batch 端点
- 处理流程增加跳过逻辑 (已处理文件自动跳过)
- 全流程不再自动合并, 合并仅在采购单页面手动触发

Bug修复:
- TaskManager: asyncio.create_task 在线程池中无事件循环 → 改用 _schedule() 调度
- PurchaseOrderMerger 缺少 config 参数 → 传入 ConfigManager()
- FastAPI regex= 弃用 → 改为 pattern=
- merger.process() 接收 Path 对象 → 转为字符串
@
This commit is contained in:
2026-05-05 14:16:30 +08:00
parent dedc3b4183
commit 0721ed099c
13 changed files with 2341 additions and 602 deletions
+101 -4
View File
@@ -12,7 +12,11 @@ from pydantic import BaseModel
from ..auth.dependencies import get_current_user, get_current_user_flexible
from ..config import MAX_UPLOAD_SIZE, ALLOWED_EXTENSIONS
from ..services.db_schema import insert_file_metadata, query_file_history, query_file_stats
from ..services.db_schema import (
insert_file_metadata, query_file_history, query_file_stats,
query_file_relations, delete_file_relations, sync_file_relations,
query_file_relations_stats,
)
logger = logging.getLogger(__name__)
@@ -54,6 +58,7 @@ def _record_file_action(filename: str, directory: str, size: int, action: str, u
@router.post("/upload", response_model=UploadResponse)
async def upload_file(
file: UploadFile = File(...),
target: str = Query("input", pattern="^(input|output)$"),
current_user: dict = Depends(get_current_user),
):
_ensure_dirs()
@@ -68,10 +73,13 @@ async def upload_file(
if len(content) > MAX_UPLOAD_SIZE:
raise HTTPException(400, f"文件过大,最大 {MAX_UPLOAD_SIZE // 1024 // 1024}MB")
# Choose target directory
target_dir = _output_dir if target == "output" else _input_dir
# Save with secure name
from werkzeug.utils import secure_filename
safe_name = secure_filename(file.filename) or file.filename
dest = _input_dir / safe_name
dest = target_dir / safe_name
# Avoid overwrite: add suffix if exists
counter = 0
@@ -79,10 +87,10 @@ async def upload_file(
suffix = Path(safe_name).suffix
while dest.exists():
counter += 1
dest = _input_dir / f"{stem}_{counter}{suffix}"
dest = target_dir / f"{stem}_{counter}{suffix}"
dest.write_bytes(content)
_record_file_action(dest.name, "input", len(content), "upload", current_user.get("username"))
_record_file_action(dest.name, target, len(content), "upload", current_user.get("username"))
return UploadResponse(
filename=dest.name,
@@ -154,6 +162,10 @@ async def delete_file(
size = file_path.stat().st_size
file_path.unlink()
_record_file_action(filename, directory, size, "delete", current_user.get("username"))
# Cascade: clean up relation table
_cleanup_relation_for_deleted_file(directory, filename)
return {"message": f"已删除 {filename}"}
@@ -202,3 +214,88 @@ async def get_file_stats(
):
"""Return file storage statistics per directory."""
return {"directories": query_file_stats()}
# ---------------------------------------------------------------------------
# File relations
# ---------------------------------------------------------------------------
class RelationDeleteRequest(BaseModel):
ids: List[int]
def _cleanup_relation_for_deleted_file(directory: str, filename: str):
"""Clean up relation table when a file is deleted."""
import sqlite3
from ..services.db_schema import _db_path
try:
conn = sqlite3.connect(_db_path)
conn.row_factory = sqlite3.Row
try:
if directory == "input":
row = conn.execute("SELECT id FROM file_relations WHERE input_image = ?", (filename,)).fetchone()
if row:
conn.execute("UPDATE file_relations SET input_image = NULL, updated_at = datetime('now') WHERE id = ?", (row['id'],))
# Delete if no other fields
check = conn.execute("SELECT * FROM file_relations WHERE id = ?", (row['id'],)).fetchone()
if check and not check['output_excel'] and not check['result_purchase']:
conn.execute("DELETE FROM file_relations WHERE id = ?", (row['id'],))
elif directory == "output":
row = conn.execute("SELECT id FROM file_relations WHERE output_excel = ?", (filename,)).fetchone()
if row:
conn.execute("UPDATE file_relations SET output_excel = NULL, updated_at = datetime('now') WHERE id = ?", (row['id'],))
check = conn.execute("SELECT * FROM file_relations WHERE id = ?", (row['id'],)).fetchone()
if check and not check['input_image'] and not check['result_purchase']:
conn.execute("DELETE FROM file_relations WHERE id = ?", (row['id'],))
elif directory == "result":
row = conn.execute("SELECT id FROM file_relations WHERE result_purchase = ?", (filename,)).fetchone()
if row:
conn.execute("UPDATE file_relations SET result_purchase = NULL, updated_at = datetime('now') WHERE id = ?", (row['id'],))
check = conn.execute("SELECT * FROM file_relations WHERE id = ?", (row['id'],)).fetchone()
if check and not check['input_image'] and not check['output_excel']:
conn.execute("DELETE FROM file_relations WHERE id = ?", (row['id'],))
conn.commit()
finally:
conn.close()
except Exception:
logger.debug("Failed to cleanup relation for %s/%s", directory, filename, exc_info=True)
@router.get("/relations")
async def get_file_relations(
view: Optional[str] = Query(None, pattern="^(orders|tables|images)$"),
status: Optional[str] = None,
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
current_user: dict = Depends(get_current_user),
):
"""Query file relations with optional view filter."""
items, total = query_file_relations(view=view, status=status, page=page, page_size=page_size)
return {"items": items, "total": total}
@router.get("/stats/detailed")
async def get_detailed_stats(
current_user: dict = Depends(get_current_user),
):
"""Get detailed file statistics for Dashboard."""
return query_file_relations_stats()
@router.post("/relations/sync")
async def sync_relations(
current_user: dict = Depends(get_current_user),
):
"""Scan directories and rebuild file_relations table."""
sync_file_relations()
return {"message": "文件关系表已重建"}
@router.delete("/relations")
async def delete_relations(
body: RelationDeleteRequest,
current_user: dict = Depends(get_current_user),
):
"""Delete file relation records by IDs."""
delete_file_relations(body.ids)
return {"message": f"已删除 {len(body.ids)} 条关系记录"}
+381 -97
View File
@@ -1,5 +1,6 @@
"""Processing endpoints: OCR, Excel conversion, merge, and full pipeline."""
import asyncio
import os
import sys
import traceback
@@ -11,6 +12,7 @@ from pydantic import BaseModel
from ..auth.dependencies import get_current_user
from ..services.service_wrapper import ServiceWrapper
from ..services.db_schema import upsert_file_relation
router = APIRouter(prefix="/api/processing", tags=["processing"])
@@ -23,8 +25,16 @@ _result_dir = _project_root / "data" / "result"
class PipelineRequest(BaseModel):
files: Optional[List[str]] = None # specific files, or None = all in input/
supplier: Optional[str] = None # force supplier type
files: Optional[List[str]] = None
supplier: Optional[str] = None
class SingleFileRequest(BaseModel):
filename: str
class MergeBatchRequest(BaseModel):
filenames: List[str]
class TaskResponse(BaseModel):
@@ -48,6 +58,26 @@ def _list_input_files(filter_ext: Optional[List[str]] = None) -> List[Path]:
return files
def _list_files_in(directory: Path, filter_ext: List[str] = None) -> List[Path]:
if not directory.is_dir():
return []
files = []
for f in sorted(directory.iterdir()):
if f.is_file():
if filter_ext is None or f.suffix.lower() in filter_ext:
files.append(f)
return files
def _run_background(coro):
"""Schedule a coroutine as a background task."""
asyncio.ensure_future(coro)
# ---------------------------------------------------------------------------
# Batch endpoints
# ---------------------------------------------------------------------------
@router.post("/ocr-batch", response_model=TaskResponse)
async def ocr_batch(
request: Request,
@@ -62,27 +92,43 @@ async def ocr_batch(
if not files:
raise HTTPException(400, "input/ 目录中没有图片文件")
async def _run():
try:
async def _bg():
def do_work():
from app.services.ocr_service import OCRService
svc = OCRService()
total = len(files)
for i, f in enumerate(files):
# Skip check
out_stem = f.stem
# OCR output could be .xlsx or .xls
out_xlsx = _output_dir / f"{out_stem}.xlsx"
out_xls = _output_dir / f"{out_stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] {f.name} 已OCR过 → {out_name}")
upsert_file_relation(input_image=f.name, output_excel=out_name, status='ocr_done')
continue
tm.update_progress(task.id, int((i / total) * 100), f"正在识别: {f.name}")
tm.add_log(task.id, f"[OCR] 处理 {f.name}")
try:
svc.process_single(str(f), str(_output_dir))
svc.process_image(str(f))
# Find the output file
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{out_stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done')
break
tm.add_log(task.id, f"[OCR] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")
result_files = [f.name for f in _output_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message=f"OCR完成,共处理 {total} 个文件")
except Exception as e:
tm.set_failed(task.id, str(e))
import asyncio
asyncio.create_task(_run())
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="OCR任务已创建")
@@ -92,7 +138,7 @@ async def process_excel(
body: PipelineRequest = PipelineRequest(),
current_user: dict = Depends(get_current_user),
):
"""Convert OCR output Excel files to standardized format."""
"""Convert OCR output Excel files to standardized purchase orders."""
tm = _get_task_manager(request)
task = tm.create_task("Excel标准化处理")
@@ -100,61 +146,87 @@ async def process_excel(
if body.files:
files = [_output_dir / f for f in body.files if (_output_dir / f).is_file()]
else:
files = _list_input_files(filter_ext=list(excel_exts))
if not files:
files = _list_input_files_from(_output_dir, filter_ext=list(excel_exts))
files = _list_files_in(_output_dir, filter_ext=list(excel_exts))
if not files:
raise HTTPException(400, "没有找到Excel文件")
raise HTTPException(400, "output/ 目录中没有Excel文件")
async def _run():
try:
async def _bg():
def do_work():
from app.services.order_service import OrderService
svc = OrderService()
total = len(files)
for i, f in enumerate(files):
# Skip check
result_name = f"采购单_{f.stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}")
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
continue
tm.update_progress(task.id, int((i / total) * 100), f"正在处理: {f.name}")
tm.add_log(task.id, f"[Excel] 处理 {f.name}")
try:
svc.process_excel(str(f), str(_result_dir))
svc.process_excel(str(f))
# Find result file
if result_path.exists():
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")
result_files = [f.name for f in _result_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成,共 {total} 个文件")
except Exception as e:
tm.set_failed(task.id, str(e))
import asyncio
asyncio.create_task(_run())
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="Excel处理任务已创建")
@router.post("/merge", response_model=TaskResponse)
async def merge_orders(
request: Request,
body: MergeBatchRequest = MergeBatchRequest(filenames=[]),
current_user: dict = Depends(get_current_user),
):
"""Merge all processed Excel files into a single purchase order."""
"""Merge selected purchase order files into one PosPal template."""
tm = _get_task_manager(request)
task = tm.create_task("合并采购单")
async def _run():
try:
from app.services.order_service import OrderService
svc = OrderService()
# If specific files provided, use them; otherwise merge all
if body.filenames:
file_paths = [_result_dir / f for f in body.filenames if (_result_dir / f).is_file()]
else:
file_paths = list(_result_dir.glob("采购单_*.xls"))
if not file_paths:
raise HTTPException(400, "没有找到可合并的采购单文件")
async def _bg():
def do_work():
from app.core.excel.merger import PurchaseOrderMerger
tm.update_progress(task.id, 20, "正在合并采购单...")
tm.add_log(task.id, "[合并] 开始合并")
result = svc.merge_orders(str(_result_dir))
tm.add_log(task.id, f"[合并] 完成: {result}")
tm.set_completed(task.id, result_files=[result] if result else [], message="合并完成")
except Exception as e:
tm.set_failed(task.id, str(e))
tm.add_log(task.id, f"[合并] 合并 {len(file_paths)} 个文件")
try:
from app.config.settings import ConfigManager
merger = PurchaseOrderMerger(ConfigManager())
result = merger.process([str(f) for f in file_paths])
if result:
merged_name = Path(result).name
upsert_file_relation(result_purchase=merged_name, status='merged')
tm.add_log(task.id, f"[合并] 完成: {merged_name}")
tm.set_completed(task.id, result_files=[merged_name], message="合并完成")
else:
tm.set_failed(task.id, "合并返回空结果")
except Exception as e:
tm.add_log(task.id, f"[合并] 失败: {e}")
tm.set_failed(task.id, str(e))
import asyncio
asyncio.create_task(_run())
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="合并任务已创建")
@@ -164,68 +236,291 @@ async def full_pipeline(
body: PipelineRequest = PipelineRequest(),
current_user: dict = Depends(get_current_user),
):
"""Run the full pipeline: OCR Excel → Merge."""
"""Run the full pipeline: OCR -> Excel -> Result (NO merge)."""
tm = _get_task_manager(request)
task = tm.create_task("一键全流程处理")
async def _run():
try:
# Step 1: OCR
tm.update_progress(task.id, 0, "步骤 1/3: OCR识别")
tm.add_log(task.id, "[Pipeline] 开始OCR识别")
from app.services.ocr_service import OCRService
ocr_svc = OCRService()
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
images = _list_input_files(filter_ext=list(image_exts))
for i, f in enumerate(images):
pct = int((i / max(len(images), 1)) * 30)
tm.update_progress(task.id, pct, f"OCR: {f.name}")
try:
ocr_svc.process_single(str(f), str(_output_dir))
tm.add_log(task.id, f"[OCR] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")
# Step 2: Excel conversion
tm.update_progress(task.id, 35, "步骤 2/3: Excel标准化")
tm.add_log(task.id, "[Pipeline] 开始Excel处理")
from app.services.order_service import OrderService
order_svc = OrderService()
excel_files = list(_output_dir.glob("*.xls")) + list(_output_dir.glob("*.xlsx"))
for i, f in enumerate(excel_files):
pct = 35 + int((i / max(len(excel_files), 1)) * 35)
tm.update_progress(task.id, pct, f"Excel: {f.name}")
try:
order_svc.process_excel(str(f), str(_result_dir))
tm.add_log(task.id, f"[Excel] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")
# Step 3: Merge
tm.update_progress(task.id, 75, "步骤 3/3: 合并采购单")
tm.add_log(task.id, "[Pipeline] 开始合并")
async def _bg():
def do_work():
try:
result = order_svc.merge_orders(str(_result_dir))
tm.add_log(task.id, f"[合并] 完成: {result}")
# Step 1: OCR
tm.update_progress(task.id, 0, "步骤 1/2: OCR识别")
tm.add_log(task.id, "[Pipeline] 开始OCR识别")
from app.services.ocr_service import OCRService
ocr_svc = OCRService()
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
images = _list_input_files(filter_ext=list(image_exts))
for i, f in enumerate(images):
pct = int((i / max(len(images), 1)) * 40)
# Skip check
out_stem = f.stem
out_xlsx = _output_dir / f"{out_stem}.xlsx"
out_xls = _output_dir / f"{out_stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] {f.name} 已OCR过 → {out_name}")
upsert_file_relation(input_image=f.name, output_excel=out_name, status='ocr_done')
tm.update_progress(task.id, pct, f"跳过: {f.name}")
continue
tm.update_progress(task.id, pct, f"OCR: {f.name}")
try:
ocr_svc.process_image(str(f))
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{out_stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done')
break
tm.add_log(task.id, f"[OCR] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")
# Step 2: Excel conversion
tm.update_progress(task.id, 45, "步骤 2/2: Excel标准化")
tm.add_log(task.id, "[Pipeline] 开始Excel处理")
from app.services.order_service import OrderService
order_svc = OrderService()
excel_files = list(_output_dir.glob("*.xls")) + list(_output_dir.glob("*.xlsx"))
for i, f in enumerate(excel_files):
pct = 45 + int((i / max(len(excel_files), 1)) * 55)
# Skip check
result_name = f"采购单_{f.stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}")
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
tm.update_progress(task.id, pct, f"跳过: {f.name}")
continue
tm.update_progress(task.id, pct, f"Excel: {f.name}")
try:
order_svc.process_excel(str(f))
if result_path.exists():
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成: {f.name}")
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")
result_files = [f.name for f in _result_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message="全流程处理完成(不含合并)")
except Exception as e:
tm.add_log(task.id, f"[合并] 失败: {e}")
result = None
tb = traceback.format_exc()
tm.add_log(task.id, f"[错误] {tb}")
tm.set_failed(task.id, str(e))
result_files = [f.name for f in _result_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message="全流程处理完成")
except Exception as e:
tb = traceback.format_exc()
tm.add_log(task.id, f"[错误] {tb}")
tm.set_failed(task.id, str(e))
import asyncio
asyncio.create_task(_run())
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="全流程任务已创建")
# ---------------------------------------------------------------------------
# Single-file endpoints
# ---------------------------------------------------------------------------
@router.post("/ocr-single", response_model=TaskResponse)
async def ocr_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""OCR a single image file."""
tm = _get_task_manager(request)
task = tm.create_task(f"OCR: {body.filename}")
file_path = _input_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
from app.services.ocr_service import OCRService
svc = OCRService()
tm.update_progress(task.id, 10, f"正在识别: {body.filename}")
tm.add_log(task.id, f"[OCR] 处理 {body.filename}")
try:
svc.process_image(str(file_path))
# Find output
stem = file_path.stem
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done')
break
tm.add_log(task.id, f"[OCR] 完成: {body.filename}")
result_files = [f.name for f in _output_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message=f"OCR完成: {body.filename}")
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"OCR任务已创建: {body.filename}")
@router.post("/excel-single", response_model=TaskResponse)
async def excel_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""Process a single Excel file to purchase order."""
tm = _get_task_manager(request)
task = tm.create_task(f"Excel处理: {body.filename}")
file_path = _output_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
from app.services.order_service import OrderService
svc = OrderService()
tm.update_progress(task.id, 10, f"正在处理: {body.filename}")
tm.add_log(task.id, f"[Excel] 处理 {body.filename}")
try:
svc.process_excel(str(file_path))
result_name = f"采购单_{file_path.stem}.xls"
if (_result_dir / result_name).exists():
upsert_file_relation(output_excel=body.filename, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成: {body.filename}")
result_files = [f.name for f in _result_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成: {body.filename}")
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"Excel处理任务已创建: {body.filename}")
@router.post("/pipeline-single", response_model=TaskResponse)
async def pipeline_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""Full pipeline for a single image: OCR -> Excel -> Result (no merge)."""
tm = _get_task_manager(request)
task = tm.create_task(f"全流程: {body.filename}")
file_path = _input_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
try:
stem = file_path.stem
# Step 1: OCR
tm.update_progress(task.id, 10, "步骤 1/2: OCR识别")
tm.add_log(task.id, f"[Pipeline] OCR: {body.filename}")
from app.services.ocr_service import OCRService
ocr_svc = OCRService()
out_xlsx = _output_dir / f"{stem}.xlsx"
out_xls = _output_dir / f"{stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] 已OCR过 → {out_name}")
upsert_file_relation(input_image=body.filename, output_excel=out_name, status='ocr_done')
else:
ocr_svc.process_image(str(file_path))
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done')
break
tm.add_log(task.id, f"[OCR] 完成")
# Step 2: Excel
tm.update_progress(task.id, 50, "步骤 2/2: Excel处理")
tm.add_log(task.id, f"[Pipeline] Excel处理")
from app.services.order_service import OrderService
order_svc = OrderService()
result_name = f"采购单_{stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] 已处理过 → {result_name}")
upsert_file_relation(output_excel=f"{stem}.xlsx", result_purchase=result_name, status='done')
else:
# Find the output excel
excel_file = out_xlsx if out_xlsx.exists() else (out_xls if out_xls.exists() else None)
if excel_file:
order_svc.process_excel(str(excel_file))
if result_path.exists():
upsert_file_relation(output_excel=excel_file.name, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成")
else:
tm.add_log(task.id, f"[错误] OCR未生成Excel文件")
result_files = [f.name for f in _result_dir.iterdir() if f.is_file()]
tm.set_completed(task.id, result_files=result_files, message=f"全流程完成: {body.filename}")
except Exception as e:
tb = traceback.format_exc()
tm.add_log(task.id, f"[错误] {tb}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"全流程任务已创建: {body.filename}")
@router.post("/merge-batch", response_model=TaskResponse)
async def merge_batch(
request: Request,
body: MergeBatchRequest,
current_user: dict = Depends(get_current_user),
):
"""Merge selected purchase order files into one PosPal template."""
tm = _get_task_manager(request)
task = tm.create_task("批量合并采购单")
file_paths = [_result_dir / f for f in body.filenames if (_result_dir / f).is_file()]
if not file_paths:
raise HTTPException(400, "没有找到可合并的采购单文件")
async def _bg():
def do_work():
from app.core.excel.merger import PurchaseOrderMerger
tm.update_progress(task.id, 20, f"正在合并 {len(file_paths)} 个采购单...")
tm.add_log(task.id, f"[合并] 合并文件: {', '.join(f.name for f in file_paths)}")
try:
from app.config.settings import ConfigManager
merger = PurchaseOrderMerger(ConfigManager())
result = merger.process([str(f) for f in file_paths])
if result:
merged_name = Path(result).name
upsert_file_relation(result_purchase=merged_name, status='merged')
tm.add_log(task.id, f"[合并] 完成: {merged_name}")
tm.set_completed(task.id, result_files=[merged_name], message="批量合并完成")
else:
tm.set_failed(task.id, "合并返回空结果")
except Exception as e:
tm.add_log(task.id, f"[合并] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(do_work)
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="批量合并任务已创建")
# ---------------------------------------------------------------------------
# Status endpoint
# ---------------------------------------------------------------------------
@router.get("/status/{task_id}")
async def get_task_status(
task_id: str,
@@ -237,14 +532,3 @@ async def get_task_status(
if not task:
raise HTTPException(404, "任务不存在")
return task.to_dict()
def _list_input_files_from(directory: Path, filter_ext: List[str] = None) -> List[Path]:
if not directory.is_dir():
return []
files = []
for f in sorted(directory.iterdir()):
if f.is_file():
if filter_ext is None or f.suffix.lower() in filter_ext:
files.append(f)
return files