e441ac82a8
- Cleaned up code in processing.py by removing inline semicolons and improving readability. - Updated upsert_file_relation calls to ensure consistent handling of file relations. - Enhanced query_file_relations in db_schema.py to support filtering by file existence. - Improved API error handling in index.ts with user-friendly messages for 401 and 403 errors. - Added online/offline status tracking in Layout.vue. - Implemented debounced search functionality across multiple views to optimize performance. - Introduced loading skeletons in Dashboard.vue for better user experience during data fetching. - Enhanced file preview cleanup logic in Images.vue, Orders.vue, and Tables.vue to prevent memory leaks. - Updated global styles to include new loading and notification animations.
708 lines
28 KiB
Python
708 lines
28 KiB
Python
"""Processing endpoints: OCR, Excel conversion, merge, and full pipeline."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import sys
|
|
import threading
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Request
|
|
from pydantic import BaseModel
|
|
|
|
from ..auth.dependencies import get_current_user
|
|
from ..services.service_wrapper import ServiceWrapper
|
|
from ..services.db_schema import upsert_file_relation
|
|
|
|
# Router for every processing endpoint in this module; all routes are
# registered under the /api/processing prefix.
router = APIRouter(prefix="/api/processing", tags=["processing"])

# Shared wrapper whose run_sync() is awaited by the endpoints to execute their
# blocking work functions; constructed with max_workers=3.
_wrapper = ServiceWrapper(max_workers=3)

# ── Thread-safe log capture ──
# Thread-local storage. It carries the capture context (`ctx`) read by the log
# handler and the `result_files` list filled by the running work function.
_tlocal = threading.local()
|
|
|
|
|
|
class TaskLogHandler(logging.Handler):
    """Forward log records emitted while a task runs to that task's log.

    The target task is looked up in the thread-local capture context, so a
    record is forwarded only when the emitting thread has an active context.
    """

    def emit(self, record: logging.LogRecord):
        """Format *record* and append it to the current task's log, if any."""
        context = getattr(_tlocal, 'ctx', None)
        if not context:
            # No capture is active on this thread.
            return

        manager = context.get('tm')
        current_task = context.get('task_id')
        if not (manager and current_task):
            return

        text = self.format(record)
        # Drop messages containing any of these substrings.
        for noise in ['DEBUG:', 'urllib3', 'charset_normalizer']:
            if noise in text:
                return
        manager.add_log(current_task, text)
|
|
|
|
|
|
# Single shared handler instance. It accepts records of every level and
# renders only the bare message text.
_log_handler = TaskLogHandler()
_log_handler.setLevel(logging.DEBUG)
_log_handler.setFormatter(logging.Formatter('%(message)s'))
# The root logger is the attachment point for the handler.
_root_logger = logging.getLogger()
# Flipped to True by _setup_log_capture() once the handler has been attached.
_configured = False
|
|
|
|
|
|
def _setup_log_capture():
    """Attach the task log handler to the root logger on the first call only."""
    global _configured
    if _configured:
        return
    _root_logger.addHandler(_log_handler)
    _configured = True
|
|
|
|
|
|
def _start_log_capture(tm, task_id: str):
    """Begin forwarding this thread's log records to the given task.

    Ensures the handler is installed, lowers the root logger level to DEBUG
    and stores the task manager and task id in the thread-local context.
    """
    _setup_log_capture()
    _root_logger.setLevel(logging.DEBUG)
    _tlocal.ctx = dict(tm=tm, task_id=task_id)
|
|
|
|
|
|
def _stop_log_capture():
    """Clear this thread's capture context so later records are not forwarded."""
    _tlocal.ctx = None
|
|
|
|
|
|
def _add_result_file(name: str):
    """Record *name* in this thread's result-file list.

    Does nothing when the current thread has no result-file list.
    """
    collected = getattr(_tlocal, 'result_files', None)
    if collected is None:
        return
    collected.append(name)
|
|
|
|
|
|
def _wrap_with_capture(tm, task_id, func):
    """Return a zero-argument callable that runs *func* under log capture.

    The returned callable starts log capture for the task, resets the
    thread-local result-file list, calls *func* and always stops the capture
    afterwards, whether *func* returned or raised.
    """
    def wrapped():
        _start_log_capture(tm, task_id)
        _tlocal.result_files = []
        try:
            outcome = func()
        finally:
            _stop_log_capture()
        return outcome

    return wrapped
|
|
|
|
# Directory four levels above this file; every data path below hangs off it.
_project_root = Path(__file__).resolve().parent.parent.parent.parent
# Files read by the OCR endpoints.
_input_dir = _project_root / "data" / "input"
# OCR output Excel files; these are the input of the Excel endpoints.
_output_dir = _project_root / "data" / "output"
# Purchase-order files produced by the Excel step and read by the merge endpoints.
_result_dir = _project_root / "data" / "result"
|
|
|
|
|
|
class PipelineRequest(BaseModel):
    # Request body of the /excel and /pipeline endpoints.
    # Optional file names. /excel resolves them against the output directory
    # and falls back to every Excel file there when the list is missing or empty.
    files: Optional[List[str]] = None
    # Optional supplier; not read by any endpoint visible in this file.
    supplier: Optional[str] = None
|
|
|
|
|
|
class SingleFileRequest(BaseModel):
    # Request body of the single-file endpoints.
    # Name of the file to process; each endpoint joins it to its own directory.
    filename: str
|
|
|
|
|
|
class MergeBatchRequest(BaseModel):
    # Request body of the merge endpoints.
    # Names of purchase-order files, resolved against the result directory.
    filenames: List[str]
|
|
|
|
|
|
class TaskResponse(BaseModel):
    # Response returned by every endpoint that starts a background task.
    # Id of the created task; used with the status endpoint.
    task_id: str
    # The endpoints in this file always send "accepted".
    status: str
    # Human-readable confirmation text.
    message: str
|
|
|
|
|
|
def _get_task_manager(request: Request):
    """Return the task manager stored on the request state."""
    # NOTE(review): presumably populated by middleware elsewhere — confirm.
    return request.state.task_manager
|
|
|
|
|
|
def _list_input_files(filter_ext: Optional[List[str]] = None) -> List[Path]:
    """List the regular files of the input directory, sorted.

    When *filter_ext* is given, only files whose lower-cased suffix is in it
    are returned. A missing input directory yields an empty list.
    """
    return _list_files_in(_input_dir, filter_ext)
|
|
|
|
|
|
def _list_files_in(directory: Path, filter_ext: Optional[List[str]] = None) -> List[Path]:
    """List the regular files directly inside *directory*, sorted.

    Args:
        directory: Directory to scan; sub-directories are not descended into.
        filter_ext: Lower-case suffixes (including the dot) to keep. ``None``
            keeps every file; an empty list keeps none.

    Returns:
        The matching paths in sorted order, or an empty list when *directory*
        is not an existing directory.
    """
    if not directory.is_dir():
        return []
    return [
        entry
        for entry in sorted(directory.iterdir())
        if entry.is_file()
        and (filter_ext is None or entry.suffix.lower() in filter_ext)
    ]
|
|
|
|
|
|
# Strong references to background tasks that are still running. The event loop
# itself only holds weak references to tasks, so a task nobody else references
# may be garbage collected before it finishes.
_background_tasks: set = set()


def _run_background(coro):
    """Schedule a coroutine as a background task.

    The task is kept in ``_background_tasks`` until it is done so that it
    cannot be garbage collected mid-execution. Nothing is returned; callers
    fire and forget.
    """
    task = asyncio.ensure_future(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
|
|
|
|
|
|
def _run_background_with_log(coro, tm, task_id: str):
    """Schedule *coro* in the background with log capture around it.

    Capture is started before the coroutine is awaited and stopped when it
    finishes or raises.
    """
    # NOTE(review): the capture context is thread-local, so coroutines running
    # on the same event-loop thread share it — confirm this is intended.
    async def _wrapped():
        _start_log_capture(tm, task_id)
        try:
            await coro
        finally:
            _stop_log_capture()

    _run_background(_wrapped())
|
|
|
|
|
|
def _get_product_db():
    """Create a ProductDatabase from the project's cache DB and product template paths."""
    from app.core.db.product_db import ProductDatabase

    cache_db = _project_root / 'data' / 'product_cache.db'
    template_xlsx = _project_root / 'templates' / '商品资料.xlsx'
    return ProductDatabase(str(cache_db), str(template_xlsx))
|
|
|
|
|
|
def _learn_products_from_excel(excel_path: Path, tm, task_id, source: str = 'ocr'):
    """Learn product data from a processed Excel file into the product memory.

    Each row with a usable barcode is completed from memory, checked for a
    price warning and then learned. Messages returned by those steps are
    appended to the task log.

    Args:
        excel_path: Excel file to read.
        tm: Task manager whose ``add_log`` receives progress messages.
        task_id: Id of the task the messages belong to.
        source: Passed through to ``learn_from_product``.

    The function is best-effort: it returns without learning anything when
    the file cannot be read, is empty or has no barcode column.
    """
    try:
        from app.core.utils.file_utils import smart_read_excel
        df = smart_read_excel(str(excel_path))
    except Exception as exc:
        # Still best-effort, but say why learning was skipped instead of
        # failing silently.
        tm.add_log(task_id, f"[记忆库] 读取 {excel_path.name} 失败: {exc}")
        return
    if df is None or df.empty:
        return

    from app.core.handlers.column_mapper import ColumnMapper
    columns = list(df.columns)
    barcode_col = ColumnMapper.find_column(columns, 'barcode')
    if not barcode_col:
        return
    name_col = ColumnMapper.find_column(columns, 'name')
    spec_col = ColumnMapper.find_column(columns, 'specification')
    unit_col = ColumnMapper.find_column(columns, 'unit')
    price_col = (
        ColumnMapper.find_column(columns, 'unit_price')
        or ColumnMapper.find_column(columns, 'price')
    )

    def _cell_text(row, column):
        """Return the stripped text of a cell, or '' for missing values."""
        if not column:
            return ''
        text = str(row.get(column, '')).strip()
        # Missing cells stringify to 'nan'/'None'; do not learn those as data.
        return '' if text in ('nan', 'None') else text

    # Collect usable rows once so the batch load and the learning loop agree
    # on which barcodes are valid.
    usable_rows = []
    for _, row in df.iterrows():
        barcode = str(row.get(barcode_col, '')).strip()
        if barcode and barcode != 'nan':
            usable_rows.append((barcode, row))

    db = _get_product_db()
    memory = db.load_batch([barcode for barcode, _ in usable_rows])

    learned = 0
    for barcode, row in usable_rows:
        price = 0.0
        if price_col:
            try:
                p = row.get(price_col)
                if p is not None and str(p).strip() not in ('', 'nan', 'None'):
                    price = float(p)
            except (ValueError, TypeError):
                # Unparseable price: keep the 0.0 default.
                pass

        product = {
            'barcode': barcode,
            'name': _cell_text(row, name_col),
            'specification': _cell_text(row, spec_col),
            'unit': _cell_text(row, unit_col),
            'price': price,
        }

        # 1. Memory-assisted completion
        filled, fill_log = db.fill_from_memory(barcode, product, memory)
        if fill_log:
            tm.add_log(task_id, f" {fill_log}")

        # 2. Price warning
        warn = db.price_warning(barcode, price, memory)
        if warn:
            tm.add_log(task_id, f" {warn}")

        # 3. Learn
        log = db.learn_from_product(filled, source=source, memory=memory, add_log=None)
        if log:
            tm.add_log(task_id, f" {log}")
        # NOTE(review): counted once per processed row — confirm it was not
        # meant to count only rows for which a learn message was returned.
        learned += 1

    if learned:
        tm.add_log(task_id, f"[记忆库] 从 {excel_path.name} 学习了 {learned} 条商品数据")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/ocr-batch", response_model=TaskResponse)
async def ocr_batch(
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Run OCR on all images in input/.

    The request is validated before the task is created, so a rejected
    request leaves no task behind. The work runs in the background; the
    response only carries the id of the accepted task.

    Raises:
        HTTPException: 400 when input/ contains no image files.
    """
    tm = _get_task_manager(request)

    image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
    files = _list_input_files(filter_ext=list(image_exts))
    if not files:
        raise HTTPException(400, "input/ 目录中没有图片文件")

    task = tm.create_task("批量OCR识别")

    async def _bg():
        def do_work():
            try:
                from app.services.ocr_service import OCRService
                svc = OCRService()
                total = len(files)
                for i, f in enumerate(files):
                    # OCR output could be .xlsx or .xls; .xlsx wins when both exist.
                    outputs = (_output_dir / f"{f.stem}.xlsx", _output_dir / f"{f.stem}.xls")

                    # Skip check
                    existing = next((p for p in outputs if p.exists()), None)
                    if existing is not None:
                        tm.add_log(task.id, f"[跳过] {f.name} 已OCR过 → {existing.name}")
                        upsert_file_relation(input_image=f.name, output_excel=existing.name, status='ocr_done')
                        continue

                    tm.update_progress(task.id, int((i / total) * 100), f"正在识别: {f.name}")
                    tm.add_log(task.id, f"[OCR] 处理 {f.name}")
                    try:
                        svc.process_image(str(f))
                        # Look the output file up once and reuse it below.
                        produced = next((p for p in outputs if p.exists()), None)
                        if produced is not None:
                            upsert_file_relation(input_image=f.name, output_excel=produced.name, status='ocr_done')
                            _add_result_file(produced.name)
                        tm.add_log(task.id, f"[OCR] 完成: {f.name}")
                        # Learn products into memory from OCR output
                        if produced is not None:
                            _learn_products_from_excel(produced, tm, task.id, source='ocr')
                    except Exception as e:
                        # One failing image must not abort the batch.
                        tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}")

                result_files = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=result_files, message=f"OCR完成,共处理 {total} 个文件")
            except Exception as e:
                # Anything outside the per-file handling (e.g. creating the
                # service) would otherwise leave the task unfinished forever.
                tb = traceback.format_exc()
                tm.add_log(task.id, f"[错误] {tb}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message="OCR任务已创建")
|
|
|
|
|
|
@router.post("/excel", response_model=TaskResponse)
async def process_excel(
    request: Request,
    body: PipelineRequest = PipelineRequest(),
    current_user: dict = Depends(get_current_user),
):
    """Convert OCR output Excel files to standardized purchase orders.

    When ``body.files`` is given, only those files are processed; names that
    contain path components or do not exist in output/ are ignored.
    Otherwise every Excel file in output/ is processed. The task is created
    only after validation, and the work runs in the background.

    Raises:
        HTTPException: 400 when there is no Excel file to process.
    """
    tm = _get_task_manager(request)

    excel_exts = {'.xls', '.xlsx'}
    if body.files:
        # Accept bare file names only so a request cannot reach files
        # outside output/.
        files = [
            _output_dir / name
            for name in body.files
            if Path(name).name == name and (_output_dir / name).is_file()
        ]
    else:
        files = _list_files_in(_output_dir, filter_ext=list(excel_exts))

    if not files:
        raise HTTPException(400, "output/ 目录中没有Excel文件")

    task = tm.create_task("Excel标准化处理")

    async def _bg():
        def do_work():
            try:
                from app.services.order_service import OrderService
                svc = OrderService()
                total = len(files)
                for i, f in enumerate(files):
                    # Skip check
                    result_name = f"采购单_{f.stem}.xls"
                    result_path = _result_dir / result_name
                    if result_path.exists():
                        tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}")
                        upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
                        _add_result_file(result_name)
                        continue

                    tm.update_progress(task.id, int((i / total) * 100), f"正在处理: {f.name}")
                    tm.add_log(task.id, f"[Excel] 处理 {f.name}")
                    try:
                        svc.process_excel(str(f))
                        # Check for the result once and reuse the answer below.
                        produced = result_path.exists()
                        if produced:
                            upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done')
                            _add_result_file(result_name)
                        tm.add_log(task.id, f"[Excel] 完成: {f.name}")
                        # Learn products into memory from purchase order result
                        if produced:
                            _learn_products_from_excel(result_path, tm, task.id, source='ocr')
                    except Exception as e:
                        # One failing file must not abort the batch.
                        tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}")

                result_files = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成,共 {total} 个文件")
            except Exception as e:
                # Failures outside the per-file handling would otherwise
                # leave the task unfinished forever.
                tb = traceback.format_exc()
                tm.add_log(task.id, f"[错误] {tb}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message="Excel处理任务已创建")
|
|
|
|
|
|
@router.post("/merge", response_model=TaskResponse)
async def merge_orders(
    request: Request,
    body: MergeBatchRequest = MergeBatchRequest(filenames=[]),
    current_user: dict = Depends(get_current_user),
):
    """Merge selected purchase order files into one PosPal template.

    When ``body.filenames`` is empty, every ``采购单_*.xls`` file in the
    result directory is merged. Names that contain path components or do not
    exist are ignored. The task is created only after validation.

    Raises:
        HTTPException: 400 when no purchase order file is found.
    """
    tm = _get_task_manager(request)

    # If specific files provided, use them; otherwise merge all
    if body.filenames:
        # Accept bare file names only so a request cannot reach files
        # outside the result directory.
        file_paths = [
            _result_dir / name
            for name in body.filenames
            if Path(name).name == name and (_result_dir / name).is_file()
        ]
    else:
        file_paths = list(_result_dir.glob("采购单_*.xls"))

    if not file_paths:
        raise HTTPException(400, "没有找到可合并的采购单文件")

    task = tm.create_task("合并采购单")

    async def _bg():
        def do_work():
            tm.update_progress(task.id, 20, "正在合并采购单...")
            tm.add_log(task.id, f"[合并] 合并 {len(file_paths)} 个文件")
            try:
                # Imported inside the try so an import failure marks the task
                # as failed instead of leaving it unfinished.
                from app.core.excel.merger import PurchaseOrderMerger
                from app.config.settings import ConfigManager
                merger = PurchaseOrderMerger(ConfigManager())
                result = merger.process([str(f) for f in file_paths])
                if result:
                    merged_name = Path(result).name
                    upsert_file_relation(result_purchase=merged_name, status='merged')
                    tm.add_log(task.id, f"[合并] 完成: {merged_name}")
                    tm.set_completed(task.id, result_files=[merged_name], message="合并完成")
                else:
                    tm.set_failed(task.id, "合并返回空结果")
            except Exception as e:
                tm.add_log(task.id, f"[合并] 失败: {e}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message="合并任务已创建")
|
|
|
|
|
|
@router.post("/pipeline", response_model=TaskResponse)
async def full_pipeline(
    request: Request,
    body: PipelineRequest = PipelineRequest(),
    current_user: dict = Depends(get_current_user),
):
    """Run the full pipeline: OCR -> Excel -> Result (NO merge).

    Step 1 runs OCR on every image in input/ (progress 0-40). Step 2 converts
    every Excel file in output/ into a purchase order (progress 45-100).
    Files whose output already exists are skipped. The request body is not
    read by this endpoint.
    """
    tm = _get_task_manager(request)
    task = tm.create_task("一键全流程处理")

    async def _bg():
        def do_work():
            try:
                # Step 1: OCR
                tm.update_progress(task.id, 0, "步骤 1/2: OCR识别")
                tm.add_log(task.id, "[Pipeline] 开始OCR识别")
                from app.services.ocr_service import OCRService
                ocr_svc = OCRService()

                image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
                images = _list_input_files(filter_ext=list(image_exts))
                image_count = max(len(images), 1)
                for index, image in enumerate(images):
                    pct = int((index / image_count) * 40)
                    outputs = (
                        _output_dir / f"{image.stem}.xlsx",
                        _output_dir / f"{image.stem}.xls",
                    )

                    # Skip check
                    existing = next((p for p in outputs if p.exists()), None)
                    if existing is not None:
                        tm.add_log(task.id, f"[跳过] {image.name} 已OCR过 → {existing.name}")
                        upsert_file_relation(input_image=image.name, output_excel=existing.name, status='ocr_done')
                        tm.update_progress(task.id, pct, f"跳过: {image.name}")
                        continue

                    tm.update_progress(task.id, pct, f"OCR: {image.name}")
                    try:
                        ocr_svc.process_image(str(image))
                        produced = next((p for p in outputs if p.exists()), None)
                        if produced is not None:
                            upsert_file_relation(input_image=image.name, output_excel=produced.name, status='ocr_done')
                            _add_result_file(produced.name)
                        tm.add_log(task.id, f"[OCR] 完成: {image.name}")
                        if produced is not None:
                            _learn_products_from_excel(produced, tm, task.id, source='ocr')
                    except Exception as e:
                        tm.add_log(task.id, f"[OCR] 失败: {image.name} - {e}")

                # Step 2: Excel conversion
                tm.update_progress(task.id, 45, "步骤 2/2: Excel标准化")
                tm.add_log(task.id, "[Pipeline] 开始Excel处理")
                from app.services.order_service import OrderService
                order_svc = OrderService()

                excel_files = list(_output_dir.glob("*.xls")) + list(_output_dir.glob("*.xlsx"))
                excel_count = max(len(excel_files), 1)
                for index, excel in enumerate(excel_files):
                    pct = 45 + int((index / excel_count) * 55)
                    result_name = f"采购单_{excel.stem}.xls"
                    result_path = _result_dir / result_name

                    # Skip check
                    if result_path.exists():
                        tm.add_log(task.id, f"[跳过] {excel.name} 已处理过 → {result_name}")
                        upsert_file_relation(output_excel=excel.name, result_purchase=result_name, status='done')
                        _add_result_file(result_name)
                        tm.update_progress(task.id, pct, f"跳过: {excel.name}")
                        continue

                    tm.update_progress(task.id, pct, f"Excel: {excel.name}")
                    try:
                        order_svc.process_excel(str(excel))
                        produced = result_path.exists()
                        if produced:
                            upsert_file_relation(output_excel=excel.name, result_purchase=result_name, status='done')
                            _add_result_file(result_name)
                        tm.add_log(task.id, f"[Excel] 完成: {excel.name}")
                        if produced:
                            _learn_products_from_excel(result_path, tm, task.id, source='ocr')
                    except Exception as e:
                        tm.add_log(task.id, f"[Excel] 失败: {excel.name} - {e}")

                collected = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=collected, message="全流程处理完成(不含合并)")
            except Exception as e:
                tb = traceback.format_exc()
                tm.add_log(task.id, f"[错误] {tb}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message="全流程任务已创建")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Single-file endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/ocr-single", response_model=TaskResponse)
async def ocr_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """OCR a single image file.

    ``body.filename`` must be a bare file name that exists in input/. The
    task is created only after that check, so a rejected request leaves no
    task behind.

    Raises:
        HTTPException: 404 when the file does not exist or the name contains
            path components.
    """
    tm = _get_task_manager(request)

    file_path = _input_dir / body.filename
    # Names with path components are treated as missing so a request cannot
    # reach files outside input/.
    if Path(body.filename).name != body.filename or not file_path.is_file():
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"OCR: {body.filename}")

    async def _bg():
        def do_work():
            tm.update_progress(task.id, 10, f"正在识别: {body.filename}")
            tm.add_log(task.id, f"[OCR] 处理 {body.filename}")
            try:
                # Created inside the try so a construction failure marks the
                # task as failed instead of leaving it unfinished.
                from app.services.ocr_service import OCRService
                svc = OCRService()
                svc.process_image(str(file_path))
                # Find output
                stem = file_path.stem
                for ext in ['.xlsx', '.xls']:
                    candidate = _output_dir / f"{stem}{ext}"
                    if candidate.exists():
                        upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done')
                        _add_result_file(candidate.name)
                        break
                tm.add_log(task.id, f"[OCR] 完成: {body.filename}")
                result_files = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=result_files, message=f"OCR完成: {body.filename}")
            except Exception as e:
                tm.add_log(task.id, f"[OCR] 失败: {e}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message=f"OCR任务已创建: {body.filename}")
|
|
|
|
|
|
@router.post("/excel-single", response_model=TaskResponse)
async def excel_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """Process a single Excel file to purchase order.

    ``body.filename`` must be a bare file name that exists in output/. The
    task is created only after that check. The produced purchase order is
    recorded as the task's result file.

    Raises:
        HTTPException: 404 when the file does not exist or the name contains
            path components.
    """
    tm = _get_task_manager(request)

    file_path = _output_dir / body.filename
    # Names with path components are treated as missing so a request cannot
    # reach files outside output/.
    if Path(body.filename).name != body.filename or not file_path.is_file():
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"Excel处理: {body.filename}")

    async def _bg():
        def do_work():
            tm.update_progress(task.id, 10, f"正在处理: {body.filename}")
            tm.add_log(task.id, f"[Excel] 处理 {body.filename}")
            try:
                # Created inside the try so a construction failure marks the
                # task as failed instead of leaving it unfinished.
                from app.services.order_service import OrderService
                svc = OrderService()
                svc.process_excel(str(file_path))
                result_name = f"采购单_{file_path.stem}.xls"
                if (_result_dir / result_name).exists():
                    upsert_file_relation(output_excel=body.filename, result_purchase=result_name, status='done')
                    # Without this the completed task reports no result files.
                    _add_result_file(result_name)
                tm.add_log(task.id, f"[Excel] 完成: {body.filename}")
                result_files = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成: {body.filename}")
            except Exception as e:
                tm.add_log(task.id, f"[Excel] 失败: {e}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message=f"Excel处理任务已创建: {body.filename}")
|
|
|
|
|
|
@router.post("/pipeline-single", response_model=TaskResponse)
async def pipeline_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """Full pipeline for a single image: OCR -> Excel -> Result (no merge).

    ``body.filename`` must be a bare file name that exists in input/. Each
    step is skipped when its output already exists. Both the OCR output and
    the purchase order are recorded as result files. The task fails when OCR
    leaves no Excel file to convert.

    Raises:
        HTTPException: 404 when the file does not exist or the name contains
            path components.
    """
    tm = _get_task_manager(request)

    file_path = _input_dir / body.filename
    # Names with path components are treated as missing so a request cannot
    # reach files outside input/.
    if Path(body.filename).name != body.filename or not file_path.is_file():
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"全流程: {body.filename}")

    async def _bg():
        def do_work():
            try:
                stem = file_path.stem
                outputs = (_output_dir / f"{stem}.xlsx", _output_dir / f"{stem}.xls")

                def find_excel():
                    """Return the existing OCR output (.xlsx preferred) or None."""
                    return next((p for p in outputs if p.exists()), None)

                # Step 1: OCR
                tm.update_progress(task.id, 10, "步骤 1/2: OCR识别")
                tm.add_log(task.id, f"[Pipeline] OCR: {body.filename}")
                from app.services.ocr_service import OCRService
                ocr_svc = OCRService()

                excel_file = find_excel()
                if excel_file is not None:
                    tm.add_log(task.id, f"[跳过] 已OCR过 → {excel_file.name}")
                    upsert_file_relation(input_image=body.filename, output_excel=excel_file.name, status='ocr_done')
                    _add_result_file(excel_file.name)
                else:
                    ocr_svc.process_image(str(file_path))
                    excel_file = find_excel()
                    if excel_file is not None:
                        upsert_file_relation(input_image=body.filename, output_excel=excel_file.name, status='ocr_done')
                        _add_result_file(excel_file.name)
                    tm.add_log(task.id, "[OCR] 完成")

                # Step 2: Excel
                tm.update_progress(task.id, 50, "步骤 2/2: Excel处理")
                tm.add_log(task.id, "[Pipeline] Excel处理")
                from app.services.order_service import OrderService
                order_svc = OrderService()

                result_name = f"采购单_{stem}.xls"
                result_path = _result_dir / result_name
                if result_path.exists():
                    tm.add_log(task.id, f"[跳过] 已处理过 → {result_name}")
                    # Record the real OCR output name (it may be .xls); fall
                    # back to the .xlsx name only when no output exists.
                    source_name = excel_file.name if excel_file is not None else f"{stem}.xlsx"
                    upsert_file_relation(output_excel=source_name, result_purchase=result_name, status='done')
                    _add_result_file(result_name)
                elif excel_file is not None:
                    order_svc.process_excel(str(excel_file))
                    if result_path.exists():
                        upsert_file_relation(output_excel=excel_file.name, result_purchase=result_name, status='done')
                        _add_result_file(result_name)
                    tm.add_log(task.id, "[Excel] 完成")
                else:
                    # Nothing to convert: report failure rather than completion.
                    tm.add_log(task.id, "[错误] OCR未生成Excel文件")
                    tm.set_failed(task.id, "OCR未生成Excel文件")
                    return

                result_files = list(getattr(_tlocal, 'result_files', []))
                tm.set_completed(task.id, result_files=result_files, message=f"全流程完成: {body.filename}")
            except Exception as e:
                tb = traceback.format_exc()
                tm.add_log(task.id, f"[错误] {tb}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message=f"全流程任务已创建: {body.filename}")
|
|
|
|
|
|
@router.post("/merge-batch", response_model=TaskResponse)
async def merge_batch(
    request: Request,
    body: MergeBatchRequest,
    current_user: dict = Depends(get_current_user),
):
    """Merge selected purchase order files into one PosPal template.

    Only bare file names that exist in the result directory are merged;
    names with path components are ignored. The task is created only after
    validation, so a rejected request leaves no task behind.

    Raises:
        HTTPException: 400 when none of the requested files is usable.
    """
    tm = _get_task_manager(request)

    # Accept bare file names only so a request cannot reach files outside
    # the result directory.
    file_paths = [
        _result_dir / name
        for name in body.filenames
        if Path(name).name == name and (_result_dir / name).is_file()
    ]
    if not file_paths:
        raise HTTPException(400, "没有找到可合并的采购单文件")

    task = tm.create_task("批量合并采购单")

    async def _bg():
        def do_work():
            tm.update_progress(task.id, 20, f"正在合并 {len(file_paths)} 个采购单...")
            tm.add_log(task.id, f"[合并] 合并文件: {', '.join(f.name for f in file_paths)}")
            try:
                # Imported inside the try so an import failure marks the task
                # as failed instead of leaving it unfinished.
                from app.core.excel.merger import PurchaseOrderMerger
                from app.config.settings import ConfigManager
                merger = PurchaseOrderMerger(ConfigManager())
                result = merger.process([str(f) for f in file_paths])
                if result:
                    merged_name = Path(result).name
                    upsert_file_relation(result_purchase=merged_name, status='merged')
                    tm.add_log(task.id, f"[合并] 完成: {merged_name}")
                    tm.set_completed(task.id, result_files=[merged_name], message="批量合并完成")
                else:
                    tm.set_failed(task.id, "合并返回空结果")
            except Exception as e:
                tm.add_log(task.id, f"[合并] 失败: {e}")
                tm.set_failed(task.id, str(e))

        await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work))

    _run_background(_bg())
    return TaskResponse(task_id=task.id, status="accepted", message="批量合并任务已创建")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/status/{task_id}")
async def get_task_status(
    task_id: str,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Return the dictionary form of the task with the given id.

    Raises:
        HTTPException: 404 when the task manager has no such task.
    """
    found = _get_task_manager(request).get_task(task_id)
    if found:
        return found.to_dict()
    raise HTTPException(404, "任务不存在")
|