Files
orc-order-v2/app/ui/action_handlers.py
houhuan e4d62df7e3 feat: 益选 OCR 订单处理系统初始提交
- 智能供应商识别(蓉城易购/烟草/杨碧月/通用)
- 百度 OCR 表格识别集成
- 规则引擎(列映射/数据清洗/单位转换/规格推断)
- 条码映射管理与云端同步(Gitea REST API)
- 云端同步支持:条码映射、供应商配置、商品资料、采购模板
- 拖拽一键处理(图片→OCR→Excel→合并)
- 191 个单元测试
- 移除无用的模板管理功能
- 清理 IDE 产物目录

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-04 19:51:13 +08:00

566 lines
24 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""业务操作处理模块"""
import os
import time
import datetime
import json
import logging
import tkinter as tk
from tkinter import messagebox
from threading import Thread
from app.config.settings import ConfigManager
from app.services.ocr_service import OCRService
from app.services.order_service import OrderService
from app.core.utils.log_utils import get_logger
from .logging_ui import add_to_log, init_gui_logger, dispose_gui_logger, GUILogHandler
from .ui_widgets import ProgressReporter
from .error_utils import show_error_dialog, get_error_suggestion
logger = get_logger(__name__)
from .result_previews import show_ocr_result_preview, show_excel_result_preview, show_merge_result_preview
from .user_settings import add_recent_file
from .command_runner import get_running_task, set_running_task
from .file_operations import select_file, select_excel_file, validate_unit_price_against_item_data
def _ask_and_merge_purchase_orders(order_service, log_widget, add_to_recent=False):
"""弹窗询问是否合并采购单,返回合并结果路径或 None。
用于 run_pipeline_directly 和 batch_process_orders_with_status 的共享逻辑。
"""
try:
purchase_orders = order_service.get_purchase_orders()
if len(purchase_orders) == 0:
add_to_log(log_widget, "没有找到采购单文件,跳过合并步骤\n", "info")
elif len(purchase_orders) == 1:
add_to_log(log_widget, f"只有1个采购单文件,无需合并: {os.path.basename(purchase_orders[0])}\n", "info")
else:
add_to_log(log_widget, f"找到{len(purchase_orders)}个采购单文件\n", "info")
file_list = "\n".join([f"{os.path.basename(f)}" for f in purchase_orders])
merge_choice = messagebox.askyesnocancel(
"采购单合并选择",
f"发现{len(purchase_orders)}个采购单文件:\n\n{file_list}\n\n是否需要合并这些采购单?\n\n• 选择'':合并所有采购单\n• 选择'':保持文件分离\n• 选择'取消':跳过此步骤",
icon='question'
)
if merge_choice is True:
add_to_log(log_widget, "开始合并采购单...\n", "info")
merge_result = order_service.merge_all_purchase_orders()
if merge_result:
add_to_log(log_widget, "采购单合并完成\n", "success")
if add_to_recent:
try:
add_recent_file(merge_result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
return merge_result
else:
add_to_log(log_widget, "合并失败\n", "warning")
elif merge_choice is False:
add_to_log(log_widget, "用户选择不合并采购单,保持文件分离\n", "info")
else:
add_to_log(log_widget, "用户取消合并操作\n", "info")
except Exception as e:
add_to_log(log_widget, f"合并过程出现问题: {str(e)}\n", "warning")
return None
def process_single_image_with_status(log_widget, status_bar):
status_bar.set_status("选择图片中...")
file_path = select_file(log_widget, [("图片文件", "*.jpg *.jpeg *.png *.bmp"), ("所有文件", "*.*")], "选择图片")
if not file_path:
status_bar.set_status("操作已取消")
add_to_log(log_widget, "未选择文件,操作已取消\n", "warning")
return
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("开始处理图片...")
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
ocr_service = OCRService()
add_to_log(log_widget, f"开始处理图片: {file_path}\n", "info")
try:
add_recent_file(file_path)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
excel_path = ocr_service.process_image(file_path)
if excel_path:
add_to_log(log_widget, "图片OCR处理完成\n", "success")
preview_output = f"采购单已保存到: {excel_path}\n"
show_excel_result_preview(preview_output)
try:
add_recent_file(excel_path)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
else:
add_to_log(log_widget, "图片OCR处理失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"处理单个图片时出错: {str(e)}\n", "error")
sugg = get_error_suggestion(str(e))
if sugg:
show_error_dialog("OCR处理错误", str(e), sugg)
finally:
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except Exception as e:
logger.debug(f"清理日志处理器失败: {e}")
status_bar.set_running(False)
status_bar.set_status("就绪")
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def run_pipeline_directly(log_widget, status_bar):
"""直接运行完整处理流程"""
if get_running_task() is not None:
messagebox.showinfo("任务进行中", "请等待当前任务完成后再执行新的操作。")
return
def run_in_thread():
set_running_task("pipeline")
if status_bar:
status_bar.set_running(True)
status_bar.set_status("开始完整处理流程...")
start_time = datetime.datetime.now()
start_perf = time.perf_counter()
log_widget.configure(state=tk.NORMAL)
log_widget.delete(1.0, tk.END)
log_widget.insert(tk.END, "执行命令: 完整处理流程\n", "command")
log_widget.insert(tk.END, f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n", "time")
log_widget.insert(tk.END, "=" * 50 + "\n\n", "separator")
log_widget.configure(state=tk.DISABLED)
try:
config = ConfigManager()
gui_handler = init_gui_logger(log_widget)
ocr_service = OCRService(config)
order_service = OrderService(config)
reporter = ProgressReporter(status_bar)
reporter.running()
reporter.set("开始OCR批量处理...", 10)
total, success = ocr_service.batch_process(progress_cb=lambda p: reporter.set("OCR处理中...", p))
if total == 0:
add_to_log(log_widget, "没有找到需要处理的图片\n", "warning")
if status_bar:
status_bar.set_status("未找到图片文件")
return
elif success == 0:
add_to_log(log_widget, "OCR处理没有成功处理任何新文件\n", "warning")
else:
add_to_log(log_widget, f"OCR处理完成,共处理 {success}/{total} 个文件\n", "success")
try:
processed_map = {}
config = ConfigManager()
pjson = config.get('Paths', 'processed_record', fallback='data/processed_files.json')
if os.path.exists(pjson):
with open(pjson, 'r', encoding='utf-8') as f:
processed_map = json.load(f)
outputs = list(processed_map.values())
for p in outputs[-10:]:
if p:
add_recent_file(os.path.abspath(p))
except Exception as e:
logger.debug(f"加载已处理文件记录失败: {e}")
reporter.set("开始Excel处理...", 92)
add_to_log(log_widget, "开始Excel处理...\n", "info")
result = order_service.process_excel()
if not result:
add_to_log(log_widget, "Excel处理失败\n", "error")
else:
add_to_log(log_widget, "Excel处理完成\n", "success")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
reporter.set("检查是否需要合并采购单...", 80)
_ask_and_merge_purchase_orders(order_service, log_widget, add_to_recent=True)
end_time = datetime.datetime.now()
duration_sec = max(0.0, time.perf_counter() - start_perf)
add_to_log(log_widget, f"\n{'=' * 50}\n", "separator")
add_to_log(log_widget, "完整处理流程执行完毕!\n", "success")
add_to_log(log_widget, f"结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n", "time")
add_to_log(log_widget, f"耗时: {duration_sec:.2f}\n", "time")
reporter.set("处理完成", 100)
except Exception as e:
add_to_log(log_widget, f"执行过程中发生错误: {str(e)}\n", "error")
import traceback
add_to_log(log_widget, f"详细错误信息: {traceback.format_exc()}\n", "error")
finally:
dispose_gui_logger()
reporter.done()
set_running_task(None)
if status_bar:
status_bar.set_running(False)
status_bar.set_status("就绪")
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def batch_ocr_with_status(log_widget, status_bar):
"""OCR批量识别"""
def run_in_thread():
try:
reporter = ProgressReporter(status_bar)
reporter.running()
reporter.set("正在进行OCR批量识别...", 10)
add_to_log(log_widget, "开始OCR批量识别\n", "info")
init_gui_logger(log_widget)
ocr_service = OCRService()
result = ocr_service.batch_process()
if result:
add_to_log(log_widget, "OCR批量识别完成\n", "success")
show_ocr_result_preview("OCR批量识别成功完成")
reporter.set("批量识别完成", 100)
try:
processed_map = {}
config = ConfigManager()
pjson = config.get('Paths', 'processed_record', fallback='data/processed_files.json')
if os.path.exists(pjson):
with open(pjson, 'r', encoding='utf-8') as f:
processed_map = json.load(f)
outputs = list(processed_map.values())
for p in outputs[-10:]:
if p:
add_recent_file(p)
inputs = list(processed_map.keys())
for p in inputs[-10:]:
if p:
add_recent_file(p)
except Exception as e:
logger.debug(f"加载已处理文件记录失败: {e}")
else:
add_to_log(log_widget, "OCR批量识别失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"OCR批量识别出错: {str(e)}\n", "error")
sugg = get_error_suggestion(str(e))
if sugg:
show_error_dialog("OCR处理错误", str(e), sugg)
finally:
dispose_gui_logger()
reporter.done()
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def batch_process_orders_with_status(log_widget, status_bar):
"""批量处理订单(仅Excel处理,包含合并确认)"""
def run_in_thread():
try:
reporter = ProgressReporter(status_bar)
reporter.running()
reporter.set("正在批量处理订单...", 10)
add_to_log(log_widget, "开始批量处理订单\n", "info")
init_gui_logger(log_widget)
order_service = OrderService()
add_to_log(log_widget, "开始Excel处理...\n", "info")
try:
latest_input = order_service.get_latest_excel()
if latest_input:
add_recent_file(latest_input)
except Exception as e:
logger.debug(f"获取最新Excel失败: {e}")
result = order_service.process_excel(progress_cb=lambda p: reporter.set("Excel处理中...", p))
if result:
add_to_log(log_widget, "Excel处理完成\n", "success")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
reporter.set("检查是否需要合并采购单...", 70)
add_to_log(log_widget, "检查是否需要合并采购单...\n", "info")
_ask_and_merge_purchase_orders(order_service, log_widget)
add_to_log(log_widget, "批量处理订单完成\n", "success")
reporter.set("批量处理订单完成", 100)
show_excel_result_preview(f"采购单已保存到: {result}\n")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
else:
add_to_log(log_widget, "批量处理订单失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"批量处理订单时出错: {str(e)}\n", "error")
sugg = get_error_suggestion(str(e))
if sugg:
show_error_dialog("Excel处理错误", str(e), sugg)
finally:
dispose_gui_logger()
reporter.done()
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def merge_orders_with_status(log_widget, status_bar):
"""合并采购单"""
def run_in_thread():
try:
reporter = ProgressReporter(status_bar)
reporter.running()
reporter.set("正在合并采购单...", 10)
add_to_log(log_widget, "开始合并采购单\n", "info")
init_gui_logger(log_widget)
order_service = OrderService()
result = order_service.merge_all_purchase_orders(progress_cb=lambda p: reporter.set("合并处理中...", p))
if result:
add_to_log(log_widget, "采购单合并完成\n", "success")
show_merge_result_preview(f"已保存到: {result}\n")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
else:
add_to_log(log_widget, "采购单合并失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"采购单合并出错: {str(e)}\n", "error")
sugg = get_error_suggestion(str(e))
if sugg:
show_error_dialog("合并错误", str(e), sugg)
finally:
dispose_gui_logger()
reporter.done()
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def process_excel_file_with_status(log_widget, status_bar):
"""处理Excel文件"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("选择Excel文件中...")
file_path = select_excel_file(log_widget)
if file_path:
status_bar.set_status("开始处理Excel文件...")
add_to_log(log_widget, f"开始处理Excel文件: {file_path}\n", "info")
else:
status_bar.set_status("操作已取消")
add_to_log(log_widget, "未选择文件,操作已取消\n", "warning")
return
init_gui_logger(log_widget)
order_service = OrderService()
if file_path:
try:
add_recent_file(file_path)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
result = order_service.process_excel(file_path, progress_cb=lambda p: status_bar.set_status("Excel处理中...", p))
else:
try:
latest_input = order_service.get_latest_excel()
if latest_input:
add_recent_file(latest_input)
except Exception as e:
logger.debug(f"获取最新Excel失败: {e}")
result = order_service.process_excel(progress_cb=lambda p: status_bar.set_status("Excel处理中...", p))
if result:
add_to_log(log_widget, "Excel文件处理完成\n", "success")
show_excel_result_preview(f"采购单已保存到: {result}\n")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
else:
add_to_log(log_widget, "Excel文件处理失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"Excel文件处理出错: {str(e)}\n", "error")
msg = str(e)
suggestion = None
if 'openpyxl' in msg or 'engine' in msg:
suggestion = "安装依赖:pip install openpyxl"
elif 'xlrd' in msg:
suggestion = "安装依赖:pip install xlrd"
if suggestion:
show_error_dialog("Excel处理错误", msg, suggestion)
finally:
dispose_gui_logger()
status_bar.set_running(False)
status_bar.set_status("就绪")
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def process_dropped_file(log_widget, status_bar, file_path):
try:
ext = os.path.splitext(file_path)[1].lower()
if ext in ['.jpg', '.jpeg', '.png', '.bmp']:
def _run_img():
try:
reporter = ProgressReporter(status_bar)
reporter.running()
init_gui_logger(log_widget)
add_to_log(log_widget, f"开始一键处理图片: {file_path}\n", "info")
try:
add_recent_file(file_path)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
# 步骤1: OCR识别
reporter.set("OCR识别中...", 10)
ocr_service = OCRService()
excel_path = ocr_service.process_image(file_path)
if not excel_path:
add_to_log(log_widget, "图片OCR处理失败\n", "error")
return
add_to_log(log_widget, f"OCR识别完成: {excel_path}\n", "success")
# 步骤2: Excel处理
reporter.set("Excel处理中...", 40)
order_service = OrderService()
result = order_service.process_excel(excel_path, progress_cb=lambda p: reporter.set("Excel处理中...", p))
if not result:
add_to_log(log_widget, "Excel处理失败\n", "error")
return
add_to_log(log_widget, f"Excel处理完成: {result}\n", "success")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
# 步骤3: 合并采购单
reporter.set("检查合并采购单...", 80)
_ask_and_merge_purchase_orders(order_service, log_widget, add_to_recent=True)
reporter.set("处理完成", 100)
add_to_log(log_widget, "一键处理完成!\n", "success")
finally:
dispose_gui_logger()
reporter.done()
t = Thread(target=_run_img)
t.daemon = True
t.start()
elif ext in ['.xlsx', '.xls']:
def _run_xls():
try:
reporter = ProgressReporter(status_bar)
reporter.running()
init_gui_logger(log_widget)
order_service = OrderService()
add_to_log(log_widget, f"开始一键处理Excel文件: {file_path}\n", "info")
try:
add_recent_file(file_path)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
# 步骤1: Excel处理
reporter.set("Excel处理中...", 20)
result = order_service.process_excel(file_path, progress_cb=lambda p: reporter.set("Excel处理中...", p))
if not result:
add_to_log(log_widget, "Excel文件处理失败\n", "error")
return
add_to_log(log_widget, f"Excel处理完成: {result}\n", "success")
try:
add_recent_file(result)
except Exception as e:
logger.debug(f"添加最近文件失败: {e}")
try:
validate_unit_price_against_item_data(result, log_widget)
except Exception as e:
logger.debug(f"单价校验失败: {e}")
# 步骤2: 合并采购单
reporter.set("检查合并采购单...", 80)
_ask_and_merge_purchase_orders(order_service, log_widget, add_to_recent=True)
reporter.set("处理完成", 100)
add_to_log(log_widget, "一键处理完成!\n", "success")
finally:
dispose_gui_logger()
reporter.done()
t = Thread(target=_run_xls)
t.daemon = True
t.start()
else:
add_to_log(log_widget, f"不支持的文件类型: {file_path}\n", "warning")
except Exception as e:
add_to_log(log_widget, f"处理拖拽文件失败: {str(e)}\n", "error")