Files
houhuan 7baf784a39 feat: processing flow enhancement + responsive UI
Phase 2 - Processing flow:
- Multi-task monitoring: store supports concurrent task tracking
- Task retry: POST /api/tasks/{id}/retry re-runs failed tasks
- Dashboard multi-task cards with progress, error details, retry/dismiss
- Log panel expanded from 10 to 50 lines with "view all" link

Phase 3 - UI/UX:
- Mobile sidebar drawer (< 768px) with hamburger menu
- Layout responsive styles (768px, 480px breakpoints)
- Tasks/Logs pages responsive (stat cards, filters, columns)
- File views responsive (header wrap, button sizing)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 19:18:18 +08:00

716 lines
29 KiB
Python

"""Processing endpoints: OCR, Excel conversion, merge, and full pipeline."""
import asyncio
import logging
import os
import sys
import threading
import traceback
from pathlib import Path
from typing import Optional, List
from fastapi import APIRouter, HTTPException, Depends, Request
from pydantic import BaseModel
from ..auth.dependencies import get_current_user
from ..services.service_wrapper import ServiceWrapper
from ..services.db_schema import upsert_file_relation
router = APIRouter(prefix="/api/processing", tags=["processing"])
_wrapper = ServiceWrapper(max_workers=3)
# ── Thread-safe log capture ──
_tlocal = threading.local()
class TaskLogHandler(logging.Handler):
"""Capture all log records during task execution and forward to tm.add_log()"""
def emit(self, record: logging.LogRecord):
ctx = getattr(_tlocal, 'ctx', None)
if ctx:
tm = ctx.get('tm')
task_id = ctx.get('task_id')
if tm and task_id:
msg = self.format(record)
if any(skip in msg for skip in ['DEBUG:', 'urllib3', 'charset_normalizer']):
return
tm.add_log(task_id, msg)
_log_handler = TaskLogHandler()
_log_handler.setLevel(logging.DEBUG)
_log_handler.setFormatter(logging.Formatter('%(message)s'))
_root_logger = logging.getLogger()
_configured = False
def _setup_log_capture():
global _configured
if not _configured:
_root_logger.addHandler(_log_handler)
_configured = True
def _start_log_capture(tm, task_id: str):
_setup_log_capture()
_root_logger.setLevel(logging.DEBUG)
_tlocal.ctx = {'tm': tm, 'task_id': task_id}
def _stop_log_capture():
_tlocal.ctx = None
def _add_result_file(name: str):
files = getattr(_tlocal, 'result_files', None)
if files is not None:
files.append(name)
def _wrap_with_capture(tm, task_id, func):
"""Wrap a do_work function with log capture setup/teardown."""
def wrapped():
_start_log_capture(tm, task_id)
_tlocal.result_files = []
try:
return func()
finally:
_stop_log_capture()
return wrapped
_project_root = Path(__file__).resolve().parent.parent.parent.parent
_input_dir = _project_root / "data" / "input"
_output_dir = _project_root / "data" / "output"
_result_dir = _project_root / "data" / "result"
class PipelineRequest(BaseModel):
files: Optional[List[str]] = None
supplier: Optional[str] = None
class SingleFileRequest(BaseModel):
filename: str
class MergeBatchRequest(BaseModel):
filenames: List[str]
class TaskResponse(BaseModel):
task_id: str
status: str
message: str
def _get_task_manager(request: Request):
return request.state.task_manager
def _list_input_files(filter_ext: Optional[List[str]] = None) -> List[Path]:
if not _input_dir.is_dir():
return []
files = []
for f in sorted(_input_dir.iterdir()):
if f.is_file():
if filter_ext is None or f.suffix.lower() in filter_ext:
files.append(f)
return files
def _list_files_in(directory: Path, filter_ext: List[str] = None) -> List[Path]:
if not directory.is_dir():
return []
files = []
for f in sorted(directory.iterdir()):
if f.is_file():
if filter_ext is None or f.suffix.lower() in filter_ext:
files.append(f)
return files
def _run_background(coro):
"""Schedule a coroutine as a background task."""
asyncio.ensure_future(coro)
def _run_background_with_log(coro, tm, task_id: str):
"""Schedule a coroutine with log capture during execution."""
async def _wrapped():
_start_log_capture(tm, task_id)
try:
await coro
finally:
_stop_log_capture()
asyncio.ensure_future(_wrapped())
def _get_product_db():
from app.core.db.product_db import ProductDatabase
return ProductDatabase(
str(_project_root / 'data' / 'product_cache.db'),
str(_project_root / 'templates' / '商品资料.xlsx')
)
def _learn_products_from_excel(excel_path: Path, tm, task_id, source: str = 'ocr'):
"""从处理后的Excel文件学习商品数据到记忆库"""
try:
from app.core.utils.file_utils import smart_read_excel
df = smart_read_excel(str(excel_path))
if df is None or df.empty:
return
except Exception:
return
from app.core.handlers.column_mapper import ColumnMapper
barcode_col = ColumnMapper.find_column(list(df.columns), 'barcode')
if not barcode_col:
return
name_col = ColumnMapper.find_column(list(df.columns), 'name')
spec_col = ColumnMapper.find_column(list(df.columns), 'specification')
unit_col = ColumnMapper.find_column(list(df.columns), 'unit')
price_col = ColumnMapper.find_column(list(df.columns), 'unit_price') or ColumnMapper.find_column(list(df.columns), 'price')
db = _get_product_db()
barcodes = [str(r.get(barcode_col, '')).strip() for _, r in df.iterrows() if str(r.get(barcode_col, '')).strip()]
memory = db.load_batch(barcodes)
learned = 0
for _, row in df.iterrows():
barcode = str(row.get(barcode_col, '')).strip()
if not barcode or barcode == 'nan':
continue
price = 0.0
if price_col:
try:
p = row.get(price_col)
if p is not None and str(p).strip() not in ('', 'nan', 'None'):
price = float(p)
except (ValueError, TypeError):
pass
product = {
'barcode': barcode,
'name': str(row.get(name_col, '')).strip() if name_col else '',
'specification': str(row.get(spec_col, '')).strip() if spec_col else '',
'unit': str(row.get(unit_col, '')).strip() if unit_col else '',
'price': price,
}
# 1. 记忆辅助补全
filled, fill_log = db.fill_from_memory(barcode, product, memory)
if fill_log:
tm.add_log(task_id, f" {fill_log}")
# 2. 价格预警
warn = db.price_warning(barcode, price, memory)
if warn:
tm.add_log(task_id, f" {warn}")
# 3. 学习
log = db.learn_from_product(filled, source=source, memory=memory, add_log=None)
if log:
tm.add_log(task_id, f" {log}")
learned += 1
if learned:
tm.add_log(task_id, f"[记忆库] 从 {excel_path.name} 学习了 {learned} 条商品数据")
# ---------------------------------------------------------------------------
# Batch endpoints
# ---------------------------------------------------------------------------
@router.post("/ocr-batch", response_model=TaskResponse)
async def ocr_batch(
request: Request,
current_user: dict = Depends(get_current_user),
):
"""Run OCR on all images in input/."""
tm = _get_task_manager(request)
task = tm.create_task("批量OCR识别")
task.metadata = {"endpoint": "/api/processing/ocr-batch", "body": {}}
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
files = _list_input_files(filter_ext=list(image_exts))
if not files:
raise HTTPException(400, "input/ 目录中没有图片文件")
async def _bg():
def do_work():
from app.services.ocr_service import OCRService
svc = OCRService()
total = len(files)
for i, f in enumerate(files):
# Skip check
out_stem = f.stem
# OCR output could be .xlsx or .xls
out_xlsx = _output_dir / f"{out_stem}.xlsx"
out_xls = _output_dir / f"{out_stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] {f.name} 已OCR过 → {out_name}")
upsert_file_relation(input_image=f.name, output_excel=out_name, status='ocr_done')
continue
tm.update_progress(task.id, int((i / total) * 100), f"正在识别: {f.name}")
tm.add_log(task.id, f"[OCR] 处理 {f.name}")
try:
svc.process_image(str(f))
# Find the output file
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{out_stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done')
_add_result_file(candidate.name)
break
tm.add_log(task.id, f"[OCR] 完成: {f.name}")
# Learn products into memory from OCR output
out_file = _output_dir / f"{out_stem}.xlsx"
if not out_file.exists():
out_file = _output_dir / f"{out_stem}.xls"
if out_file.exists():
_learn_products_from_excel(out_file, tm, task.id, source='ocr')
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message=f"OCR完成,共处理 {total} 个文件")
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="OCR任务已创建")
@router.post("/excel", response_model=TaskResponse)
async def process_excel(
request: Request,
body: PipelineRequest = PipelineRequest(),
current_user: dict = Depends(get_current_user),
):
"""Convert OCR output Excel files to standardized purchase orders."""
tm = _get_task_manager(request)
task = tm.create_task("Excel标准化处理")
task.metadata = {"endpoint": "/api/processing/excel", "body": body.dict()}
excel_exts = {'.xls', '.xlsx'}
if body.files:
files = [_output_dir / f for f in body.files if (_output_dir / f).is_file()]
else:
files = _list_files_in(_output_dir, filter_ext=list(excel_exts))
if not files:
raise HTTPException(400, "output/ 目录中没有Excel文件")
async def _bg():
def do_work():
from app.services.order_service import OrderService
svc = OrderService()
total = len(files)
for i, f in enumerate(files):
# Skip check
result_name = f"采购单_{f.stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}")
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
_add_result_file(result_name)
continue
tm.update_progress(task.id, int((i / total) * 100), f"正在处理: {f.name}")
tm.add_log(task.id, f"[Excel] 处理 {f.name}")
try:
svc.process_excel(str(f))
# Find result file
if result_path.exists():
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
_add_result_file(result_name)
tm.add_log(task.id, f"[Excel] 完成: {f.name}")
# Learn products into memory from purchase order result
if result_path.exists():
_learn_products_from_excel(result_path, tm, task.id, source='ocr')
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成,共 {total} 个文件")
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="Excel处理任务已创建")
@router.post("/merge", response_model=TaskResponse)
async def merge_orders(
request: Request,
body: MergeBatchRequest = MergeBatchRequest(filenames=[]),
current_user: dict = Depends(get_current_user),
):
"""Merge selected purchase order files into one PosPal template."""
tm = _get_task_manager(request)
task = tm.create_task("合并采购单")
task.metadata = {"endpoint": "/api/processing/merge", "body": body.dict()}
# If specific files provided, use them; otherwise merge all
if body.filenames:
file_paths = [_result_dir / f for f in body.filenames if (_result_dir / f).is_file()]
else:
file_paths = list(_result_dir.glob("采购单_*.xls"))
if not file_paths:
raise HTTPException(400, "没有找到可合并的采购单文件")
async def _bg():
def do_work():
from app.core.excel.merger import PurchaseOrderMerger
tm.update_progress(task.id, 20, "正在合并采购单...")
tm.add_log(task.id, f"[合并] 合并 {len(file_paths)} 个文件")
try:
from app.config.settings import ConfigManager
merger = PurchaseOrderMerger(ConfigManager())
result = merger.process([str(f) for f in file_paths])
if result:
merged_name = Path(result).name
upsert_file_relation(result_purchase=merged_name, status='merged')
tm.add_log(task.id, f"[合并] 完成: {merged_name}")
tm.set_completed(task.id, result_files=[merged_name], message="合并完成")
else:
tm.set_failed(task.id, "合并返回空结果")
except Exception as e:
tm.add_log(task.id, f"[合并] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="合并任务已创建")
@router.post("/pipeline", response_model=TaskResponse)
async def full_pipeline(
request: Request,
body: PipelineRequest = PipelineRequest(),
current_user: dict = Depends(get_current_user),
):
"""Run the full pipeline: OCR -> Excel -> Result (NO merge)."""
tm = _get_task_manager(request)
task = tm.create_task("一键全流程处理")
task.metadata = {"endpoint": "/api/processing/pipeline", "body": body.dict()}
async def _bg():
def do_work():
try:
# Step 1: OCR
tm.update_progress(task.id, 0, "步骤 1/2: OCR识别")
tm.add_log(task.id, "[Pipeline] 开始OCR识别")
from app.services.ocr_service import OCRService
ocr_svc = OCRService()
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
images = _list_input_files(filter_ext=list(image_exts))
for i, f in enumerate(images):
pct = int((i / max(len(images), 1)) * 40)
# Skip check
out_stem = f.stem
out_xlsx = _output_dir / f"{out_stem}.xlsx"
out_xls = _output_dir / f"{out_stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] {f.name} 已OCR过 → {out_name}")
upsert_file_relation(input_image=f.name, output_excel=out_name, status='ocr_done')
tm.update_progress(task.id, pct, f"跳过: {f.name}")
continue
tm.update_progress(task.id, pct, f"OCR: {f.name}")
try:
ocr_svc.process_image(str(f))
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{out_stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done')
_add_result_file(candidate.name)
break
tm.add_log(task.id, f"[OCR] 完成: {f.name}")
out_file = _output_dir / f"{out_stem}.xlsx"
if not out_file.exists():
out_file = _output_dir / f"{out_stem}.xls"
if out_file.exists():
_learn_products_from_excel(out_file, tm, task.id, source='ocr')
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")
# Step 2: Excel conversion
tm.update_progress(task.id, 45, "步骤 2/2: Excel标准化")
tm.add_log(task.id, "[Pipeline] 开始Excel处理")
from app.services.order_service import OrderService
order_svc = OrderService()
excel_files = list(_output_dir.glob("*.xls")) + list(_output_dir.glob("*.xlsx"))
for i, f in enumerate(excel_files):
pct = 45 + int((i / max(len(excel_files), 1)) * 55)
# Skip check
result_name = f"采购单_{f.stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}")
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
_add_result_file(result_name)
tm.update_progress(task.id, pct, f"跳过: {f.name}")
continue
tm.update_progress(task.id, pct, f"Excel: {f.name}")
try:
order_svc.process_excel(str(f))
if result_path.exists():
upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
_add_result_file(result_name)
tm.add_log(task.id, f"[Excel] 完成: {f.name}")
if result_path.exists():
_learn_products_from_excel(result_path, tm, task.id, source='ocr')
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message="全流程处理完成(不含合并)")
except Exception as e:
tb = traceback.format_exc()
tm.add_log(task.id, f"[错误] {tb}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="全流程任务已创建")
# ---------------------------------------------------------------------------
# Single-file endpoints
# ---------------------------------------------------------------------------
@router.post("/ocr-single", response_model=TaskResponse)
async def ocr_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""OCR a single image file."""
tm = _get_task_manager(request)
task = tm.create_task(f"OCR: {body.filename}")
task.metadata = {"endpoint": "/api/processing/ocr-single", "body": body.dict()}
file_path = _input_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
from app.services.ocr_service import OCRService
svc = OCRService()
tm.update_progress(task.id, 10, f"正在识别: {body.filename}")
tm.add_log(task.id, f"[OCR] 处理 {body.filename}")
try:
svc.process_image(str(file_path))
# Find output
stem = file_path.stem
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done')
_add_result_file(candidate.name)
break
tm.add_log(task.id, f"[OCR] 完成: {body.filename}")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message=f"OCR完成: {body.filename}")
except Exception as e:
tm.add_log(task.id, f"[OCR] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"OCR任务已创建: {body.filename}")
@router.post("/excel-single", response_model=TaskResponse)
async def excel_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""Process a single Excel file to purchase order."""
tm = _get_task_manager(request)
task = tm.create_task(f"Excel处理: {body.filename}")
task.metadata = {"endpoint": "/api/processing/excel-single", "body": body.dict()}
file_path = _output_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
from app.services.order_service import OrderService
svc = OrderService()
tm.update_progress(task.id, 10, f"正在处理: {body.filename}")
tm.add_log(task.id, f"[Excel] 处理 {body.filename}")
try:
svc.process_excel(str(file_path))
result_name = f"采购单_{file_path.stem}.xls"
if (_result_dir / result_name).exists():
upsert_file_relation(output_excel=body.filename, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成: {body.filename}")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成: {body.filename}")
except Exception as e:
tm.add_log(task.id, f"[Excel] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"Excel处理任务已创建: {body.filename}")
@router.post("/pipeline-single", response_model=TaskResponse)
async def pipeline_single(
request: Request,
body: SingleFileRequest,
current_user: dict = Depends(get_current_user),
):
"""Full pipeline for a single image: OCR -> Excel -> Result (no merge)."""
tm = _get_task_manager(request)
task = tm.create_task(f"全流程: {body.filename}")
task.metadata = {"endpoint": "/api/processing/pipeline-single", "body": body.dict()}
file_path = _input_dir / body.filename
if not file_path.is_file():
raise HTTPException(404, f"文件不存在: {body.filename}")
async def _bg():
def do_work():
try:
stem = file_path.stem
# Step 1: OCR
tm.update_progress(task.id, 10, "步骤 1/2: OCR识别")
tm.add_log(task.id, f"[Pipeline] OCR: {body.filename}")
from app.services.ocr_service import OCRService
ocr_svc = OCRService()
out_xlsx = _output_dir / f"{stem}.xlsx"
out_xls = _output_dir / f"{stem}.xls"
if out_xlsx.exists() or out_xls.exists():
out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name
tm.add_log(task.id, f"[跳过] 已OCR过 → {out_name}")
upsert_file_relation(input_image=body.filename, output_excel=out_name, status='ocr_done')
_add_result_file(out_name)
else:
ocr_svc.process_image(str(file_path))
for ext in ['.xlsx', '.xls']:
candidate = _output_dir / f"{stem}{ext}"
if candidate.exists():
upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done')
_add_result_file(candidate.name)
break
tm.add_log(task.id, f"[OCR] 完成")
# Step 2: Excel
tm.update_progress(task.id, 50, "步骤 2/2: Excel处理")
tm.add_log(task.id, f"[Pipeline] Excel处理")
from app.services.order_service import OrderService
order_svc = OrderService()
result_name = f"采购单_{stem}.xls"
result_path = _result_dir / result_name
if result_path.exists():
tm.add_log(task.id, f"[跳过] 已处理过 → {result_name}")
upsert_file_relation(output_excel=f"{stem}.xlsx", result_purchase=result_name, status='done')
else:
# Find the output excel
excel_file = out_xlsx if out_xlsx.exists() else (out_xls if out_xls.exists() else None)
if excel_file:
order_svc.process_excel(str(excel_file))
if result_path.exists():
upsert_file_relation(output_excel=excel_file.name, result_purchase=result_name, status='done')
tm.add_log(task.id, f"[Excel] 完成")
else:
tm.add_log(task.id, f"[错误] OCR未生成Excel文件")
result_files = list(getattr(_tlocal, 'result_files', []))
tm.set_completed(task.id, result_files=result_files, message=f"全流程完成: {body.filename}")
except Exception as e:
tb = traceback.format_exc()
tm.add_log(task.id, f"[错误] {tb}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message=f"全流程任务已创建: {body.filename}")
@router.post("/merge-batch", response_model=TaskResponse)
async def merge_batch(
request: Request,
body: MergeBatchRequest,
current_user: dict = Depends(get_current_user),
):
"""Merge selected purchase order files into one PosPal template."""
tm = _get_task_manager(request)
task = tm.create_task("批量合并采购单")
task.metadata = {"endpoint": "/api/processing/merge-batch", "body": body.dict()}
file_paths = [_result_dir / f for f in body.filenames if (_result_dir / f).is_file()]
if not file_paths:
raise HTTPException(400, "没有找到可合并的采购单文件")
async def _bg():
def do_work():
from app.core.excel.merger import PurchaseOrderMerger
tm.update_progress(task.id, 20, f"正在合并 {len(file_paths)} 个采购单...")
tm.add_log(task.id, f"[合并] 合并文件: {', '.join(f.name for f in file_paths)}")
try:
from app.config.settings import ConfigManager
merger = PurchaseOrderMerger(ConfigManager())
result = merger.process([str(f) for f in file_paths])
if result:
merged_name = Path(result).name
upsert_file_relation(result_purchase=merged_name, status='merged')
tm.add_log(task.id, f"[合并] 完成: {merged_name}")
tm.set_completed(task.id, result_files=[merged_name], message="批量合并完成")
else:
tm.set_failed(task.id, "合并返回空结果")
except Exception as e:
tm.add_log(task.id, f"[合并] 失败: {e}")
tm.set_failed(task.id, str(e))
await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))
_run_background(_bg())
return TaskResponse(task_id=task.id, status="accepted", message="批量合并任务已创建")
# ---------------------------------------------------------------------------
# Status endpoint
# ---------------------------------------------------------------------------
@router.get("/status/{task_id}")
async def get_task_status(
task_id: str,
request: Request,
current_user: dict = Depends(get_current_user),
):
tm = _get_task_manager(request)
task = tm.get_task(task_id)
if not task:
raise HTTPException(404, "任务不存在")
return task.to_dict()