orc-order-v2/启动器.py

1974 lines
81 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
益选-OCR订单处理系统启动器
-----------------
提供简单的图形界面,方便用户选择功能
"""
import os
import sys
import time
import subprocess
import shutil
import tkinter as tk
from tkinter import messagebox, filedialog, scrolledtext, ttk
from tkinter import font as tkfont
from threading import Thread
import datetime
import json
import re
import logging
from typing import Dict, List, Optional, Any
# 导入自定义对话框工具
from app.core.utils.dialog_utils import show_custom_dialog, show_barcode_mapping_dialog, show_config_dialog
from app.core.excel.converter import UnitConverter
from app.config.settings import ConfigManager
# 导入服务类
from app.services.ocr_service import OCRService
from app.services.order_service import OrderService
from app.services.tobacco_service import TobaccoService
# 全局变量,用于跟踪任务状态
RUNNING_TASK = None
THEME_MODE = "light" # 默认浅色主题
# config_manager = ConfigManager() # 创建配置管理器实例 - 延迟初始化
# 定义浅色和深色主题颜色
THEMES = {
"light": {
"bg": "#f8f9fa",
"fg": "#212529",
"button_bg": "#ffffff",
"button_fg": "#495057",
"button_hover": "#e9ecef",
"primary_bg": "#007bff",
"primary_fg": "#ffffff",
"secondary_bg": "#6c757d",
"secondary_fg": "#ffffff",
"log_bg": "#ffffff",
"log_fg": "#212529",
"highlight_bg": "#007bff",
"highlight_fg": "#ffffff",
"border": "#dee2e6",
"success": "#28a745",
"error": "#dc3545",
"warning": "#ffc107",
"info": "#17a2b8",
"card_bg": "#ffffff",
"shadow": "#00000010"
},
"dark": {
"bg": "#1a1a1a",
"fg": "#e9ecef",
"button_bg": "#343a40",
"button_fg": "#e9ecef",
"button_hover": "#495057",
"primary_bg": "#0d6efd",
"primary_fg": "#ffffff",
"secondary_bg": "#6c757d",
"secondary_fg": "#ffffff",
"log_bg": "#212529",
"log_fg": "#e9ecef",
"highlight_bg": "#0d6efd",
"highlight_fg": "#ffffff",
"border": "#495057",
"success": "#198754",
"error": "#dc3545",
"warning": "#ffc107",
"info": "#0dcaf0",
"card_bg": "#2d3748",
"shadow": "#00000030"
}
}
class StatusBar(tk.Frame):
"""状态栏,显示当前系统状态和进度"""
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.configure(height=25, relief=tk.SUNKEN, borderwidth=1)
# 状态标签
self.status_label = tk.Label(self, text="就绪", anchor=tk.W, padx=5)
self.status_label.pack(side=tk.LEFT, fill=tk.X, expand=True)
# 进度条
self.progress = ttk.Progressbar(self, orient=tk.HORIZONTAL, length=200, mode='determinate')
self.progress.pack(side=tk.RIGHT, padx=5, pady=2)
# 隐藏进度条(初始状态)
self.progress.pack_forget()
def set_status(self, text, progress=None):
"""设置状态栏文本和进度"""
self.status_label.config(text=text)
if progress is not None and 0 <= progress <= 100:
self.progress.pack(side=tk.RIGHT, padx=5, pady=2)
self.progress.config(value=progress)
else:
self.progress.pack_forget()
def set_running(self, is_running=True):
"""设置运行状态"""
if is_running:
self.status_label.config(text="处理中...", foreground=THEMES[THEME_MODE]["info"])
self.progress.pack(side=tk.RIGHT, padx=5, pady=2)
self.progress.config(mode='indeterminate')
self.progress.start()
else:
self.status_label.config(text="就绪", foreground=THEMES[THEME_MODE]["fg"])
self.progress.stop()
self.progress.pack_forget()
def run_command_with_logging(command, log_widget, status_bar=None, on_complete=None):
"""运行命令并将输出重定向到日志窗口"""
global RUNNING_TASK
# 如果已有任务在运行,提示用户
if RUNNING_TASK is not None:
messagebox.showinfo("任务进行中", "请等待当前任务完成后再执行新的操作。")
return
def run_in_thread():
global RUNNING_TASK
RUNNING_TASK = command
# 更新状态栏
if status_bar:
status_bar.set_running(True)
# 记录命令开始执行的时间
start_time = datetime.datetime.now()
log_widget.configure(state=tk.NORMAL)
log_widget.delete(1.0, tk.END) # 清空之前的日志
log_widget.insert(tk.END, f"执行命令: {' '.join(command)}\n", "command")
log_widget.insert(tk.END, f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n", "time")
log_widget.insert(tk.END, "=" * 50 + "\n\n", "separator")
log_widget.configure(state=tk.DISABLED)
# 获取原始的stdout和stderr
old_stdout = sys.stdout
old_stderr = sys.stderr
# 创建日志重定向器
log_redirector = LogRedirector(log_widget)
# 设置环境变量强制OCR模块输出到data目录
env = os.environ.copy()
env["OCR_OUTPUT_DIR"] = os.path.abspath("data/output")
env["OCR_INPUT_DIR"] = os.path.abspath("data/input")
env["OCR_LOG_LEVEL"] = "DEBUG" # 设置更详细的日志级别
try:
# 重定向stdout和stderr到日志重定向器
sys.stdout = log_redirector
sys.stderr = log_redirector
# 打印一条消息,确认重定向已生效
print("日志重定向已启动现在同时输出到终端和GUI")
# 运行命令并捕获输出
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env
)
output_data = []
# 读取并显示输出
for line in process.stdout:
output_data.append(line)
print(line.rstrip()) # 直接打印到已重定向的stdout
# 尝试从输出中提取进度信息
if status_bar:
progress = extract_progress_from_log(line)
if progress is not None:
log_widget.after(0, lambda p=progress: status_bar.set_status(f"处理中: {p}%完成", p))
# 等待进程结束
process.wait()
# 记录命令结束时间
end_time = datetime.datetime.now()
duration = end_time - start_time
print(f"\n{'=' * 50}")
print(f"执行完毕!返回码: {process.returncode}")
print(f"结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"耗时: {duration.total_seconds():.2f}")
# 获取输出内容
output_text = ''.join(output_data)
# 检查是否是完整流程命令且遇到了"未找到可合并的文件"的情况
is_pipeline = "pipeline" in command
no_merge_files = "未找到采购单文件" in output_text
single_file = "只有1个采购单文件" in output_text
# 如果是完整流程且只是没有找到可合并文件或只有一个文件,则视为成功
if is_pipeline and (no_merge_files or single_file):
print("完整流程中没有需要合并的文件,但其他步骤执行成功,视为成功完成")
if status_bar:
log_widget.after(0, lambda: status_bar.set_status("处理完成", 100))
log_widget.after(0, lambda: show_result_preview(command, output_text))
else:
# 执行完成后处理结果
if on_complete:
log_widget.after(0, lambda: on_complete(process.returncode, output_text))
# 如果处理成功且没有指定on_complete回调函数则显示默认成功信息
elif process.returncode == 0:
if status_bar:
log_widget.after(0, lambda: status_bar.set_status("处理完成", 100))
log_widget.after(0, lambda: show_result_preview(command, output_text))
else:
if status_bar:
log_widget.after(0, lambda: status_bar.set_status(f"处理失败 (返回码: {process.returncode})", 0))
log_widget.after(0, lambda: messagebox.showerror("操作失败", f"处理失败,返回码:{process.returncode}"))
except Exception as e:
print(f"\n执行出错: {str(e)}")
if status_bar:
log_widget.after(0, lambda: status_bar.set_status(f"执行出错: {str(e)}", 0))
log_widget.after(0, lambda: messagebox.showerror("执行错误", f"执行命令时出错: {str(e)}"))
finally:
# 恢复原始stdout和stderr
sys.stdout = old_stdout
sys.stderr = old_stderr
# 任务完成,重置状态
RUNNING_TASK = None
if status_bar:
log_widget.after(0, lambda: status_bar.set_running(False))
# 在新线程中运行避免UI阻塞
Thread(target=run_in_thread).start()
def extract_progress_from_log(log_line):
"""从日志行中提取进度信息"""
# 尝试匹配"处理批次 x/y"格式的进度信息
batch_match = re.search(r'处理批次 (\d+)/(\d+)', log_line)
if batch_match:
current = int(batch_match.group(1))
total = int(batch_match.group(2))
return int(current / total * 100)
# 尝试匹配百分比格式
percent_match = re.search(r'(\d+)%', log_line)
if percent_match:
return int(percent_match.group(1))
return None
def show_result_preview(command, output):
"""显示处理结果预览"""
# 根据命令类型提取不同的结果信息
if "ocr" in command:
show_ocr_result_preview(output)
elif "excel" in command:
show_excel_result_preview(output)
elif "merge" in command:
show_merge_result_preview(output)
elif "pipeline" in command:
show_pipeline_result_preview(output)
else:
messagebox.showinfo("处理完成", "操作已成功完成!\n请在data/output目录查看结果。")
def show_ocr_result_preview(output):
"""显示OCR处理结果预览"""
# 提取处理的文件数量
files_match = re.search(r'找到 (\d+) 个图片文件,其中 (\d+) 个未处理', output)
processed_match = re.search(r'所有图片处理完成, 总计: (\d+), 成功: (\d+)', output)
if processed_match:
total = int(processed_match.group(1))
success = int(processed_match.group(2))
# 创建结果预览对话框
preview = tk.Toplevel()
preview.title("OCR处理结果")
preview.geometry("400x300")
preview.resizable(False, False)
# 居中显示
center_window(preview)
# 添加内容
tk.Label(preview, text="OCR处理完成", font=("Arial", 16, "bold")).pack(pady=10)
result_frame = tk.Frame(preview)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
tk.Label(result_frame, text=f"总共处理: {total} 个文件", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"成功处理: {success} 个文件", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"失败数量: {total - success} 个文件", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 处理结果评估
if success == total:
result_text = "全部处理成功!"
result_color = "#28a745"
elif success > total * 0.8:
result_text = "大部分处理成功。"
result_color = "#ffc107"
else:
result_text = "处理失败较多,请检查日志。"
result_color = "#dc3545"
tk.Label(result_frame, text=result_text, font=("Arial", 12, "bold"), fg=result_color).pack(pady=10)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
tk.Button(button_frame, text="查看输出文件", command=lambda: os.startfile(os.path.abspath("data/output"))).pack(side=tk.LEFT, padx=10)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=10)
else:
messagebox.showinfo("OCR处理完成", "OCR处理已完成请在data/output目录查看结果。")
def show_excel_result_preview(output):
"""显示Excel处理结果预览"""
# 提取处理的Excel信息
extract_match = re.search(r'提取到 (\d+) 个商品信息', output)
file_match = re.search(r'采购单已保存到: (.+?)(?:\n|$)', output)
if extract_match and file_match:
products_count = int(extract_match.group(1))
output_file = file_match.group(1)
# 创建结果预览对话框
preview = tk.Toplevel()
preview.title("Excel处理结果")
preview.geometry("450x320")
preview.resizable(False, False)
# 使弹窗居中显示
center_window(preview)
# 添加内容
tk.Label(preview, text="Excel处理完成", font=("Arial", 16, "bold")).pack(pady=10)
result_frame = tk.Frame(preview)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
tk.Label(result_frame, text=f"提取商品数量: {products_count}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"输出文件: {os.path.basename(output_file)}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 处理成功提示
tk.Label(result_frame, text="采购单已成功生成!", font=("Arial", 12, "bold"), fg="#28a745").pack(pady=10)
# 文件信息框
file_frame = tk.Frame(result_frame, relief=tk.GROOVE, borderwidth=1)
file_frame.pack(fill=tk.X, padx=15, pady=5)
tk.Label(file_frame, text="文件信息", font=("Arial", 10, "bold")).pack(anchor=tk.W, padx=10, pady=5)
# 获取文件大小和时间
try:
file_size = os.path.getsize(output_file)
file_time = datetime.datetime.fromtimestamp(os.path.getmtime(output_file))
size_text = f"{file_size / 1024:.1f} KB" if file_size < 1024*1024 else f"{file_size / (1024*1024):.1f} MB"
tk.Label(file_frame, text=f"文件大小: {size_text}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
tk.Label(file_frame, text=f"创建时间: {file_time.strftime('%Y-%m-%d %H:%M:%S')}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
except:
tk.Label(file_frame, text="无法获取文件信息", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开文件", command=lambda: os.startfile(output_file)).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="打开所在文件夹", command=lambda: os.startfile(os.path.dirname(output_file))).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=5)
else:
messagebox.showinfo("Excel处理完成", "Excel处理已完成请在data/output目录查看结果。")
def show_merge_result_preview(output):
"""显示合并结果预览"""
# 提取合并信息
merged_match = re.search(r'合并了 (\d+) 个采购单', output)
product_match = re.search(r'共处理 (\d+) 个商品', output)
output_match = re.search(r'已保存到: (.+?)(?:\n|$)', output)
if merged_match and output_match:
merged_count = int(merged_match.group(1))
product_count = int(product_match.group(1)) if product_match else 0
output_file = output_match.group(1)
# 创建结果预览对话框
preview = tk.Toplevel()
preview.title("采购单合并结果")
preview.geometry("450x300")
preview.resizable(False, False)
# 设置主题
apply_theme(preview)
# 添加内容
tk.Label(preview, text="采购单合并完成", font=("Arial", 16, "bold")).pack(pady=10)
result_frame = tk.Frame(preview)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
tk.Label(result_frame, text=f"合并采购单数量: {merged_count}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"处理商品数量: {product_count}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"输出文件: {os.path.basename(output_file)}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 处理成功提示
tk.Label(result_frame, text="采购单已成功合并!", font=("Arial", 12, "bold"), fg=THEMES[THEME_MODE]["success"]).pack(pady=10)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开文件", command=lambda: os.startfile(output_file)).pack(side=tk.LEFT, padx=10)
tk.Button(button_frame, text="打开所在文件夹", command=lambda: os.startfile(os.path.dirname(output_file))).pack(side=tk.LEFT, padx=10)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=10)
else:
messagebox.showinfo("采购单合并完成", "采购单合并已完成请在data/output目录查看结果。")
def show_pipeline_result_preview(output):
"""显示完整流程结果预览"""
# 提取关键信息
ocr_match = re.search(r'所有图片处理完成, 总计: (\d+), 成功: (\d+)', output)
excel_match = re.search(r'提取到 (\d+) 个商品信息', output)
output_file_match = re.search(r'采购单已保存到: (.+?)(?:\n|$)', output)
# 创建结果预览对话框
preview = tk.Toplevel()
preview.title("完整流程处理结果")
preview.geometry("500x400")
preview.resizable(False, False)
# 居中显示
center_window(preview)
# 添加内容
tk.Label(preview, text="完整处理流程已完成", font=("Arial", 16, "bold")).pack(pady=10)
# 添加处理结果提示(即使没有可合并文件也显示成功)
no_files_match = re.search(r'未找到可合并的文件', output)
if no_files_match:
tk.Label(preview, text="未找到可合并的文件,但其他步骤已成功执行", font=("Arial", 12)).pack(pady=0)
result_frame = tk.Frame(preview)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
# 创建多行结果区域
result_text = scrolledtext.ScrolledText(result_frame, wrap=tk.WORD, height=15, width=60)
result_text.pack(fill=tk.BOTH, expand=True, padx=15, pady=5)
result_text.configure(state=tk.NORMAL)
# 填充结果文本
result_text.insert(tk.END, "===== 流程执行结果 =====\n\n", "title")
# OCR处理结果
result_text.insert(tk.END, "步骤1: OCR识别\n", "step")
if ocr_match:
total = int(ocr_match.group(1))
success = int(ocr_match.group(2))
result_text.insert(tk.END, f" 处理图片: {total}\n", "info")
result_text.insert(tk.END, f" 成功识别: {success}\n", "info")
if success == total:
result_text.insert(tk.END, " 结果: 全部识别成功\n", "success")
else:
result_text.insert(tk.END, f" 结果: 部分识别成功 ({success}/{total})\n", "warning")
else:
result_text.insert(tk.END, " 结果: 无OCR处理或处理信息不完整\n", "warning")
# Excel处理结果
result_text.insert(tk.END, "\n步骤2: Excel处理\n", "step")
if excel_match:
products = int(excel_match.group(1))
result_text.insert(tk.END, f" 提取商品: {products}\n", "info")
result_text.insert(tk.END, " 结果: 成功生成采购单\n", "success")
if output_file_match:
output_file = output_file_match.group(1)
result_text.insert(tk.END, f" 输出文件: {os.path.basename(output_file)}\n", "info")
else:
result_text.insert(tk.END, " 结果: 无Excel处理或处理信息不完整\n", "warning")
# 总体评估
result_text.insert(tk.END, "\n===== 整体评估 =====\n", "title")
has_errors = "错误" in output or "失败" in output
no_files_match = re.search(r'未找到采购单文件', output)
single_file_match = re.search(r'只有1个采购单文件', output)
if no_files_match:
result_text.insert(tk.END, "没有找到可合并的文件,但处理流程已成功完成。\n", "warning")
result_text.insert(tk.END, "可以选择打开Excel文件或查看输出文件夹。\n", "info")
elif single_file_match:
result_text.insert(tk.END, "只有一个采购单文件,无需合并,处理流程已成功完成。\n", "warning")
result_text.insert(tk.END, "可以选择打开生成的Excel文件。\n", "info")
elif ocr_match and excel_match and not has_errors:
result_text.insert(tk.END, "流程完整执行成功!\n", "success")
elif ocr_match or excel_match:
result_text.insert(tk.END, "流程部分执行成功,请检查日志获取详情。\n", "warning")
else:
result_text.insert(tk.END, "流程执行可能存在问题,请查看详细日志。\n", "error")
# 设置标签样式
result_text.tag_configure("title", font=("Arial", 12, "bold"))
result_text.tag_configure("step", font=("Arial", 11, "bold"))
result_text.tag_configure("info", font=("Arial", 10))
result_text.tag_configure("success", font=("Arial", 10, "bold"), foreground="#28a745")
result_text.tag_configure("warning", font=("Arial", 10, "bold"), foreground="#ffc107")
result_text.tag_configure("error", font=("Arial", 10, "bold"), foreground="#dc3545")
result_text.configure(state=tk.DISABLED)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
if output_file_match:
output_file = output_file_match.group(1)
tk.Button(button_frame, text="打开Excel文件", command=lambda: os.startfile(output_file)).pack(side=tk.LEFT, padx=10)
else:
# 如果没有找到合并后的文件但Excel处理成功提供打开最新Excel文件的选项
if excel_match or no_files_match or single_file_match:
# 找到输出目录中最新的采购单Excel文件
output_dir = os.path.abspath("data/output")
excel_files = [f for f in os.listdir(output_dir) if f.startswith('采购单_') and (f.endswith('.xls') or f.endswith('.xlsx'))]
if excel_files:
# 按修改时间排序,获取最新的文件
excel_files.sort(key=lambda x: os.path.getmtime(os.path.join(output_dir, x)), reverse=True)
latest_file = os.path.join(output_dir, excel_files[0])
tk.Button(button_frame, text="打开最新Excel文件",
command=lambda: os.startfile(latest_file)).pack(side=tk.LEFT, padx=10)
tk.Button(button_frame, text="查看输出文件夹", command=lambda: os.startfile(os.path.abspath("data/output"))).pack(side=tk.LEFT, padx=10)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=10)
def create_modern_button(parent, text, command, style="primary", width=None, height=None):
"""创建现代化样式的按钮"""
theme = THEMES[THEME_MODE]
if style == "primary":
bg_color = "white" # 白色背景
fg_color = theme["primary_bg"] # 蓝色文字
hover_color = "#f0f8ff" # 浅蓝色悬停
border_color = theme["primary_bg"] # 蓝色边框
elif style == "secondary":
bg_color = theme["secondary_bg"]
fg_color = theme["secondary_fg"]
hover_color = theme["button_hover"]
border_color = theme["secondary_bg"]
else: # default
bg_color = "white" # 白色背景
fg_color = theme["primary_bg"] # 蓝色文字
hover_color = "#f0f8ff" # 浅蓝色悬停
border_color = theme["primary_bg"] # 蓝色边框
# 创建一个Frame来包装按钮
button_frame = tk.Frame(parent, bg=border_color, highlightthickness=0)
button_frame.configure(relief="flat", bd=0)
# 创建实际的按钮
button = tk.Button(
button_frame,
text=text,
command=command,
bg=bg_color,
fg=fg_color,
font=("Microsoft YaHei UI", 9),
relief="flat",
bd=0,
padx=10,
pady=5,
cursor="hand2",
activebackground=hover_color,
activeforeground=fg_color
)
if width:
button.configure(width=width)
if height:
button.configure(height=height)
button.pack(fill=tk.BOTH, expand=True, padx=1, pady=1)
# 添加悬停效果
def on_enter(e):
button.configure(bg=hover_color)
def on_leave(e):
button.configure(bg=bg_color)
button.bind("<Enter>", on_enter)
button.bind("<Leave>", on_leave)
button_frame.bind("<Enter>", on_enter)
button_frame.bind("<Leave>", on_leave)
return button_frame
def create_card_frame(parent, title=None):
"""创建卡片样式的框架"""
theme = THEMES[THEME_MODE]
card = tk.Frame(
parent,
bg=theme["card_bg"],
relief="flat",
borderwidth=1,
highlightbackground=theme["border"],
highlightthickness=1
)
if title:
title_label = tk.Label(
card,
text=title,
bg=theme["card_bg"],
fg=theme["fg"],
font=("Microsoft YaHei UI", 12, "bold")
)
title_label.pack(pady=(10, 5))
return card
def apply_theme(widget, theme_mode=None):
"""应用主题到小部件"""
global THEME_MODE
if theme_mode is None:
theme_mode = THEME_MODE
theme = THEMES[theme_mode]
try:
widget.configure(bg=theme["bg"], fg=theme["fg"])
except:
pass
# 递归应用到所有子部件
for child in widget.winfo_children():
if isinstance(child, tk.Button) and not isinstance(child, ttk.Button):
child.configure(bg=theme["button_bg"], fg=theme["button_fg"])
elif isinstance(child, scrolledtext.ScrolledText):
child.configure(bg=theme["log_bg"], fg=theme["log_fg"])
else:
try:
child.configure(bg=theme["bg"], fg=theme["fg"])
except:
pass
# 递归处理子部件的子部件
apply_theme(child, theme_mode)
def ensure_directories():
"""确保必要的目录结构存在"""
directories = ["data/input", "data/output", "data/temp", "logs"]
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
print(f"创建目录: {directory}")
class LogRedirector:
"""日志重定向器,用于捕获命令输出并显示到界面"""
def __init__(self, text_widget):
self.text_widget = text_widget
self.buffer = ""
self.terminal = sys.__stdout__ # 保存原始的stdout引用
def write(self, string):
self.buffer += string
# 同时输出到终端
self.terminal.write(string)
# 在UI线程中更新文本控件
self.text_widget.after(0, self.update_text_widget)
def update_text_widget(self):
self.text_widget.configure(state=tk.NORMAL)
# 根据内容使用不同的标签
if self.buffer.strip():
# 检测不同类型的消息并应用相应样式
if any(marker in self.buffer.lower() for marker in ["错误", "error", "失败", "异常", "exception"]):
self.text_widget.insert(tk.END, self.buffer, "error")
elif any(marker in self.buffer.lower() for marker in ["警告", "warning"]):
self.text_widget.insert(tk.END, self.buffer, "warning")
elif any(marker in self.buffer.lower() for marker in ["成功", "success", "完成", "成功处理"]):
self.text_widget.insert(tk.END, self.buffer, "success")
elif any(marker in self.buffer.lower() for marker in ["info", "信息", "开始", "处理中"]):
self.text_widget.insert(tk.END, self.buffer, "info")
else:
self.text_widget.insert(tk.END, self.buffer, "normal")
else:
self.text_widget.insert(tk.END, self.buffer)
# 自动滚动到底部
self.text_widget.see(tk.END)
self.text_widget.configure(state=tk.DISABLED)
self.buffer = ""
def flush(self):
self.terminal.flush() # 确保终端也被刷新
class GUILogHandler(logging.Handler):
"""自定义日志处理器将日志输出到GUI界面"""
def __init__(self, text_widget):
super().__init__()
self.text_widget = text_widget
def emit(self, record):
try:
msg = self.format(record)
# 根据日志级别确定标签
if record.levelno >= logging.ERROR:
tag = "error"
elif record.levelno >= logging.WARNING:
tag = "warning"
elif record.levelno >= logging.INFO:
tag = "info"
else:
tag = "normal"
# 在UI线程中更新文本控件
self.text_widget.after(0, lambda: self._update_text_widget(msg + "\n", tag))
except Exception:
self.handleError(record)
def _update_text_widget(self, message, tag):
"""在UI线程中更新文本控件"""
self.text_widget.configure(state=tk.NORMAL)
self.text_widget.insert(tk.END, message, tag)
self.text_widget.see(tk.END)
self.text_widget.configure(state=tk.DISABLED)
def create_collapsible_frame(parent, title, initial_state=True):
"""创建可折叠的面板"""
frame = tk.Frame(parent)
frame.pack(fill=tk.X, pady=5)
# 标题栏
title_frame = tk.Frame(frame)
title_frame.pack(fill=tk.X)
# 折叠指示器
state_var = tk.BooleanVar(value=initial_state)
indicator = "" if initial_state else ""
state_label = tk.Label(title_frame, text=indicator, font=("Arial", 10, "bold"))
state_label.pack(side=tk.LEFT, padx=5)
# 标题
title_label = tk.Label(title_frame, text=title, font=("Arial", 11, "bold"))
title_label.pack(side=tk.LEFT, padx=5)
# 内容区域
content_frame = tk.Frame(frame)
if initial_state:
content_frame.pack(fill=tk.X, padx=20, pady=5)
# 点击事件处理函数
def toggle_collapse(event=None):
current_state = state_var.get()
new_state = not current_state
state_var.set(new_state)
# 更新指示器
state_label.config(text="" if new_state else "")
# 显示或隐藏内容
if new_state:
content_frame.pack(fill=tk.X, padx=20, pady=5)
else:
content_frame.pack_forget()
# 绑定点击事件
title_frame.bind("<Button-1>", toggle_collapse)
state_label.bind("<Button-1>", toggle_collapse)
title_label.bind("<Button-1>", toggle_collapse)
return content_frame, state_var
def process_single_image_with_status(log_widget, status_bar):
status_bar.set_status("选择图片中...")
file_path = select_file(log_widget)
if file_path:
status_bar.set_status("开始处理图片...")
run_command_with_logging(["python", "run.py", "ocr", "--input", file_path], log_widget, status_bar)
else:
status_bar.set_status("操作已取消")
add_to_log(log_widget, "未选择文件,操作已取消\n", "warning")
def run_pipeline_directly(log_widget, status_bar):
"""直接运行完整处理流程"""
global RUNNING_TASK
# 如果已有任务在运行,提示用户
if RUNNING_TASK is not None:
messagebox.showinfo("任务进行中", "请等待当前任务完成后再执行新的操作。")
return
def run_in_thread():
global RUNNING_TASK
RUNNING_TASK = "pipeline"
# 更新状态栏
if status_bar:
status_bar.set_running(True)
status_bar.set_status("开始完整处理流程...")
# 记录开始时间
start_time = datetime.datetime.now()
log_widget.configure(state=tk.NORMAL)
log_widget.delete(1.0, tk.END) # 清空之前的日志
log_widget.insert(tk.END, f"执行命令: 完整处理流程\n", "command")
log_widget.insert(tk.END, f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n", "time")
log_widget.insert(tk.END, "=" * 50 + "\n\n", "separator")
log_widget.configure(state=tk.DISABLED)
try:
# 直接调用main函数中的pipeline逻辑
from app.config.settings import ConfigManager
from app.services.ocr_service import OCRService
from app.services.order_service import OrderService
import logging
config = ConfigManager()
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建服务实例
ocr_service = OCRService(config)
order_service = OrderService(config)
add_to_log(log_widget, "开始OCR批量处理...\n", "info")
# 1. OCR批量处理
total, success = ocr_service.batch_process()
if total == 0:
add_to_log(log_widget, "没有找到需要处理的图片\n", "warning")
if status_bar:
status_bar.set_status("未找到图片文件")
return
elif success == 0:
add_to_log(log_widget, "OCR处理没有成功处理任何新文件\n", "warning")
else:
add_to_log(log_widget, f"OCR处理完成共处理 {success}/{total} 个文件\n", "success")
# 2. Excel处理
add_to_log(log_widget, "开始Excel处理...\n", "info")
result = order_service.process_excel()
if not result:
add_to_log(log_widget, "Excel处理失败\n", "error")
else:
add_to_log(log_widget, "Excel处理完成\n", "success")
# 3. 可选的合并步骤(如果有多个采购单)
add_to_log(log_widget, "检查是否需要合并采购单...\n", "info")
try:
# 先获取采购单文件列表
purchase_orders = order_service.get_purchase_orders()
if len(purchase_orders) == 0:
add_to_log(log_widget, "没有找到采购单文件,跳过合并步骤\n", "info")
elif len(purchase_orders) == 1:
add_to_log(log_widget, f"只有1个采购单文件无需合并: {os.path.basename(purchase_orders[0])}\n", "info")
else:
add_to_log(log_widget, f"找到{len(purchase_orders)}个采购单文件\n", "info")
# 弹窗询问是否进行合并
file_list = "\n".join([f"{os.path.basename(f)}" for f in purchase_orders])
merge_choice = messagebox.askyesnocancel(
"采购单合并选择",
f"发现{len(purchase_orders)}个采购单文件:\n\n{file_list}\n\n是否需要合并这些采购单?\n\n• 选择'':合并所有采购单\n• 选择'':保持文件分离\n• 选择'取消':跳过此步骤",
icon='question'
)
if merge_choice is True: # 用户选择合并
add_to_log(log_widget, "用户选择合并采购单,开始合并...\n", "info")
merge_result = order_service.merge_all_purchase_orders()
if merge_result:
add_to_log(log_widget, "采购单合并完成\n", "success")
else:
add_to_log(log_widget, "合并失败\n", "warning")
elif merge_choice is False: # 用户选择不合并
add_to_log(log_widget, "用户选择不合并采购单,保持文件分离\n", "info")
else: # 用户选择取消
add_to_log(log_widget, "用户取消合并操作,跳过合并步骤\n", "info")
except Exception as e:
add_to_log(log_widget, f"合并过程出现问题: {str(e)}\n", "warning")
# 记录结束时间
end_time = datetime.datetime.now()
duration = end_time - start_time
add_to_log(log_widget, f"\n{'=' * 50}\n", "separator")
add_to_log(log_widget, f"完整处理流程执行完毕!\n", "success")
add_to_log(log_widget, f"结束时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}\n", "time")
add_to_log(log_widget, f"耗时: {duration.total_seconds():.2f}\n", "time")
# 自动打开output目录
try:
output_dir = os.path.join(os.getcwd(), "output")
if os.path.exists(output_dir):
os.startfile(output_dir)
add_to_log(log_widget, "已自动打开output目录\n", "info")
except Exception as e:
add_to_log(log_widget, f"打开output目录失败: {str(e)}\n", "warning")
except Exception as e:
add_to_log(log_widget, f"执行过程中发生错误: {str(e)}\n", "error")
import traceback
add_to_log(log_widget, f"详细错误信息: {traceback.format_exc()}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
RUNNING_TASK = None
if status_bar:
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def batch_ocr_with_status(log_widget, status_bar):
"""OCR批量识别"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("正在进行OCR批量识别...")
add_to_log(log_widget, "开始OCR批量识别\n", "info")
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建OCR服务实例
ocr_service = OCRService()
# 执行批量OCR处理
result = ocr_service.batch_process()
if result:
add_to_log(log_widget, "OCR批量识别完成\n", "success")
show_ocr_result_preview("OCR批量识别成功完成")
# 自动打开output目录
try:
output_dir = os.path.join(os.getcwd(), "output")
if os.path.exists(output_dir):
os.startfile(output_dir)
add_to_log(log_widget, "已自动打开output目录\n", "info")
except Exception as e:
add_to_log(log_widget, f"打开output目录失败: {str(e)}\n", "warning")
else:
add_to_log(log_widget, "OCR批量识别失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"OCR批量识别出错: {str(e)}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def batch_process_orders_with_status(log_widget, status_bar):
"""批量处理订单仅Excel处理包含合并确认"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("正在批量处理订单...")
add_to_log(log_widget, "开始批量处理订单\n", "info")
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建订单服务实例
order_service = OrderService()
# 执行Excel处理
add_to_log(log_widget, "开始Excel处理...\n", "info")
result = order_service.process_excel()
if result:
add_to_log(log_widget, "Excel处理完成\n", "success")
# 检查是否需要合并采购单
add_to_log(log_widget, "检查是否需要合并采购单...\n", "info")
try:
# 先获取采购单文件列表
purchase_orders = order_service.get_purchase_orders()
if len(purchase_orders) == 0:
add_to_log(log_widget, "没有找到采购单文件,跳过合并步骤\n", "info")
elif len(purchase_orders) == 1:
add_to_log(log_widget, f"只有1个采购单文件无需合并: {os.path.basename(purchase_orders[0])}\n", "info")
else:
add_to_log(log_widget, f"找到{len(purchase_orders)}个采购单文件\n", "info")
# 弹窗询问是否进行合并
file_list = "\n".join([f"{os.path.basename(f)}" for f in purchase_orders])
merge_choice = messagebox.askyesnocancel(
"采购单合并选择",
f"发现{len(purchase_orders)}个采购单文件:\n\n{file_list}\n\n是否需要合并这些采购单?\n\n• 选择'':合并所有采购单\n• 选择'':保持文件分离\n• 选择'取消':跳过此步骤",
icon='question'
)
if merge_choice is True: # 用户选择合并
add_to_log(log_widget, "开始合并采购单...\n", "info")
merge_result = order_service.merge_all_purchase_orders()
if merge_result:
add_to_log(log_widget, "采购单合并完成\n", "success")
else:
add_to_log(log_widget, "采购单合并失败\n", "error")
elif merge_choice is False: # 用户选择不合并
add_to_log(log_widget, "用户选择保持文件分离,跳过合并步骤\n", "info")
else: # 用户选择取消
add_to_log(log_widget, "用户取消合并操作\n", "info")
except Exception as e:
add_to_log(log_widget, f"合并检查过程中出错: {str(e)}\n", "error")
add_to_log(log_widget, "批量处理订单完成\n", "success")
show_excel_result_preview("批量处理订单成功完成")
else:
add_to_log(log_widget, "批量处理订单失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"批量处理订单时出错: {str(e)}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def merge_orders_with_status(log_widget, status_bar):
"""合并采购单"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("正在合并采购单...")
add_to_log(log_widget, "开始合并采购单\n", "info")
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建订单服务实例
order_service = OrderService()
# 执行合并处理
result = order_service.merge_all_purchase_orders()
if result:
add_to_log(log_widget, "采购单合并完成\n", "success")
show_merge_result_preview("采购单合并成功完成")
else:
add_to_log(log_widget, "采购单合并失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"采购单合并出错: {str(e)}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def process_tobacco_orders_with_status(log_widget, status_bar):
"""处理烟草订单"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("正在处理烟草订单...")
add_to_log(log_widget, "开始处理烟草订单\n", "info")
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建烟草服务实例
config_manager = ConfigManager()
tobacco_service = TobaccoService(config_manager)
# 执行烟草订单处理
result = tobacco_service.process_tobacco_order()
if result:
add_to_log(log_widget, "烟草订单处理完成\n", "success")
# 构造输出信息用于预览
output_info = f"烟草订单处理成功完成\n烟草订单处理完成,绝对路径: {result}"
show_tobacco_result_preview(0, output_info)
else:
add_to_log(log_widget, "烟草订单处理失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"烟草订单处理出错: {str(e)}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def process_excel_file_with_status(log_widget, status_bar):
"""处理Excel文件"""
def run_in_thread():
try:
status_bar.set_running(True)
status_bar.set_status("选择Excel文件中...")
file_path = select_excel_file(log_widget)
if file_path:
status_bar.set_status("开始处理Excel文件...")
add_to_log(log_widget, f"开始处理Excel文件: {file_path}\n", "info")
else:
status_bar.set_status("开始处理最新Excel文件...")
add_to_log(log_widget, "未选择文件尝试处理最新的Excel文件\n", "info")
# 设置日志重定向到GUI
gui_handler = GUILogHandler(log_widget)
gui_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
gui_handler.setFormatter(formatter)
# 获取根日志记录器并添加GUI处理器
root_logger = logging.getLogger()
# 移除现有的控制台处理器,避免重复输出
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.StreamHandler):
root_logger.removeHandler(handler)
root_logger.addHandler(gui_handler)
root_logger.setLevel(logging.INFO)
# 创建订单服务实例
order_service = OrderService()
# 执行Excel处理
if file_path:
result = order_service.process_excel(file_path)
else:
result = order_service.process_excel()
if result:
add_to_log(log_widget, "Excel文件处理完成\n", "success")
show_excel_result_preview("Excel文件处理成功完成")
# 自动打开output目录
try:
output_dir = os.path.join(os.getcwd(), "output")
if os.path.exists(output_dir):
os.startfile(output_dir)
add_to_log(log_widget, "已自动打开output目录\n", "info")
except Exception as e:
add_to_log(log_widget, f"打开output目录失败: {str(e)}\n", "warning")
else:
add_to_log(log_widget, "Excel文件处理失败\n", "error")
except Exception as e:
add_to_log(log_widget, f"Excel文件处理出错: {str(e)}\n", "error")
finally:
# 清理日志处理器
try:
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, GUILogHandler):
root_logger.removeHandler(handler)
handler.close()
except:
pass
status_bar.set_running(False)
status_bar.set_status("就绪")
# 在新线程中运行
thread = Thread(target=run_in_thread)
thread.daemon = True
thread.start()
def main():
"""主函数"""
try:
# 确保必要的目录结构存在并转移旧目录内容
ensure_directories()
# 创建窗口
root = tk.Tk()
root.title("益选-OCR订单处理系统 v1.1.0")
root.geometry("1000x620") # 优化窗口尺寸,更加小巧
root.configure(bg=THEMES[THEME_MODE]["bg"])
# 设置窗口图标和样式
try:
root.iconbitmap(default="")
except:
pass
# 创建主容器
main_container = tk.Frame(root, bg=THEMES[THEME_MODE]["bg"])
main_container.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 隐藏主标题区域,减少间距
# 创建主内容区域
content_frame = tk.Frame(main_container, bg=THEMES[THEME_MODE]["bg"])
content_frame.pack(fill=tk.BOTH, expand=True)
# 左侧控制面板
left_panel = create_card_frame(content_frame)
left_panel.pack(side=tk.LEFT, fill=tk.BOTH, expand=False, padx=(0, 5), pady=5)
left_panel.configure(width=400) # 固定宽度
# 右侧日志面板
log_panel = create_card_frame(content_frame, "处理日志")
log_panel.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0), pady=5)
# 日志文本区域
log_text = scrolledtext.ScrolledText(
log_panel,
wrap=tk.WORD,
width=60,
height=25,
bg=THEMES[THEME_MODE]["log_bg"],
fg=THEMES[THEME_MODE]["log_fg"],
font=("Consolas", 9),
state=tk.DISABLED,
relief="flat",
borderwidth=0
)
log_text.pack(fill=tk.BOTH, expand=True, padx=10, pady=(5, 10))
# 配置日志文本样式
log_text.tag_configure("command", foreground=THEMES[THEME_MODE]["info"], font=("Consolas", 9, "bold"))
log_text.tag_configure("time", foreground=THEMES[THEME_MODE]["secondary_bg"], font=("Consolas", 8))
log_text.tag_configure("separator", foreground=THEMES[THEME_MODE]["border"])
log_text.tag_configure("success", foreground=THEMES[THEME_MODE]["success"], font=("Consolas", 9, "bold"))
log_text.tag_configure("error", foreground=THEMES[THEME_MODE]["error"], font=("Consolas", 9, "bold"))
log_text.tag_configure("warning", foreground=THEMES[THEME_MODE]["warning"], font=("Consolas", 9, "bold"))
log_text.tag_configure("info", foreground=THEMES[THEME_MODE]["info"], font=("Consolas", 9))
# 初始化日志内容
add_to_log(log_text, "欢迎使用 益选-OCR订单处理系统 v1.1.0\n", "success")
add_to_log(log_text, "系统已就绪,请选择相应功能进行操作。\n\n", "info")
add_to_log(log_text, "功能说明:\n", "command")
add_to_log(log_text, "• 完整处理流程一键完成OCR识别和Excel处理\n", "info")
add_to_log(log_text, "• 批量处理订单:批量处理多个订单文件\n", "info")
add_to_log(log_text, "• 处理烟草订单:专门处理烟草类订单\n", "info")
add_to_log(log_text, "• 合并订单:将多个订单合并为一个文件\n\n", "info")
add_to_log(log_text, "请将需要处理的图片文件放入 data/input 目录中。\n", "warning")
add_to_log(log_text, "OCR识别结果保存在 data/output 目录,处理完成的订单保存在 result 目录中。\n\n", "warning")
add_to_log(log_text, "=" * 50 + "\n\n", "separator")
# 创建状态栏
status_bar = StatusBar(root)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
# 左侧面板内容区域
panel_content = tk.Frame(left_panel, bg=THEMES[THEME_MODE]["card_bg"])
panel_content.pack(fill=tk.BOTH, expand=True, padx=10, pady=(5, 10))
# 核心功能区
core_section = tk.LabelFrame(
panel_content,
text="核心功能",
bg=THEMES[THEME_MODE]["card_bg"],
fg=THEMES[THEME_MODE]["fg"],
font=("Microsoft YaHei UI", 10, "bold"),
relief="flat",
borderwidth=0
)
core_section.pack(fill=tk.X, pady=(0, 16))
# 核心功能按钮
core_buttons_frame = tk.Frame(core_section, bg=THEMES[THEME_MODE]["card_bg"])
core_buttons_frame.pack(fill=tk.X, padx=8, pady=6)
# 核心功能按钮行1
core_row1 = tk.Frame(core_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
core_row1.pack(fill=tk.X, pady=3)
# 完整处理流程
create_modern_button(
core_row1,
"完整处理流程",
lambda: run_pipeline_directly(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 批量处理订单
create_modern_button(
core_row1,
"批量处理订单",
lambda: batch_process_orders_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# 核心功能按钮行2
core_row2 = tk.Frame(core_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
core_row2.pack(fill=tk.X, pady=3)
# 处理烟草订单
create_modern_button(
core_row2,
"处理烟草订单",
lambda: process_tobacco_orders_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 合并订单
create_modern_button(
core_row2,
"合并订单",
lambda: merge_orders_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# OCR功能区
ocr_section = tk.LabelFrame(
panel_content,
text="OCR识别",
bg=THEMES[THEME_MODE]["card_bg"],
fg=THEMES[THEME_MODE]["fg"],
font=("Microsoft YaHei UI", 10, "bold"),
relief="flat",
borderwidth=0
)
ocr_section.pack(fill=tk.X, pady=(0, 16))
ocr_buttons_frame = tk.Frame(ocr_section, bg=THEMES[THEME_MODE]["card_bg"])
ocr_buttons_frame.pack(fill=tk.X, padx=8, pady=6)
# OCR按钮行1
ocr_row1 = tk.Frame(ocr_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
ocr_row1.pack(fill=tk.X, pady=3)
# OCR批量识别
create_modern_button(
ocr_row1,
"OCR批量识别",
lambda: batch_ocr_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 处理单个图片
create_modern_button(
ocr_row1,
"处理单个图片",
lambda: process_single_image_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# Excel处理区
excel_section = tk.LabelFrame(
panel_content,
text="Excel处理",
bg=THEMES[THEME_MODE]["card_bg"],
fg=THEMES[THEME_MODE]["fg"],
font=("Microsoft YaHei UI", 10, "bold"),
relief="flat",
borderwidth=0
)
excel_section.pack(fill=tk.X, pady=(0, 16))
excel_buttons_frame = tk.Frame(excel_section, bg=THEMES[THEME_MODE]["card_bg"])
excel_buttons_frame.pack(fill=tk.X, padx=8, pady=6)
# Excel按钮行1
excel_row1 = tk.Frame(excel_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
excel_row1.pack(fill=tk.X, pady=3)
# 处理Excel文件
create_modern_button(
excel_row1,
"处理Excel文件",
lambda: process_excel_file_with_status(log_text, status_bar),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 结果目录
create_modern_button(
excel_row1,
"结果目录",
lambda: open_result_directory(),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# 工具功能区
tools_section = tk.LabelFrame(
panel_content,
text="工具功能",
bg=THEMES[THEME_MODE]["card_bg"],
fg=THEMES[THEME_MODE]["fg"],
font=("Microsoft YaHei UI", 10, "bold"),
relief="flat",
borderwidth=0
)
tools_section.pack(fill=tk.X, pady=(0, 16))
tools_buttons_frame = tk.Frame(tools_section, bg=THEMES[THEME_MODE]["card_bg"])
tools_buttons_frame.pack(fill=tk.X, padx=8, pady=6)
# 工具按钮行1
tools_row1 = tk.Frame(tools_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
tools_row1.pack(fill=tk.X, pady=3)
# 打开输入目录
create_modern_button(
tools_row1,
"输入目录",
lambda: os.startfile(os.path.abspath("data/input")),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 打开输出目录
create_modern_button(
tools_row1,
"输出目录",
lambda: os.startfile(os.path.abspath("data/output")),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# 工具按钮行2
tools_row2 = tk.Frame(tools_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
tools_row2.pack(fill=tk.X, pady=3)
# 清除缓存
create_modern_button(
tools_row2,
"清除缓存",
lambda: clean_cache(log_text),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 清理文件
create_modern_button(
tools_row2,
"清理文件",
lambda: clean_data_files(log_text),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# 系统设置区
settings_section = tk.LabelFrame(
panel_content,
text="系统设置",
bg=THEMES[THEME_MODE]["card_bg"],
fg=THEMES[THEME_MODE]["fg"],
font=("Microsoft YaHei UI", 10, "bold"),
relief="flat",
borderwidth=0
)
settings_section.pack(fill=tk.X, pady=(0, 16))
settings_buttons_frame = tk.Frame(settings_section, bg=THEMES[THEME_MODE]["card_bg"])
settings_buttons_frame.pack(fill=tk.X, padx=8, pady=6)
# 系统设置按钮行1
settings_row1 = tk.Frame(settings_buttons_frame, bg=THEMES[THEME_MODE]["card_bg"])
settings_row1.pack(fill=tk.X, pady=3)
# 系统设置
create_modern_button(
settings_row1,
"系统设置",
lambda: show_config_dialog(root, ConfigManager()),
"primary",
width=10
).pack(side=tk.LEFT, padx=(0, 3), fill=tk.X, expand=True)
# 条码映射编辑
create_modern_button(
settings_row1,
"条码映射",
lambda: edit_barcode_mappings(log_text),
"primary",
width=10
).pack(side=tk.LEFT, padx=(3, 0), fill=tk.X, expand=True)
# 绑定键盘快捷键
bind_keyboard_shortcuts(root, log_text, status_bar)
# 启动主循环
root.mainloop()
except Exception as e:
import traceback
error_msg = f"程序启动失败: {str(e)}\n详细错误信息:\n{traceback.format_exc()}"
print(error_msg)
try:
import tkinter.messagebox as mb
mb.showerror("启动错误", f"程序启动失败:\n{str(e)}")
except:
pass
def add_to_log(log_widget, text, tag="normal"):
"""向日志窗口添加文本,支持样式标签"""
log_widget.configure(state=tk.NORMAL)
log_widget.insert(tk.END, text, tag)
log_widget.see(tk.END) # 自动滚动到底部
log_widget.configure(state=tk.DISABLED)
def select_file(log_widget, file_types=[("所有文件", "*.*")], title="选择文件"):
"""通用文件选择对话框"""
file_path = filedialog.askopenfilename(title=title, filetypes=file_types)
if file_path:
add_to_log(log_widget, f"已选择文件: {file_path}\n", "info")
return file_path
def select_excel_file(log_widget):
"""选择Excel文件"""
return select_file(
log_widget,
[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")],
"选择Excel文件"
)
def clean_cache(log_widget):
"""清除处理缓存"""
try:
# 清除OCR缓存文件
cache_files = [
os.path.join("data", "processed_files.json"),
os.path.join("data/output", "processed_files.json"),
os.path.join("data/output", "merged_files.json")
]
for cache_file in cache_files:
if os.path.exists(cache_file):
os.remove(cache_file)
add_to_log(log_widget, f"已清除缓存文件: {cache_file}\n", "success")
# 清除临时文件夹中所有文件
temp_dir = os.path.join("data/temp")
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
add_to_log(log_widget, f"已清除临时文件: {file_path}\n", "info")
except Exception as e:
add_to_log(log_widget, f"清除文件时出错: {file_path}, 错误: {str(e)}\n", "error")
# 清除日志文件中的active标记
log_dir = "logs"
if os.path.exists(log_dir):
for file in os.listdir(log_dir):
if file.endswith(".active"):
file_path = os.path.join(log_dir, file)
try:
os.remove(file_path)
add_to_log(log_widget, f"已清除活动日志标记: {file_path}\n", "info")
except Exception as e:
add_to_log(log_widget, f"清除文件时出错: {file_path}, 错误: {str(e)}\n", "error")
# 重置全局状态
global RUNNING_TASK
RUNNING_TASK = None
add_to_log(log_widget, "缓存清除完成,系统将重新处理所有文件\n", "success")
messagebox.showinfo("缓存清除", "缓存已清除,系统将重新处理所有文件。")
except Exception as e:
add_to_log(log_widget, f"清除缓存时出错: {str(e)}\n", "error")
messagebox.showerror("错误", f"清除缓存时出错: {str(e)}")
def open_result_directory():
"""打开结果目录"""
try:
result_dir = os.path.abspath("data/result")
if not os.path.exists(result_dir):
os.makedirs(result_dir, exist_ok=True)
os.startfile(result_dir)
except Exception as e:
messagebox.showerror("错误", f"无法打开结果目录: {str(e)}")
def clean_data_files(log_widget):
"""清理数据文件仅清理input和output目录"""
try:
# 确认清理
if not messagebox.askyesno("确认清理", "确定要清理input和output目录的文件吗这将删除所有输入和输出数据。"):
add_to_log(log_widget, "操作已取消\n", "info")
return
files_cleaned = 0
# 清理输入目录
input_dir = "data/input"
if os.path.exists(input_dir):
for file in os.listdir(input_dir):
file_path = os.path.join(input_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
files_cleaned += 1
add_to_log(log_widget, f"已清理input目录\n", "info")
# 清理输出目录
output_dir = "data/output"
if os.path.exists(output_dir):
for file in os.listdir(output_dir):
file_path = os.path.join(output_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
files_cleaned += 1
add_to_log(log_widget, f"已清理output目录\n", "info")
add_to_log(log_widget, f"清理完成,共清理 {files_cleaned} 个文件\n", "success")
messagebox.showinfo("清理完成", f"已成功清理 {files_cleaned} 个文件")
except Exception as e:
add_to_log(log_widget, f"清理数据文件时出错: {str(e)}\n", "error")
messagebox.showerror("错误", f"清理数据文件时出错: {str(e)}")
def center_window(window):
"""使窗口居中显示"""
window.update_idletasks()
width = window.winfo_width()
height = window.winfo_height()
x = (window.winfo_screenwidth() // 2) - (width // 2)
y = (window.winfo_screenheight() // 2) - (height // 2)
window.geometry('{}x{}+{}+{}'.format(width, height, x, y))
def show_tobacco_result_preview(returncode, output):
"""显示烟草订单处理结果预览"""
# 只在成功时显示结果预览
if returncode != 0:
return
try:
# 查找输出文件路径
result_file = None
order_time = "(未知)"
total_amount = "(未知)"
items_count = 0
# 先使用更可靠的方式查找文件路径
abs_path_match = re.search(r'烟草订单处理完成,绝对路径: (.+)(?:\n|$)', output)
if abs_path_match:
result_file = abs_path_match.group(1).strip()
# 提取处理结果信息
for line in output.split('\n'):
# 提取订单时间和金额
if "烟草公司订单处理成功" in line and "订单时间" in line:
time_match = re.search(r'订单时间: ([^,]+)', line)
amount_match = re.search(r'总金额: ([^,]+)', line)
items_match = re.search(r'处理条目: (\d+)', line)
if time_match:
order_time = time_match.group(1).strip()
if amount_match:
total_amount = amount_match.group(1).strip()
if items_match:
items_count = int(items_match.group(1).strip())
# 如果没有找到文件路径,使用默认路径
if not result_file or not os.path.exists(result_file):
default_path = os.path.abspath("data/output/银豹采购单_烟草公司.xls")
if os.path.exists(default_path):
result_file = default_path
# 创建结果预览对话框
preview = tk.Toplevel()
preview.title("烟草订单处理结果")
preview.geometry("450x320")
preview.resizable(False, False)
# 使弹窗居中显示
center_window(preview)
# 添加内容
tk.Label(preview, text="烟草订单处理完成", font=("Arial", 16, "bold")).pack(pady=10)
result_frame = tk.Frame(preview)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
tk.Label(result_frame, text=f"订单时间: {order_time}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"订单总金额: {total_amount}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text=f"处理商品数量: {items_count}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 文件信息
if result_file and os.path.exists(result_file):
tk.Label(result_frame, text=f"输出文件: {os.path.basename(result_file)}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 处理成功提示
tk.Label(result_frame, text="银豹采购单已成功生成!", font=("Arial", 12, "bold"), fg="#28a745").pack(pady=10)
# 文件信息框
file_frame = tk.Frame(result_frame, relief=tk.GROOVE, borderwidth=1)
file_frame.pack(fill=tk.X, padx=15, pady=5)
tk.Label(file_frame, text="文件信息", font=("Arial", 10, "bold")).pack(anchor=tk.W, padx=10, pady=5)
# 获取文件大小和时间
try:
file_size = os.path.getsize(result_file)
file_time = datetime.datetime.fromtimestamp(os.path.getmtime(result_file))
size_text = f"{file_size / 1024:.1f} KB" if file_size < 1024*1024 else f"{file_size / (1024*1024):.1f} MB"
tk.Label(file_frame, text=f"文件大小: {size_text}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
tk.Label(file_frame, text=f"创建时间: {file_time.strftime('%Y-%m-%d %H:%M:%S')}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
except:
tk.Label(file_frame, text="无法获取文件信息", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开文件", command=lambda: os.startfile(result_file)).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="打开所在文件夹", command=lambda: os.startfile(os.path.dirname(result_file))).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=5)
else:
# 文件不存在或未找到的情况
tk.Label(result_frame, text="未找到输出文件", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text="请检查data/output目录", font=("Arial", 12, "bold"), fg="#dc3545").pack(pady=10)
# 添加按钮
button_frame = tk.Frame(preview)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开输出目录", command=lambda: os.startfile(os.path.abspath("data/output"))).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="关闭", command=preview.destroy).pack(side=tk.LEFT, padx=5)
# 确保窗口显示在最前
preview.lift()
preview.attributes('-topmost', True)
preview.after_idle(lambda: preview.attributes('-topmost', False))
except Exception as e:
# 发生异常,显示错误消息
messagebox.showerror(
"处理异常",
f"显示预览时发生错误: {e}\n请检查日志了解详细信息。"
)
def edit_barcode_mappings(log_widget):
"""编辑条码映射配置"""
try:
add_to_log(log_widget, "正在加载条码映射配置...\n", "info")
# 创建单位转换器实例,用于加载和保存条码映射
unit_converter = UnitConverter()
# 加载现有的映射配置
current_mappings = unit_converter.special_barcodes
# 回调函数,保存更新后的映射
def save_mappings(new_mappings):
success = unit_converter.update_barcode_mappings(new_mappings)
if success:
add_to_log(log_widget, f"成功保存条码映射配置,共{len(new_mappings)}\n", "success")
else:
add_to_log(log_widget, "保存条码映射配置失败\n", "error")
# 显示条码映射编辑对话框
show_barcode_mapping_dialog(None, save_mappings, current_mappings)
except Exception as e:
add_to_log(log_widget, f"编辑条码映射时出错: {str(e)}\n", "error")
messagebox.showerror("错误", f"编辑条码映射时出错: {str(e)}")
def bind_keyboard_shortcuts(root, log_widget, status_bar):
"""绑定键盘快捷键"""
# Ctrl+O - 处理单个图片
root.bind('<Control-o>', lambda e: process_single_image_with_status(log_widget, status_bar))
# Ctrl+E - 处理Excel文件
root.bind('<Control-e>', lambda e: process_excel_file_with_status(log_widget, status_bar))
# Ctrl+B - 批量处理
root.bind('<Control-b>', lambda e: batch_ocr_with_status(log_widget, status_bar))
# Ctrl+P - 完整流程
root.bind('<Control-p>', lambda e: run_pipeline_directly(log_widget, status_bar))
# Ctrl+M - 合并采购单
root.bind('<Control-m>', lambda e: merge_orders_with_status(log_widget, status_bar))
# Ctrl+T - 处理烟草订单
root.bind('<Control-t>', lambda e: process_tobacco_orders_with_status(log_widget, status_bar))
# F5 - 刷新/清除缓存
root.bind('<F5>', lambda e: clean_cache(log_widget))
# Escape - 退出
root.bind('<Escape>', lambda e: root.quit() if messagebox.askyesno("确认退出", "确定要退出程序吗?") else None)
# F1 - 显示快捷键帮助
root.bind('<F1>', lambda e: show_shortcuts_help())
def show_shortcuts_help():
"""显示快捷键帮助对话框"""
help_dialog = tk.Toplevel()
help_dialog.title("快捷键帮助")
help_dialog.geometry("400x450")
center_window(help_dialog)
tk.Label(help_dialog, text="键盘快捷键", font=("Arial", 16, "bold")).pack(pady=10)
help_text = tk.Text(help_dialog, wrap=tk.WORD, width=50, height=20)
help_text.pack(padx=20, pady=10, fill=tk.BOTH, expand=True)
shortcuts = """
Ctrl+O: 处理单个图片
Ctrl+E: 处理Excel文件
Ctrl+B: OCR批量识别
Ctrl+P: 完整处理流程
Ctrl+M: 合并采购单
Ctrl+T: 处理烟草订单
F5: 清除处理缓存
Esc: 退出程序
"""
help_text.insert(tk.END, shortcuts)
help_text.configure(state=tk.DISABLED)
# 按钮
tk.Button(help_dialog, text="确定", command=help_dialog.destroy).pack(pady=10)
# 确保窗口显示在最前
help_dialog.lift()
help_dialog.attributes('-topmost', True)
help_dialog.after_idle(lambda: help_dialog.attributes('-topmost', False))
if __name__ == "__main__":
main()