"""Processing endpoints: OCR, Excel conversion, merge, and full pipeline."""
import asyncio
import logging
import os
import sys
import threading
import traceback
from pathlib import Path
from typing import Optional, List, Set

from fastapi import APIRouter, HTTPException, Depends, Request
from pydantic import BaseModel

from ..auth.dependencies import get_current_user
from ..services.service_wrapper import ServiceWrapper
from ..services.db_schema import upsert_file_relation

router = APIRouter(prefix="/api/processing", tags=["processing"])
_wrapper = ServiceWrapper(max_workers=3)
logger = logging.getLogger(__name__)

# ── Thread-safe log capture ──
# Per-thread capture state:
#   ctx          -> {'tm': task manager, 'task_id': str} while a task runs on this thread
#   result_files -> list of file names produced by that task (None when idle)
_tlocal = threading.local()

# Substrings of formatted messages that are too noisy to forward to a task log.
_SKIPPED_LOG_MARKERS = ('DEBUG:', 'urllib3', 'charset_normalizer')


class TaskLogHandler(logging.Handler):
    """Forward log records emitted on a capturing thread to ``tm.add_log()``.

    Only threads that called ``_start_log_capture`` forward anything; records
    from any other thread are ignored.
    """

    def emit(self, record: logging.LogRecord):
        ctx = getattr(_tlocal, 'ctx', None)
        if not ctx:
            return
        tm = ctx.get('tm')
        task_id = ctx.get('task_id')
        if not (tm and task_id):
            return
        try:
            msg = self.format(record)
            if any(marker in msg for marker in _SKIPPED_LOG_MARKERS):
                return
            tm.add_log(task_id, msg)
        except RecursionError:
            raise
        except Exception:
            # A failing task log must never break the caller's logging call.
            self.handleError(record)


_log_handler = TaskLogHandler()
_log_handler.setLevel(logging.DEBUG)
_log_handler.setFormatter(logging.Formatter('%(message)s'))
_root_logger = logging.getLogger()
_configured = False


def _setup_log_capture():
    """Attach the task log handler to the root logger (once)."""
    global _configured
    if not _configured:
        # Logger.addHandler ignores duplicates, so a racing second call is harmless.
        _root_logger.addHandler(_log_handler)
        _configured = True


def _start_log_capture(tm, task_id: str):
    """Route log records emitted on the current thread into *task_id*'s log."""
    _setup_log_capture()
    # NOTE: this lowers the root logger level process-wide, not just for this task.
    _root_logger.setLevel(logging.DEBUG)
    _tlocal.ctx = {'tm': tm, 'task_id': task_id}


def _stop_log_capture():
    """Stop forwarding log records emitted on the current thread."""
    _tlocal.ctx = None


def _add_result_file(name: str):
    """Record *name* as a result of the task running on this thread (no-op when idle)."""
    files = getattr(_tlocal, 'result_files', None)
    if files is not None:
        files.append(name)


def _collected_result_files() -> List[str]:
    """Return a copy of the result file names collected on this thread."""
    return list(getattr(_tlocal, 'result_files', None) or [])


def _wrap_with_capture(tm, task_id, func):
    """Wrap a do_work function with log capture and result-file collection.

    The returned callable enables capture for the duration of ``func()`` and
    resets the per-thread state afterwards, so pooled threads do not carry a
    finished task's result list into the next job.
    """
    def wrapped():
        _start_log_capture(tm, task_id)
        _tlocal.result_files = []
        try:
            return func()
        finally:
            _stop_log_capture()
            _tlocal.result_files = None
    return wrapped


_project_root = Path(__file__).resolve().parent.parent.parent.parent
_input_dir = _project_root / "data" / "input"
_output_dir = _project_root / "data" / "output"
_result_dir = _project_root / "data" / "result"

_IMAGE_EXTS = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
_EXCEL_EXTS = ['.xls', '.xlsx']


class PipelineRequest(BaseModel):
    files: Optional[List[str]] = None
    supplier: Optional[str] = None


class SingleFileRequest(BaseModel):
    filename: str


class MergeBatchRequest(BaseModel):
    filenames: List[str]


class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str


def _get_task_manager(request: Request):
    """Return the task manager that middleware attached to the request state."""
    return request.state.task_manager


def _list_files_in(directory: Path, filter_ext: Optional[List[str]] = None) -> List[Path]:
    """List regular files in *directory*, sorted by path.

    Args:
        directory: Directory to scan (a missing directory yields ``[]``).
        filter_ext: Lower-case suffixes (e.g. ``'.xls'``) to keep; ``None`` keeps all.
    """
    if not directory.is_dir():
        return []
    return [
        f for f in sorted(directory.iterdir())
        if f.is_file() and (filter_ext is None or f.suffix.lower() in filter_ext)
    ]


def _list_input_files(filter_ext: Optional[List[str]] = None) -> List[Path]:
    """List regular files in the input directory (see ``_list_files_in``)."""
    return _list_files_in(_input_dir, filter_ext)


def _file_in(directory: Path, name: str) -> Optional[Path]:
    """Return ``directory / name`` if it is an existing file that stays inside *directory*.

    *name* comes from the client, so absolute paths and ``..`` segments that
    would escape *directory* are rejected (``None``), as are missing files.
    """
    if not name:
        return None
    candidate = directory / name
    try:
        candidate.resolve().relative_to(directory.resolve())
    except ValueError:
        return None
    return candidate if candidate.is_file() else None


# ── Background execution ──
# Strong references to scheduled tasks: the event loop only keeps weak ones,
# so an unreferenced task may be garbage-collected before it finishes.
_background_tasks: Set["asyncio.Future"] = set()


def _on_background_done(fut):
    """Drop the finished task's reference and log any exception it raised."""
    _background_tasks.discard(fut)
    if fut.cancelled():
        return
    exc = fut.exception()
    if exc is not None:
        logger.error("Background processing task failed", exc_info=exc)


def _run_background(coro):
    """Schedule a coroutine as a background task and keep it alive until done.

    Returns the scheduled future (callers may ignore it).
    """
    fut = asyncio.ensure_future(coro)
    _background_tasks.add(fut)
    fut.add_done_callback(_on_background_done)
    return fut


def _run_background_with_log(coro, tm, task_id: str):
    """Schedule a coroutine with log capture during execution.

    NOTE(review): capture is thread-local and the coroutine runs on the event
    loop thread, so records from other coroutines interleaved on that thread
    are attributed to this task as well.
    """
    async def _wrapped():
        _start_log_capture(tm, task_id)
        try:
            await coro
        finally:
            _stop_log_capture()
    return _run_background(_wrapped())


def _launch_task(tm, task_id: str, func):
    """Run *func* in the worker pool with log capture, failing the task on any error.

    Without this guard an exception escaping *func* (e.g. a service failing to
    initialise) would leave the task in its running state forever.
    """
    def guarded():
        try:
            func()
        except Exception as e:
            tm.add_log(task_id, f"[错误] {traceback.format_exc()}")
            tm.set_failed(task_id, str(e))

    async def _bg():
        await _wrapper.run_sync(_wrap_with_capture(tm, task_id, guarded))

    _run_background(_bg())


def _complete_with_results(tm, task_id: str, message: str):
    """Mark the task completed with the result files collected on this thread."""
    tm.set_completed(task_id, result_files=_collected_result_files(), message=message)


# ── Product memory learning ──

def _get_product_db():
    """Open the product memory database backed by the product template workbook."""
    from app.core.db.product_db import ProductDatabase
    return ProductDatabase(
        str(_project_root / 'data' / 'product_cache.db'),
        str(_project_root / 'templates' / '商品资料.xlsx')
    )


def _cell_text(row, col) -> str:
    """Return the stripped text of ``row[col]``; '' if no column or the cell is empty/NaN/None."""
    if not col:
        return ''
    value = row.get(col)
    if value is None:
        return ''
    text = str(value).strip()
    return '' if text in ('nan', 'None') else text


def _cell_price(row, col) -> float:
    """Parse ``row[col]`` as a float price, defaulting to 0.0 when missing or invalid."""
    text = _cell_text(row, col)
    if not text:
        return 0.0
    try:
        return float(text)
    except ValueError:
        return 0.0


def _learn_from_dataframe(df, tm, task_id, source: str) -> int:
    """Fill, price-check and learn every barcoded row of *df*; return rows processed."""
    from app.core.handlers.column_mapper import ColumnMapper
    columns = list(df.columns)
    barcode_col = ColumnMapper.find_column(columns, 'barcode')
    if not barcode_col:
        return 0
    name_col = ColumnMapper.find_column(columns, 'name')
    spec_col = ColumnMapper.find_column(columns, 'specification')
    unit_col = ColumnMapper.find_column(columns, 'unit')
    price_col = ColumnMapper.find_column(columns, 'unit_price') or ColumnMapper.find_column(columns, 'price')

    rows = []
    for _, row in df.iterrows():
        barcode = _cell_text(row, barcode_col)
        if barcode:
            rows.append((barcode, row))
    if not rows:
        return 0

    db = _get_product_db()
    memory = db.load_batch([barcode for barcode, _ in rows])
    learned = 0
    for barcode, row in rows:
        price = _cell_price(row, price_col)
        product = {
            'barcode': barcode,
            'name': _cell_text(row, name_col),
            'specification': _cell_text(row, spec_col),
            'unit': _cell_text(row, unit_col),
            'price': price,
        }
        # 1. Fill missing fields from the remembered product data.
        filled, fill_log = db.fill_from_memory(barcode, product, memory)
        if fill_log:
            tm.add_log(task_id, f" {fill_log}")
        # 2. Warn when the price deviates from the remembered one.
        warn = db.price_warning(barcode, price, memory)
        if warn:
            tm.add_log(task_id, f" {warn}")
        # 3. Learn the (filled) product into memory.
        log = db.learn_from_product(filled, source=source, memory=memory, add_log=None)
        if log:
            tm.add_log(task_id, f" {log}")
        learned += 1
    return learned


def _learn_products_from_excel(excel_path: Path, tm, task_id, source: str = 'ocr'):
    """Learn product data from a processed Excel file into the product memory DB.

    Best-effort: an unreadable or empty file is skipped silently, and an error
    while learning is logged to the task instead of propagating (so it cannot
    mark an already-successful OCR/Excel step as failed).

    Args:
        excel_path: Excel file to read (OCR output or purchase-order result).
        tm: Task manager receiving per-product and summary log lines.
        task_id: Task whose log receives the messages.
        source: Source tag forwarded to ``learn_from_product``.
    """
    try:
        from app.core.utils.file_utils import smart_read_excel
        df = smart_read_excel(str(excel_path))
    except Exception:
        return
    if df is None or df.empty:
        return
    try:
        learned = _learn_from_dataframe(df, tm, task_id, source)
    except Exception as e:
        tm.add_log(task_id, f"[记忆库] 学习失败: {excel_path.name} - {e}")
        return
    if learned:
        tm.add_log(task_id, f"[记忆库] 从 {excel_path.name} 学习了 {learned} 条商品数据")


# ── Per-file processing steps shared by the endpoints ──

def _existing_ocr_output(stem: str) -> Optional[Path]:
    """Return the OCR output for *stem* in output/ (.xlsx preferred over .xls), or None."""
    for ext in ('.xlsx', '.xls'):
        candidate = _output_dir / f"{stem}{ext}"
        if candidate.exists():
            return candidate
    return None


def _result_name_for(stem: str) -> str:
    """Return the purchase-order result file name produced for an Excel stem."""
    return f"采购单_{stem}.xls"


def _register_ocr_output(input_name: str, stem: str) -> Optional[Path]:
    """Record the OCR output of *input_name* (if any) as a relation and a task result."""
    output = _existing_ocr_output(stem)
    if output is not None:
        upsert_file_relation(input_image=input_name, output_excel=output.name, status='ocr_done')
        _add_result_file(output.name)
    return output


def _skip_if_ocr_done(tm, task_id: str, image: Path) -> bool:
    """Log and record an already-OCR'd image; return True if it should be skipped."""
    existing = _existing_ocr_output(image.stem)
    if existing is None:
        return False
    tm.add_log(task_id, f"[跳过] {image.name} 已OCR过 → {existing.name}")
    upsert_file_relation(input_image=image.name, output_excel=existing.name, status='ocr_done')
    return True


def _ocr_and_learn(tm, task_id: str, svc, image: Path):
    """OCR one image, record its output and learn products from it; errors are logged."""
    try:
        svc.process_image(str(image))
        output = _register_ocr_output(image.name, image.stem)
        tm.add_log(task_id, f"[OCR] 完成: {image.name}")
        if output is not None:
            _learn_products_from_excel(output, tm, task_id, source='ocr')
    except Exception as e:
        tm.add_log(task_id, f"[OCR] 失败: {image.name} - {e}")


def _skip_if_excel_done(tm, task_id: str, excel: Path) -> bool:
    """Log and record an already-converted Excel file; return True if it should be skipped."""
    result_name = _result_name_for(excel.stem)
    if not (_result_dir / result_name).exists():
        return False
    tm.add_log(task_id, f"[跳过] {excel.name} 已处理过 → {result_name}")
    upsert_file_relation(output_excel=excel.name, result_purchase=result_name, status='done')
    _add_result_file(result_name)
    return True


def _convert_and_learn(tm, task_id: str, svc, excel: Path):
    """Convert one OCR Excel to a purchase order and learn from the result; errors are logged."""
    result_name = _result_name_for(excel.stem)
    result_path = _result_dir / result_name
    try:
        svc.process_excel(str(excel))
        produced = result_path.exists()
        if produced:
            upsert_file_relation(output_excel=excel.name, result_purchase=result_name, status='done')
            _add_result_file(result_name)
        tm.add_log(task_id, f"[Excel] 完成: {excel.name}")
        if produced:
            _learn_products_from_excel(result_path, tm, task_id, source='ocr')
    except Exception as e:
        tm.add_log(task_id, f"[Excel] 失败: {excel.name} - {e}")


def _merge_purchase_orders(tm, task_id: str, file_paths: List[Path],
                           progress_msg: str, log_msg: str, done_msg: str):
    """Merge *file_paths* into one PosPal template and complete or fail the task."""
    tm.update_progress(task_id, 20, progress_msg)
    tm.add_log(task_id, log_msg)
    try:
        from app.core.excel.merger import PurchaseOrderMerger
        from app.config.settings import ConfigManager
        merger = PurchaseOrderMerger(ConfigManager())
        result = merger.process([str(f) for f in file_paths])
        if not result:
            tm.set_failed(task_id, "合并返回空结果")
            return
        merged_name = Path(result).name
        upsert_file_relation(result_purchase=merged_name, status='merged')
        tm.add_log(task_id, f"[合并] 完成: {merged_name}")
        tm.set_completed(task_id, result_files=[merged_name], message=done_msg)
    except Exception as e:
        tm.add_log(task_id, f"[合并] 失败: {e}")
        tm.set_failed(task_id, str(e))


# ---------------------------------------------------------------------------
# Batch endpoints
# ---------------------------------------------------------------------------

@router.post("/ocr-batch", response_model=TaskResponse)
async def ocr_batch(
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Run OCR on all images in input/ (already-OCR'd images are skipped)."""
    tm = _get_task_manager(request)
    files = _list_input_files(filter_ext=_IMAGE_EXTS)
    if not files:
        raise HTTPException(400, "input/ 目录中没有图片文件")

    task = tm.create_task("批量OCR识别")
    task.metadata = {"endpoint": "/api/processing/ocr-batch", "body": {}}

    def do_work():
        from app.services.ocr_service import OCRService
        svc = OCRService()
        total = len(files)
        for i, image in enumerate(files):
            if _skip_if_ocr_done(tm, task.id, image):
                continue
            tm.update_progress(task.id, int((i / total) * 100), f"正在识别: {image.name}")
            tm.add_log(task.id, f"[OCR] 处理 {image.name}")
            _ocr_and_learn(tm, task.id, svc, image)
        _complete_with_results(tm, task.id, f"OCR完成,共处理 {total} 个文件")

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message="OCR任务已创建")


@router.post("/excel", response_model=TaskResponse)
async def process_excel(
    request: Request,
    body: PipelineRequest = PipelineRequest(),
    current_user: dict = Depends(get_current_user),
):
    """Convert OCR output Excel files to standardized purchase orders.

    ``body.files`` selects files in output/ by name (names escaping output/
    are ignored); without it every Excel file in output/ is processed.
    """
    tm = _get_task_manager(request)
    if body.files:
        files = [p for p in (_file_in(_output_dir, name) for name in body.files) if p is not None]
    else:
        files = _list_files_in(_output_dir, filter_ext=_EXCEL_EXTS)
    if not files:
        raise HTTPException(400, "output/ 目录中没有Excel文件")

    task = tm.create_task("Excel标准化处理")
    task.metadata = {"endpoint": "/api/processing/excel", "body": body.dict()}

    def do_work():
        from app.services.order_service import OrderService
        svc = OrderService()
        total = len(files)
        for i, excel in enumerate(files):
            if _skip_if_excel_done(tm, task.id, excel):
                continue
            tm.update_progress(task.id, int((i / total) * 100), f"正在处理: {excel.name}")
            tm.add_log(task.id, f"[Excel] 处理 {excel.name}")
            _convert_and_learn(tm, task.id, svc, excel)
        _complete_with_results(tm, task.id, f"Excel处理完成,共 {total} 个文件")

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message="Excel处理任务已创建")


@router.post("/merge", response_model=TaskResponse)
async def merge_orders(
    request: Request,
    body: MergeBatchRequest = MergeBatchRequest(filenames=[]),
    current_user: dict = Depends(get_current_user),
):
    """Merge selected purchase order files (or all of them) into one PosPal template."""
    tm = _get_task_manager(request)
    # If specific files are provided use them; otherwise merge every purchase order.
    if body.filenames:
        file_paths = [p for p in (_file_in(_result_dir, name) for name in body.filenames) if p is not None]
    else:
        file_paths = sorted(p for p in _result_dir.glob("采购单_*.xls") if p.is_file())
    if not file_paths:
        raise HTTPException(400, "没有找到可合并的采购单文件")

    task = tm.create_task("合并采购单")
    task.metadata = {"endpoint": "/api/processing/merge", "body": body.dict()}

    def do_work():
        _merge_purchase_orders(
            tm, task.id, file_paths,
            progress_msg="正在合并采购单...",
            log_msg=f"[合并] 合并 {len(file_paths)} 个文件",
            done_msg="合并完成",
        )

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message="合并任务已创建")


@router.post("/pipeline", response_model=TaskResponse)
async def full_pipeline(
    request: Request,
    body: PipelineRequest = PipelineRequest(),
    current_user: dict = Depends(get_current_user),
):
    """Run the full pipeline: OCR -> Excel -> Result (NO merge).

    Progress: OCR covers 0-40%, Excel conversion 45-100%.
    """
    tm = _get_task_manager(request)
    task = tm.create_task("一键全流程处理")
    task.metadata = {"endpoint": "/api/processing/pipeline", "body": body.dict()}

    def do_work():
        # Step 1: OCR
        tm.update_progress(task.id, 0, "步骤 1/2: OCR识别")
        tm.add_log(task.id, "[Pipeline] 开始OCR识别")
        from app.services.ocr_service import OCRService
        ocr_svc = OCRService()
        images = _list_input_files(filter_ext=_IMAGE_EXTS)
        for i, image in enumerate(images):
            pct = int((i / max(len(images), 1)) * 40)
            if _skip_if_ocr_done(tm, task.id, image):
                tm.update_progress(task.id, pct, f"跳过: {image.name}")
                continue
            tm.update_progress(task.id, pct, f"OCR: {image.name}")
            _ocr_and_learn(tm, task.id, ocr_svc, image)

        # Step 2: Excel conversion
        tm.update_progress(task.id, 45, "步骤 2/2: Excel标准化")
        tm.add_log(task.id, "[Pipeline] 开始Excel处理")
        from app.services.order_service import OrderService
        order_svc = OrderService()
        excel_files = _list_files_in(_output_dir, filter_ext=_EXCEL_EXTS)
        for i, excel in enumerate(excel_files):
            pct = 45 + int((i / max(len(excel_files), 1)) * 55)
            if _skip_if_excel_done(tm, task.id, excel):
                tm.update_progress(task.id, pct, f"跳过: {excel.name}")
                continue
            tm.update_progress(task.id, pct, f"Excel: {excel.name}")
            _convert_and_learn(tm, task.id, order_svc, excel)

        _complete_with_results(tm, task.id, "全流程处理完成(不含合并)")

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message="全流程任务已创建")


# ---------------------------------------------------------------------------
# Single-file endpoints
# ---------------------------------------------------------------------------

@router.post("/ocr-single", response_model=TaskResponse)
async def ocr_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """OCR a single image file from input/ (always re-runs OCR)."""
    tm = _get_task_manager(request)
    file_path = _file_in(_input_dir, body.filename)
    if file_path is None:
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"OCR: {body.filename}")
    task.metadata = {"endpoint": "/api/processing/ocr-single", "body": body.dict()}

    def do_work():
        try:
            from app.services.ocr_service import OCRService
            svc = OCRService()
            tm.update_progress(task.id, 10, f"正在识别: {body.filename}")
            tm.add_log(task.id, f"[OCR] 处理 {body.filename}")
            svc.process_image(str(file_path))
            _register_ocr_output(body.filename, file_path.stem)
            tm.add_log(task.id, f"[OCR] 完成: {body.filename}")
            _complete_with_results(tm, task.id, f"OCR完成: {body.filename}")
        except Exception as e:
            tm.add_log(task.id, f"[OCR] 失败: {e}")
            tm.set_failed(task.id, str(e))

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message=f"OCR任务已创建: {body.filename}")


@router.post("/excel-single", response_model=TaskResponse)
async def excel_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """Process a single Excel file from output/ into a purchase order."""
    tm = _get_task_manager(request)
    file_path = _file_in(_output_dir, body.filename)
    if file_path is None:
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"Excel处理: {body.filename}")
    task.metadata = {"endpoint": "/api/processing/excel-single", "body": body.dict()}

    def do_work():
        try:
            from app.services.order_service import OrderService
            svc = OrderService()
            tm.update_progress(task.id, 10, f"正在处理: {body.filename}")
            tm.add_log(task.id, f"[Excel] 处理 {body.filename}")
            svc.process_excel(str(file_path))
            result_name = _result_name_for(file_path.stem)
            if (_result_dir / result_name).exists():
                upsert_file_relation(output_excel=body.filename, result_purchase=result_name, status='done')
                # Report the produced purchase order as the task's result.
                _add_result_file(result_name)
            tm.add_log(task.id, f"[Excel] 完成: {body.filename}")
            _complete_with_results(tm, task.id, f"Excel处理完成: {body.filename}")
        except Exception as e:
            tm.add_log(task.id, f"[Excel] 失败: {e}")
            tm.set_failed(task.id, str(e))

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message=f"Excel处理任务已创建: {body.filename}")


@router.post("/pipeline-single", response_model=TaskResponse)
async def pipeline_single(
    request: Request,
    body: SingleFileRequest,
    current_user: dict = Depends(get_current_user),
):
    """Full pipeline for a single image: OCR -> Excel -> Result (no merge).

    Steps whose output already exists are skipped; the task fails if OCR
    produced no Excel file to convert.
    """
    tm = _get_task_manager(request)
    file_path = _file_in(_input_dir, body.filename)
    if file_path is None:
        raise HTTPException(404, f"文件不存在: {body.filename}")

    task = tm.create_task(f"全流程: {body.filename}")
    task.metadata = {"endpoint": "/api/processing/pipeline-single", "body": body.dict()}

    def do_work():
        stem = file_path.stem

        # Step 1: OCR
        tm.update_progress(task.id, 10, "步骤 1/2: OCR识别")
        tm.add_log(task.id, f"[Pipeline] OCR: {body.filename}")
        from app.services.ocr_service import OCRService
        ocr_svc = OCRService()
        excel_file = _existing_ocr_output(stem)
        if excel_file is not None:
            tm.add_log(task.id, f"[跳过] 已OCR过 → {excel_file.name}")
            upsert_file_relation(input_image=body.filename, output_excel=excel_file.name, status='ocr_done')
            _add_result_file(excel_file.name)
        else:
            ocr_svc.process_image(str(file_path))
            excel_file = _register_ocr_output(body.filename, stem)
            tm.add_log(task.id, "[OCR] 完成")

        # Step 2: Excel
        tm.update_progress(task.id, 50, "步骤 2/2: Excel处理")
        tm.add_log(task.id, "[Pipeline] Excel处理")
        from app.services.order_service import OrderService
        order_svc = OrderService()
        result_name = _result_name_for(stem)
        result_path = _result_dir / result_name
        if result_path.exists():
            tm.add_log(task.id, f"[跳过] 已处理过 → {result_name}")
            # Record the OCR output that actually exists (.xlsx or .xls).
            output_name = excel_file.name if excel_file is not None else f"{stem}.xlsx"
            upsert_file_relation(output_excel=output_name, result_purchase=result_name, status='done')
            _add_result_file(result_name)
        elif excel_file is not None:
            order_svc.process_excel(str(excel_file))
            if result_path.exists():
                upsert_file_relation(output_excel=excel_file.name, result_purchase=result_name, status='done')
                _add_result_file(result_name)
            tm.add_log(task.id, "[Excel] 完成")
        else:
            tm.add_log(task.id, "[错误] OCR未生成Excel文件")
            tm.set_failed(task.id, "OCR未生成Excel文件")
            return

        _complete_with_results(tm, task.id, f"全流程完成: {body.filename}")

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message=f"全流程任务已创建: {body.filename}")


@router.post("/merge-batch", response_model=TaskResponse)
async def merge_batch(
    request: Request,
    body: MergeBatchRequest,
    current_user: dict = Depends(get_current_user),
):
    """Merge selected purchase order files into one PosPal template."""
    tm = _get_task_manager(request)
    file_paths = [p for p in (_file_in(_result_dir, name) for name in body.filenames) if p is not None]
    if not file_paths:
        raise HTTPException(400, "没有找到可合并的采购单文件")

    task = tm.create_task("批量合并采购单")
    task.metadata = {"endpoint": "/api/processing/merge-batch", "body": body.dict()}

    def do_work():
        _merge_purchase_orders(
            tm, task.id, file_paths,
            progress_msg=f"正在合并 {len(file_paths)} 个采购单...",
            log_msg=f"[合并] 合并文件: {', '.join(f.name for f in file_paths)}",
            done_msg="批量合并完成",
        )

    _launch_task(tm, task.id, do_work)
    return TaskResponse(task_id=task.id, status="accepted", message="批量合并任务已创建")


# ---------------------------------------------------------------------------
# Status endpoint
# ---------------------------------------------------------------------------

@router.get("/status/{task_id}")
async def get_task_status(
    task_id: str,
    request: Request,
    current_user: dict = Depends(get_current_user),
):
    """Return the serialized state of *task_id*, or 404 if it is unknown."""
    tm = _get_task_manager(request)
    task = tm.get_task(task_id)
    if not task:
        raise HTTPException(404, "任务不存在")
    return task.to_dict()