新增条码映射编辑功能图形化界面

This commit is contained in:
2025-05-10 11:39:11 +08:00
parent 7b7d491663
commit 5c0b709528
46 changed files with 2510 additions and 499 deletions
Binary file not shown.
Binary file not shown.
Binary file not shown.
+102 -55
View File
@@ -6,6 +6,8 @@
import re
import logging
import os
import json
from typing import Dict, Tuple, Optional, Any, List, Union
from ..utils.log_utils import get_logger
@@ -18,6 +20,9 @@ from .validators import ProductValidator
logger = get_logger(__name__)
# 条码映射配置文件路径
BARCODE_MAPPING_CONFIG = "config/barcode_mappings.json"
class UnitConverter:
"""
单位转换器:处理不同单位之间的转换,支持从商品名称推断规格
@@ -27,60 +32,8 @@ class UnitConverter:
"""
初始化单位转换器
"""
# 特殊条码配置
self.special_barcodes = {
'6925019900087': {
'multiplier': 10, # 数量乘以10
'target_unit': '', # 目标单位
'description': '特殊处理:数量*10,单位转换为瓶'
},
'6921168593804': {
'multiplier': 30, # 数量乘以30
'target_unit': '', # 目标单位
'description': 'NFC产品特殊处理:每箱30瓶'
},
'6901826888138': {
'multiplier': 30, # 数量乘以30
'target_unit': '', # 目标单位
'fixed_price': 112/30, # 固定单价为112/30
'specification': '1*30', # 固定规格
'description': '特殊处理: 规格1*30,数量*30,单价=112/30'
},
# 条码映射转换配置
'6920584471055': {
'map_to': '6920584471017', # 映射到新条码
'description': '条码映射:6920584471055 -> 6920584471017'
},
'6925861571159': {
'map_to': '69021824', # 映射到新条码
'description': '条码映射:6925861571159 -> 69021824'
},
'6923644268923': {
'map_to': '6923644268480', # 映射到新条码
'description': '条码映射:6923644268923 -> 6923644268480'
},
'6907992501819': {
'map_to': '6907992500133', # 映射到新条码
'description': '条码映射:6907992501819 -> 6907992500133'
},
'6923644268916': {
'map_to': '6923644268503', # 映射到新条码
'description': '条码映射:6923644268916 -> 6923644268503'
},
'6923644283582': {
'map_to': '6923644283575', # 映射到新条码
'description': '条码映射:6923644283582 -> 6923644283575'
},
'6923644268930': {
'map_to': '6923644268497', # 映射到新条码
'description': '条码映射:6923644268930 -> 6923644268497'
},
'6923644210151': {
'map_to': '6923644223458', # 映射到新条码
'description': '条码映射:6923644210151 -> 6923644223458'
}
# 可以添加更多特殊条码的配置
}
# 加载特殊条码配置
self.special_barcodes = self.load_barcode_mappings()
# 规格推断的正则表达式模式
self.spec_patterns = [
@@ -447,4 +400,98 @@ class UnitConverter:
# 没有找到适用的处理程序,保持不变
logger.info(f"其他单位处理: 保持原样 数量: {result.get('quantity', 0)}, 单价: {result.get('price', 0)}, 单位: {result.get('unit', '')}")
return result
return result
def load_barcode_mappings(self) -> Dict[str, Dict[str, Any]]:
"""
从配置文件加载条码映射
Returns:
条码映射字典
"""
# 默认映射
default_mappings = {
'6925019900087': {
'multiplier': 10,
'target_unit': '',
'description': '特殊处理:数量*10,单位转换为瓶'
},
'6921168593804': {
'multiplier': 30,
'target_unit': '',
'description': 'NFC产品特殊处理:每箱30瓶'
},
'6901826888138': {
'multiplier': 30,
'target_unit': '',
'fixed_price': 112/30,
'specification': '1*30',
'description': '特殊处理: 规格1*30,数量*30,单价=112/30'
},
# 条码映射配置
'6920584471055': {
'map_to': '6920584471017',
'description': '条码映射:6920584471055 -> 6920584471017'
},
'6925861571159': {
'map_to': '69021824',
'description': '条码映射:6925861571159 -> 69021824'
},
'6923644268923': {
'map_to': '6923644268480',
'description': '条码映射:6923644268923 -> 6923644268480'
}
}
try:
# 检查配置文件是否存在
if os.path.exists(BARCODE_MAPPING_CONFIG):
with open(BARCODE_MAPPING_CONFIG, 'r', encoding='utf-8') as file:
mappings = json.load(file)
logger.info(f"成功加载条码映射配置,共{len(mappings)}")
return mappings
else:
# 创建默认配置文件
self.save_barcode_mappings(default_mappings)
logger.info(f"创建默认条码映射配置,共{len(default_mappings)}")
return default_mappings
except Exception as e:
logger.error(f"加载条码映射配置失败: {e}")
return default_mappings
def save_barcode_mappings(self, mappings: Dict[str, Dict[str, Any]]) -> bool:
"""
保存条码映射到配置文件
Args:
mappings: 条码映射字典
Returns:
保存是否成功
"""
try:
# 确保配置目录存在
os.makedirs(os.path.dirname(BARCODE_MAPPING_CONFIG), exist_ok=True)
# 写入配置文件
with open(BARCODE_MAPPING_CONFIG, 'w', encoding='utf-8') as file:
json.dump(mappings, file, ensure_ascii=False, indent=2)
logger.info(f"条码映射配置保存成功,共{len(mappings)}")
return True
except Exception as e:
logger.error(f"保存条码映射配置失败: {e}")
return False
def update_barcode_mappings(self, new_mappings: Dict[str, Dict[str, Any]]) -> bool:
"""
更新条码映射配置
Args:
new_mappings: 新的条码映射字典
Returns:
更新是否成功
"""
self.special_barcodes = new_mappings
return self.save_barcode_mappings(new_mappings)
+34 -25
View File
@@ -36,35 +36,44 @@ class PurchaseOrderMerger:
采购单合并器:将多个采购单Excel文件合并成一个文件
"""
def __init__(self, config: Optional[ConfigManager] = None):
def __init__(self, config):
"""
初始化采购单合并器
Args:
config: 配置管理器,如果为None则创建新的
config: 配置信息
"""
logger.info("初始化PurchaseOrderMerger")
self.config = config or ConfigManager()
self.config = config
# 获取配置
self.output_dir = self.config.get_path('Paths', 'output_folder', 'data/output', create=True)
# 获取模板文件路径
template_folder = self.config.get('Paths', 'template_folder', 'templates')
template_name = self.config.get('Templates', 'purchase_order', '银豹-采购单模板.xls')
self.template_path = os.path.join(template_folder, template_name)
# 检查模板文件是否存在
if not os.path.exists(self.template_path):
logger.error(f"模板文件不存在: {self.template_path}")
raise FileNotFoundError(f"模板文件不存在: {self.template_path}")
# 用于记录已合并的文件
self.cache_file = os.path.join(self.output_dir, "merged_files.json")
self.merged_files = self._load_merged_files()
logger.info(f"初始化完成,模板文件: {self.template_path}")
# 修复ConfigParser对象没有get_path方法的问题
try:
# 获取输出目录
self.output_dir = config.get('Paths', 'output_folder', fallback='data/output')
# 确保目录存在
os.makedirs(self.output_dir, exist_ok=True)
# 记录实际路径
logger.info(f"使用输出目录: {os.path.abspath(self.output_dir)}")
# 获取模板文件路径
template_folder = config.get('Paths', 'template_folder', fallback='templates')
template_name = config.get('Templates', 'purchase_order', fallback='银豹-采购单模板.xls')
self.template_path = os.path.join(template_folder, template_name)
# 检查模板文件是否存在
if not os.path.exists(self.template_path):
logger.warning(f"模板文件不存在: {self.template_path}")
# 用于记录已合并的文件
self.merged_files_json = os.path.join(self.output_dir, "merged_files.json")
self.merged_files = self._load_merged_files()
logger.info(f"初始化PurchaseOrderMerger完成,模板文件: {self.template_path}")
except Exception as e:
logger.error(f"初始化PurchaseOrderMerger失败: {e}")
raise
def _load_merged_files(self) -> Dict[str, str]:
"""
@@ -73,11 +82,11 @@ class PurchaseOrderMerger:
Returns:
合并记录字典
"""
return load_json(self.cache_file, {})
return load_json(self.merged_files_json, {})
def _save_merged_files(self) -> None:
"""保存已合并文件的缓存"""
save_json(self.merged_files, self.cache_file)
save_json(self.merged_files, self.merged_files_json)
def get_purchase_orders(self) -> List[str]:
"""
+32 -27
View File
@@ -39,39 +39,44 @@ class ExcelProcessor:
提取条码、单价和数量,并按照采购单模板的格式填充
"""
def __init__(self, config: Optional[ConfigManager] = None):
def __init__(self, config):
"""
初始化Excel处理器
Args:
config: 配置管理器,如果为None则创建新的
config: 配置信息
"""
logger.info("初始化ExcelProcessor")
self.config = config or ConfigManager()
self.config = config
# 获取配置
self.output_dir = self.config.get_path('Paths', 'output_folder', 'data/output', create=True)
self.temp_dir = self.config.get_path('Paths', 'temp_folder', 'data/temp', create=True)
# 获取模板文件路径
template_folder = self.config.get('Paths', 'template_folder', 'templates')
template_name = self.config.get('Templates', 'purchase_order', '银豹-采购单模板.xls')
self.template_path = os.path.join(template_folder, template_name)
# 检查模板文件是否存在
if not os.path.exists(self.template_path):
logger.error(f"模板文件不存在: {self.template_path}")
raise FileNotFoundError(f"模板文件不存在: {self.template_path}")
# 用于记录已处理的文件
self.cache_file = os.path.join(self.output_dir, "processed_files.json")
self.processed_files = self._load_processed_files()
# 创建单位转换器
self.unit_converter = UnitConverter()
logger.info(f"初始化完成,模板文件: {self.template_path}")
# 修复ConfigParser对象没有get_path方法的问题
try:
# 获取输入和输出目录
self.output_dir = config.get('Paths', 'output_folder', fallback='data/output')
self.temp_dir = config.get('Paths', 'temp_folder', fallback='data/temp')
# 获取模板文件路径
self.template_path = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls')
if not os.path.exists(self.template_path):
logger.warning(f"模板文件不存在: {self.template_path}")
# 设置缓存文件路径
self.cache_file = os.path.join(self.output_dir, "processed_files.json")
self.processed_files = self._load_processed_files()
# 确保目录存在
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.temp_dir, exist_ok=True)
# 记录实际路径
logger.info(f"使用输出目录: {os.path.abspath(self.output_dir)}")
logger.info(f"使用临时目录: {os.path.abspath(self.temp_dir)}")
# 加载单位转换器和配置
self.unit_converter = UnitConverter()
logger.info(f"初始化ExcelProcessor完成,模板文件: {self.template_path}")
except Exception as e:
logger.error(f"初始化ExcelProcessor失败: {e}")
raise
def _load_processed_files(self) -> Dict[str, str]:
"""
Binary file not shown.
Binary file not shown.
+41 -22
View File
@@ -105,34 +105,53 @@ class BaiduOCRClient:
百度OCR API客户端
"""
def __init__(self, config: Optional[ConfigManager] = None):
def __init__(self, config):
"""
初始化百度OCR客户端
Args:
config: 配置管理器,如果为None则创建新的
config: 配置信息
"""
self.config = config or ConfigManager()
self.config = config
# 获取配置
self.api_key = self.config.get('API', 'api_key')
self.secret_key = self.config.get('API', 'secret_key')
self.timeout = self.config.getint('API', 'timeout', 30)
self.max_retries = self.config.getint('API', 'max_retries', 3)
self.retry_delay = self.config.getint('API', 'retry_delay', 2)
self.api_url = self.config.get('API', 'api_url', 'https://aip.baidubce.com/rest/2.0/ocr/v1/table')
# 创建令牌管理器
self.token_manager = TokenManager(
self.api_key,
self.secret_key,
self.max_retries,
self.retry_delay
)
# 验证API配置
if not self.api_key or not self.secret_key:
logger.warning("API密钥未设置,请在配置文件中设置API密钥")
# 从配置中读取API信息
try:
# 修复getint调用方式
self.timeout = config.get('API', 'timeout', fallback=30)
if isinstance(self.timeout, str):
self.timeout = int(self.timeout)
self.api_key = config.get('API', 'api_key', fallback='')
self.secret_key = config.get('API', 'secret_key', fallback='')
# 使用fallback而不是位置参数
try:
self.max_retries = config.getint('API', 'max_retries', fallback=3)
except (TypeError, AttributeError):
# 如果getint不支持fallback,则使用get再转换
self.max_retries = int(config.get('API', 'max_retries', fallback='3'))
try:
self.retry_delay = config.getint('API', 'retry_delay', fallback=2)
except (TypeError, AttributeError):
# 如果getint不支持fallback,则使用get再转换
self.retry_delay = int(config.get('API', 'retry_delay', fallback='2'))
self.api_url = config.get('API', 'api_url', fallback='https://aip.baidubce.com/rest/2.0/ocr/v1/table')
# 创建令牌管理器
self.token_manager = TokenManager(
self.api_key,
self.secret_key,
self.max_retries,
self.retry_delay
)
# 验证API配置
if not self.api_key or not self.secret_key:
logger.warning("API密钥未设置,请在配置文件中设置API密钥")
except Exception as e:
logger.error(f"初始化失败: {e}")
def read_image(self, image_path: str) -> Optional[bytes]:
"""
+108 -61
View File
@@ -103,51 +103,65 @@ class ProcessedRecordManager:
class OCRProcessor:
"""
OCR处理器,用于表格识别与处理
OCR处理器,负责协调OCR识别和结果处理
"""
def __init__(self, config: Optional[ConfigManager] = None):
def __init__(self, config):
"""
初始化OCR处理器
Args:
config: 配置管理器,如果为None则创建新的
config: 配置信息
"""
self.config = config or ConfigManager()
self.config = config
# 创建百度OCR客户端
self.ocr_client = BaiduOCRClient(self.config)
# 修复ConfigParser对象没有get_path方法的问题
try:
# 获取输入和输出目录
self.input_folder = config.get('Paths', 'input_folder', fallback='data/input')
self.output_folder = config.get('Paths', 'output_folder', fallback='data/output')
self.temp_folder = config.get('Paths', 'temp_folder', fallback='data/temp')
# 确保目录存在
os.makedirs(self.input_folder, exist_ok=True)
os.makedirs(self.output_folder, exist_ok=True)
os.makedirs(self.temp_folder, exist_ok=True)
# 获取文件类型列表
allowed_extensions_str = config.get('File', 'allowed_extensions', fallback='.jpg,.jpeg,.png,.bmp')
self.file_types = [ext.strip() for ext in allowed_extensions_str.split(',') if ext.strip()]
if not self.file_types:
self.file_types = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tif', '.tiff']
# 初始化OCR客户端
self.ocr_client = BaiduOCRClient(self.config)
# 记录实际路径
logger.info(f"使用输入目录: {os.path.abspath(self.input_folder)}")
logger.info(f"使用输出目录: {os.path.abspath(self.output_folder)}")
logger.info(f"使用临时目录: {os.path.abspath(self.temp_folder)}")
logger.info(f"允许的文件类型: {self.file_types}")
# 初始化processed_files_json和record_manager
self.processed_files_json = os.path.join(self.output_folder, 'processed_files.json')
self.record_manager = ProcessedRecordManager(self.processed_files_json)
# 加载已处理文件记录
self.processed_files = self._load_processed_files()
logger.info(f"初始化OCRProcessor完成:输入目录={self.input_folder}, 输出目录={self.output_folder}")
except Exception as e:
logger.error(f"初始化OCRProcessor失败: {e}")
raise
def _load_processed_files(self) -> Dict[str, str]:
"""
加载已处理的文件记录
# 获取配置
self.input_folder = self.config.get_path('Paths', 'input_folder', 'data/input', create=True)
self.output_folder = self.config.get_path('Paths', 'output_folder', 'data/output', create=True)
self.temp_folder = self.config.get_path('Paths', 'temp_folder', 'data/temp', create=True)
# 确保目录结构正确
for folder in [self.input_folder, self.output_folder, self.temp_folder]:
if not os.path.exists(folder):
os.makedirs(folder, exist_ok=True)
logger.info(f"创建目录: {folder}")
# 记录实际路径
logger.info(f"使用输入目录: {os.path.abspath(self.input_folder)}")
logger.info(f"使用输出目录: {os.path.abspath(self.output_folder)}")
logger.info(f"使用临时目录: {os.path.abspath(self.temp_folder)}")
self.allowed_extensions = self.config.get_list('File', 'allowed_extensions', '.jpg,.jpeg,.png,.bmp')
self.max_file_size_mb = self.config.getfloat('File', 'max_file_size_mb', 4.0)
self.excel_extension = self.config.get('File', 'excel_extension', '.xlsx')
# 处理性能配置
self.max_workers = self.config.getint('Performance', 'max_workers', 4)
self.batch_size = self.config.getint('Performance', 'batch_size', 5)
self.skip_existing = self.config.getboolean('Performance', 'skip_existing', True)
# 初始化处理记录管理器
record_file = self.config.get('Paths', 'processed_record', 'data/processed_files.json')
self.record_manager = ProcessedRecordManager(record_file)
logger.info(f"OCR处理器初始化完成,输入目录: {self.input_folder}, 输出目录: {self.output_folder}")
Returns:
已处理的文件记录字典,键为输入文件路径,值为输出文件路径
"""
return load_json(self.processed_files_json, {})
def get_unprocessed_images(self) -> List[str]:
"""
@@ -157,10 +171,16 @@ class OCRProcessor:
未处理的图片文件路径列表
"""
# 获取所有图片文件
image_files = get_files_by_extensions(self.input_folder, self.allowed_extensions)
image_files = get_files_by_extensions(self.input_folder, self.file_types)
# 如果需要跳过已存在的文件
if self.skip_existing:
skip_existing = True
try:
skip_existing = self.config.getboolean('Performance', 'skip_existing', fallback=True)
except:
pass
if skip_existing:
# 过滤已处理的文件
unprocessed_files = self.record_manager.get_unprocessed_files(image_files)
logger.info(f"找到 {len(image_files)} 个图片文件,其中 {len(unprocessed_files)} 个未处理")
@@ -186,13 +206,19 @@ class OCRProcessor:
# 检查文件扩展名
ext = get_file_extension(image_path)
if ext not in self.allowed_extensions:
if ext not in self.file_types:
logger.warning(f"不支持的文件类型: {ext}, 文件: {image_path}")
return False
# 检查文件大小
if not is_file_size_valid(image_path, self.max_file_size_mb):
logger.warning(f"文件大小超过限制 ({self.max_file_size_mb}MB): {image_path}")
max_size_mb = 4.0
try:
max_size_mb = float(self.config.get('File', 'max_file_size_mb', fallback='4.0'))
except:
pass
if not is_file_size_valid(image_path, max_size_mb):
logger.warning(f"文件大小超过限制 ({max_size_mb}MB): {image_path}")
return False
return True
@@ -211,8 +237,15 @@ class OCRProcessor:
if not self.validate_image(image_path):
return None
# 获取是否跳过已处理文件的配置
skip_existing = True
try:
skip_existing = self.config.getboolean('Performance', 'skip_existing', fallback=True)
except:
pass
# 如果需要跳过已处理的文件
if self.skip_existing and self.record_manager.is_processed(image_path):
if skip_existing and self.record_manager.is_processed(image_path):
output_file = self.record_manager.get_output_file(image_path)
logger.info(f"图片已处理,跳过: {image_path}, 输出文件: {output_file}")
return output_file
@@ -220,12 +253,19 @@ class OCRProcessor:
logger.info(f"开始处理图片: {image_path}")
try:
# 获取Excel扩展名
excel_extension = '.xlsx'
try:
excel_extension = self.config.get('File', 'excel_extension', fallback='.xlsx')
except:
pass
# 生成输出文件路径
file_name = os.path.splitext(os.path.basename(image_path))[0]
output_file = os.path.join(self.output_folder, f"{file_name}{self.excel_extension}")
output_file = os.path.join(self.output_folder, f"{file_name}{excel_extension}")
# 检查是否已存在对应的Excel文件
if os.path.exists(output_file) and self.skip_existing:
if os.path.exists(output_file) and skip_existing:
logger.info(f"已存在对应的Excel文件,跳过处理: {os.path.basename(image_path)} -> {os.path.basename(output_file)}")
# 记录处理结果
self.record_manager.mark_as_processed(image_path, output_file)
@@ -304,31 +344,38 @@ class OCRProcessor:
(总处理数, 成功处理数)元组
"""
# 使用配置值或参数值
batch_size = batch_size or self.batch_size
max_workers = max_workers or self.max_workers
if batch_size is None:
try:
batch_size = self.config.getint('Performance', 'batch_size', fallback=5)
except:
batch_size = 5
if max_workers is None:
try:
max_workers = self.config.getint('Performance', 'max_workers', fallback=4)
except:
max_workers = 4
# 获取未处理的图片
unprocessed_images = self.get_unprocessed_images()
if not unprocessed_images:
logger.warning("没有需要处理的图片")
return 0, 0
total = len(unprocessed_images)
success = 0
success_count = 0
# 按批次处理
for i in range(0, total, batch_size):
batch = unprocessed_images[i:i + batch_size]
logger.info(f"处理批次 {i//batch_size + 1}/{(total-1)//batch_size + 1}, 大小: {len(batch)}")
# 使用线程池并行处理
batch = unprocessed_images[i:i+batch_size]
logger.info(f"处理批次 {i//batch_size+1}/{(total+batch_size-1)//batch_size}: {len(batch)} 个文件")
# 使用线程处理批次
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(self.process_image, batch))
# 统计成功数
success += sum(1 for result in results if result is not None)
logger.info(f"批次处理完成, 成功: {sum(1 for result in results if result is not None)}/{len(batch)}")
logger.info(f"所有图片处理完成, 总计: {total}, 成功: {success}")
return total, success
# 统计成功数
success_count += sum(1 for result in results if result is not None)
logger.info(f"所有图片处理完成, 总计: {total}, 成功: {success_count}")
return total, success_count
Binary file not shown.
+468
View File
@@ -0,0 +1,468 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
对话框工具模块
-------------
提供各种弹窗和对话框显示功能
"""
import os
import tkinter as tk
from tkinter import messagebox, ttk
from datetime import datetime
def create_custom_dialog(title="提示", message="", result_file=None, time_info=None,
count_info=None, amount_info=None, additional_info=None):
"""
创建自定义结果对话框
Args:
title: 对话框标题
message: 主要消息
result_file: 结果文件路径(如果有)
time_info: 时间信息(如:订单时间)
count_info: 数量信息(如:处理条目数)
amount_info: 金额信息(如:总金额)
additional_info: 其他附加信息(字典格式)
Returns:
dialog: 对话框对象
"""
# 创建对话框
dialog = tk.Toplevel()
dialog.title(title)
dialog.geometry("450x320")
dialog.resizable(False, False)
# 使弹窗居中显示
center_window(dialog)
# 添加标题
tk.Label(dialog, text=message, font=("Arial", 16, "bold")).pack(pady=10)
# 创建内容框架
result_frame = tk.Frame(dialog)
result_frame.pack(pady=10, fill=tk.BOTH, expand=True)
# 添加时间、数量、金额等信息
if time_info:
tk.Label(result_frame, text=f"时间信息: {time_info}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
if count_info:
tk.Label(result_frame, text=f"处理数量: {count_info}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
if amount_info:
tk.Label(result_frame, text=f"金额信息: {amount_info}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 添加其他附加信息
if additional_info and isinstance(additional_info, dict):
for key, value in additional_info.items():
tk.Label(result_frame, text=f"{key}: {value}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 如果有结果文件,显示文件信息
if result_file and os.path.exists(result_file):
tk.Label(result_frame, text=f"输出文件: {os.path.basename(result_file)}", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
# 成功提示
tk.Label(result_frame, text="处理已成功完成!", font=("Arial", 12, "bold"), fg="#28a745").pack(pady=10)
# 文件信息框
file_frame = tk.Frame(result_frame, relief=tk.GROOVE, borderwidth=1)
file_frame.pack(fill=tk.X, padx=15, pady=5)
tk.Label(file_frame, text="文件信息", font=("Arial", 10, "bold")).pack(anchor=tk.W, padx=10, pady=5)
# 获取文件大小和时间
try:
file_size = os.path.getsize(result_file)
file_time = datetime.fromtimestamp(os.path.getmtime(result_file))
size_text = f"{file_size / 1024:.1f} KB" if file_size < 1024*1024 else f"{file_size / (1024*1024):.1f} MB"
tk.Label(file_frame, text=f"文件大小: {size_text}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
tk.Label(file_frame, text=f"创建时间: {file_time.strftime('%Y-%m-%d %H:%M:%S')}", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
except:
tk.Label(file_frame, text="无法获取文件信息", font=("Arial", 10)).pack(anchor=tk.W, padx=10, pady=2)
# 添加按钮
button_frame = tk.Frame(dialog)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开文件", command=lambda: os.startfile(result_file)).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="打开所在文件夹", command=lambda: os.startfile(os.path.dirname(result_file))).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="关闭", command=dialog.destroy).pack(side=tk.LEFT, padx=5)
else:
# 如果没有结果文件或文件不存在
if result_file:
tk.Label(result_frame, text="未找到输出文件", font=("Arial", 12)).pack(anchor=tk.W, padx=20, pady=5)
tk.Label(result_frame, text="请检查输出目录", font=("Arial", 12, "bold"), fg="#dc3545").pack(pady=10)
# 添加按钮
button_frame = tk.Frame(dialog)
button_frame.pack(pady=10)
tk.Button(button_frame, text="打开输出目录", command=lambda: os.startfile(os.path.abspath("data/output"))).pack(side=tk.LEFT, padx=5)
tk.Button(button_frame, text="关闭", command=dialog.destroy).pack(side=tk.LEFT, padx=5)
# 确保窗口显示在最前
dialog.lift()
dialog.attributes('-topmost', True)
dialog.after_idle(lambda: dialog.attributes('-topmost', False))
return dialog
def show_custom_dialog(*args, **kwargs):
"""
显示自定义对话框
参数与create_custom_dialog相同
Returns:
dialog: 对话框对象
"""
return create_custom_dialog(*args, **kwargs)
def center_window(window):
"""使窗口居中显示"""
window.update_idletasks()
width = window.winfo_width()
height = window.winfo_height()
x = (window.winfo_screenwidth() // 2) - (width // 2)
y = (window.winfo_screenheight() // 2) - (height // 2)
window.geometry('{}x{}+{}+{}'.format(width, height, x, y))
def create_barcode_mapping_dialog(parent=None, on_save=None, current_mappings=None):
"""
创建条码映射编辑弹窗
Args:
parent: 父窗口
on_save: 保存回调函数,接收修改后的映射数据
current_mappings: 当前的映射数据
Returns:
dialog: 对话框对象
"""
dialog = tk.Toplevel(parent)
dialog.title("条码映射编辑")
dialog.geometry("600x500")
dialog.resizable(True, True)
# 使弹窗居中显示
center_window(dialog)
# 创建主框架
main_frame = tk.Frame(dialog)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 创建选项卡控件
tab_control = ttk.Notebook(main_frame)
# 创建两个选项卡页面
tab1 = tk.Frame(tab_control)
tab2 = tk.Frame(tab_control)
tab_control.add(tab1, text="条码映射")
tab_control.add(tab2, text="特殊处理")
tab_control.pack(expand=True, fill=tk.BOTH)
# ========= 条码映射选项卡 =========
# 顶部输入区域
input_frame = tk.Frame(tab1)
input_frame.pack(fill=tk.X, padx=5, pady=5)
tk.Label(input_frame, text="源条码:").grid(row=0, column=0, padx=5, pady=5)
source_entry = tk.Entry(input_frame, width=20)
source_entry.grid(row=0, column=1, padx=5, pady=5)
tk.Label(input_frame, text="目标条码:").grid(row=0, column=2, padx=5, pady=5)
target_entry = tk.Entry(input_frame, width=20)
target_entry.grid(row=0, column=3, padx=5, pady=5)
# 存储映射列表的变量
mapping_list = []
# 映射列表显示区域
list_frame = tk.Frame(tab1)
list_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
columns = ("源条码", "目标条码")
mapping_tree = ttk.Treeview(list_frame, columns=columns, show="headings", selectmode="browse")
for col in columns:
mapping_tree.heading(col, text=col)
mapping_tree.column(col, width=100)
mapping_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 添加滚动条
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=mapping_tree.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
mapping_tree.configure(yscrollcommand=scrollbar.set)
# ========= 特殊处理选项卡 =========
# 顶部输入区域
special_input_frame = tk.Frame(tab2)
special_input_frame.pack(fill=tk.X, padx=5, pady=5)
tk.Label(special_input_frame, text="条码:").grid(row=0, column=0, padx=5, pady=5)
special_barcode_entry = tk.Entry(special_input_frame, width=20)
special_barcode_entry.grid(row=0, column=1, padx=5, pady=5)
tk.Label(special_input_frame, text="乘数:").grid(row=1, column=0, padx=5, pady=5)
multiplier_entry = tk.Entry(special_input_frame, width=10)
multiplier_entry.grid(row=1, column=1, padx=5, pady=5)
tk.Label(special_input_frame, text="目标单位:").grid(row=1, column=2, padx=5, pady=5)
unit_entry = tk.Entry(special_input_frame, width=10)
unit_entry.grid(row=1, column=3, padx=5, pady=5)
tk.Label(special_input_frame, text="固定单价:").grid(row=2, column=0, padx=5, pady=5)
price_entry = tk.Entry(special_input_frame, width=10)
price_entry.grid(row=2, column=1, padx=5, pady=5)
tk.Label(special_input_frame, text="规格:").grid(row=2, column=2, padx=5, pady=5)
spec_entry = tk.Entry(special_input_frame, width=10)
spec_entry.grid(row=2, column=3, padx=5, pady=5)
tk.Label(special_input_frame, text="描述:").grid(row=3, column=0, padx=5, pady=5)
desc_entry = tk.Entry(special_input_frame, width=40)
desc_entry.grid(row=3, column=1, columnspan=3, padx=5, pady=5)
# 特殊处理列表显示区域
special_list_frame = tk.Frame(tab2)
special_list_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
special_columns = ("条码", "乘数", "目标单位", "固定单价", "规格", "描述")
special_tree = ttk.Treeview(special_list_frame, columns=special_columns, show="headings", selectmode="browse")
for col in special_columns:
special_tree.heading(col, text=col)
special_tree.column(col, width=80)
special_tree.column("描述", width=200)
special_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 添加滚动条
special_scrollbar = ttk.Scrollbar(special_list_frame, orient=tk.VERTICAL, command=special_tree.yview)
special_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
special_tree.configure(yscrollcommand=special_scrollbar.set)
# 存储特殊处理列表的变量
special_list = []
# 按钮区域
def add_mapping():
source = source_entry.get().strip()
target = target_entry.get().strip()
if not source or not target:
messagebox.showwarning("输入错误", "源条码和目标条码不能为空")
return
# 检查是否已存在
for item in mapping_list:
if item[0] == source:
messagebox.showwarning("重复条码", f"条码 {source} 已存在映射")
return
# 添加到列表
mapping_list.append((source, target))
mapping_tree.insert("", tk.END, values=(source, target))
# 清空输入框
source_entry.delete(0, tk.END)
target_entry.delete(0, tk.END)
def remove_mapping():
selected = mapping_tree.selection()
if not selected:
messagebox.showwarning("未选择", "请先选择要删除的条目")
return
# 获取选中项的索引
item = mapping_tree.item(selected[0])
source = item['values'][0]
# 从列表中移除
for i, (s, _) in enumerate(mapping_list):
if s == source:
mapping_list.pop(i)
break
# 从树中移除
mapping_tree.delete(selected[0])
def add_special():
barcode = special_barcode_entry.get().strip()
multiplier = multiplier_entry.get().strip()
unit = unit_entry.get().strip()
price = price_entry.get().strip()
spec = spec_entry.get().strip()
desc = desc_entry.get().strip()
if not barcode:
messagebox.showwarning("输入错误", "条码不能为空")
return
# 检查是否已存在
for item in special_list:
if item[0] == barcode:
messagebox.showwarning("重复条码", f"条码 {barcode} 已存在特殊处理")
return
# 添加到列表
special_list.append((barcode, multiplier, unit, price, spec, desc))
special_tree.insert("", tk.END, values=(barcode, multiplier, unit, price, spec, desc))
# 清空输入框
special_barcode_entry.delete(0, tk.END)
multiplier_entry.delete(0, tk.END)
unit_entry.delete(0, tk.END)
price_entry.delete(0, tk.END)
spec_entry.delete(0, tk.END)
desc_entry.delete(0, tk.END)
def remove_special():
selected = special_tree.selection()
if not selected:
messagebox.showwarning("未选择", "请先选择要删除的条目")
return
# 获取选中项的索引
item = special_tree.item(selected[0])
barcode = item['values'][0]
# 从列表中移除
for i, (b, _, _, _, _, _) in enumerate(special_list):
if b == barcode:
special_list.pop(i)
break
# 从树中移除
special_tree.delete(selected[0])
# 条码映射按钮
btn_frame = tk.Frame(tab1)
btn_frame.pack(fill=tk.X, padx=5, pady=5)
add_btn = tk.Button(btn_frame, text="添加映射", command=add_mapping)
add_btn.pack(side=tk.LEFT, padx=5)
remove_btn = tk.Button(btn_frame, text="删除映射", command=remove_mapping)
remove_btn.pack(side=tk.LEFT, padx=5)
# 特殊处理按钮
special_btn_frame = tk.Frame(tab2)
special_btn_frame.pack(fill=tk.X, padx=5, pady=5)
add_special_btn = tk.Button(special_btn_frame, text="添加特殊处理", command=add_special)
add_special_btn.pack(side=tk.LEFT, padx=5)
remove_special_btn = tk.Button(special_btn_frame, text="删除特殊处理", command=remove_special)
remove_special_btn.pack(side=tk.LEFT, padx=5)
# 底部按钮区域
bottom_frame = tk.Frame(dialog)
bottom_frame.pack(fill=tk.X, padx=10, pady=10)
def save_mappings():
# 构建保存数据
mappings = {}
# 添加条码映射
for source, target in mapping_list:
mappings[source] = {
'map_to': target,
'description': f'条码映射:{source} -> {target}'
}
# 添加特殊处理
for barcode, multiplier, unit, price, spec, desc in special_list:
mappings[barcode] = {}
if multiplier:
try:
# 安全地转换multiplier为数字
if isinstance(multiplier, str):
if '.' in multiplier:
mappings[barcode]['multiplier'] = float(multiplier)
else:
mappings[barcode]['multiplier'] = int(multiplier)
else:
# 已经是数字类型
mappings[barcode]['multiplier'] = multiplier
except ValueError:
# 如果转换失败,保持原始字符串
mappings[barcode]['multiplier'] = multiplier
if unit:
mappings[barcode]['target_unit'] = unit
if price:
try:
# 安全地转换price为浮点数
mappings[barcode]['fixed_price'] = float(price)
except ValueError:
# 如果转换失败,保持原始字符串
mappings[barcode]['fixed_price'] = price
if spec:
mappings[barcode]['specification'] = spec
if desc:
mappings[barcode]['description'] = desc
# 调用保存回调
if on_save:
on_save(mappings)
messagebox.showinfo("保存成功", f"已保存{len(mapping_list)}个条码映射和{len(special_list)}个特殊处理规则")
dialog.destroy()
def cancel():
dialog.destroy()
save_btn = tk.Button(bottom_frame, text="保存", command=save_mappings)
save_btn.pack(side=tk.RIGHT, padx=5)
cancel_btn = tk.Button(bottom_frame, text="取消", command=cancel)
cancel_btn.pack(side=tk.RIGHT, padx=5)
# 导入当前映射数据
if current_mappings:
for barcode, data in current_mappings.items():
if 'map_to' in data:
# 这是条码映射
mapping_list.append((barcode, data['map_to']))
mapping_tree.insert("", tk.END, values=(barcode, data['map_to']))
else:
# 这是特殊处理
multiplier = data.get('multiplier', '')
unit = data.get('target_unit', '')
price = data.get('fixed_price', '')
spec = data.get('specification', '')
desc = data.get('description', '')
special_list.append((barcode, multiplier, unit, price, spec, desc))
special_tree.insert("", tk.END, values=(barcode, multiplier, unit, price, spec, desc))
# 确保窗口显示在最前
dialog.transient(parent)
dialog.grab_set()
return dialog
def show_barcode_mapping_dialog(*args, **kwargs):
"""
显示条码映射编辑弹窗
参数与create_barcode_mapping_dialog相同
Returns:
dialog: 对话框对象
"""
# 确保已导入ttk
import tkinter.ttk as ttk
return create_barcode_mapping_dialog(*args, **kwargs)
+49
View File
@@ -97,6 +97,36 @@ def get_logger(name: str) -> logging.Logger:
return setup_logger(name)
return logger
def set_log_level(level: str) -> None:
"""
设置所有日志记录器的级别
Args:
level: 日志级别(DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
level_map = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL
}
# 获取对应的日志级别
log_level = level_map.get(level.lower(), logging.INFO)
# 获取所有记录器
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
# 设置每个记录器的级别
for logger in loggers:
logger.setLevel(log_level)
# 设置根记录器的级别
logging.getLogger().setLevel(log_level)
print(f"所有日志记录器级别已设置为: {logging.getLevelName(log_level)}")
def close_logger(name: str) -> None:
"""
关闭日志记录器的所有处理器
@@ -113,6 +143,25 @@ def close_logger(name: str) -> None:
_handlers.pop(f"{name}_file", None)
_handlers.pop(f"{name}_console", None)
def close_all_loggers() -> None:
"""
关闭所有日志记录器的处理器
"""
# 获取所有记录器
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
# 关闭每个记录器的处理器
for logger in loggers:
if hasattr(logger, 'handlers'):
for handler in logger.handlers[:]:
handler.close()
logger.removeHandler(handler)
# 清空处理器缓存
_handlers.clear()
print("所有日志记录器已关闭")
def cleanup_active_marker(name: str) -> None:
"""
清理日志活跃标记
Binary file not shown.
+15
View File
@@ -75,6 +75,21 @@ class OCRService:
logger.info(f"OCRService开始批量处理图片, batch_size={batch_size}, max_workers={max_workers}")
return self.ocr_processor.process_images_batch(batch_size, max_workers)
# 添加batch_process作为process_images_batch的别名,确保兼容性
def batch_process(self, batch_size: int = None, max_workers: int = None) -> Tuple[int, int]:
"""
批量处理图片(别名方法,与process_images_batch功能相同)
Args:
batch_size: 批处理大小
max_workers: 最大线程数
Returns:
(总处理数, 成功处理数)元组
"""
logger.info(f"OCRService.batch_process被调用,转发到process_images_batch")
return self.process_images_batch(batch_size, max_workers)
def validate_image(self, image_path: str) -> bool:
"""
验证图片是否有效
+255
View File
@@ -0,0 +1,255 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
烟草公司订单处理服务
----------------
处理烟草公司特定格式的订单明细文件,生成银豹采购单
"""
import os
import glob
import datetime
import pandas as pd
import xlrd
import xlwt
import re
from xlutils.copy import copy
from openpyxl import load_workbook
from typing import Optional, Dict, Any, List, Tuple
from app.core.utils.log_utils import get_logger
from app.core.utils.dialog_utils import show_custom_dialog # 导入自定义弹窗工具
from ..config.settings import ConfigManager
logger = get_logger(__name__)
class TobaccoService:
"""烟草公司订单处理服务"""
def __init__(self, config: Dict[str, Any]):
"""
初始化服务
Args:
config: 配置信息
"""
self.config = config
# 修复配置获取方式,使用fallback机制
self.output_dir = config.get('Paths', 'output_folder', fallback='data/output')
self.template_file = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls')
self.output_file = os.path.join(self.output_dir, '银豹采购单_烟草公司.xls')
def get_latest_tobacco_order(self) -> Optional[str]:
"""
获取最新的烟草订单明细文件
Returns:
文件路径或None
"""
# 获取今日开始时间戳
today = datetime.date.today()
today_start = datetime.datetime.combine(today, datetime.time.min).timestamp()
# 查找订单明细文件
file_pattern = os.path.join(self.output_dir, "订单明细*.xlsx")
candidates = glob.glob(file_pattern)
if not candidates:
logger.warning("未找到烟草公司订单明细文件")
return None
# 按创建时间排序
candidates.sort(key=os.path.getctime, reverse=True)
latest_file = candidates[0]
# 检查是否是今天的文件
if os.path.getctime(latest_file) >= today_start:
logger.info(f"找到最新烟草订单明细文件: {latest_file}")
return latest_file
else:
logger.warning(f"找到的烟草订单明细文件不是今天创建的: {latest_file}")
return latest_file # 仍然返回最新文件,但给出警告
def process_tobacco_order(self, input_file=None):
"""
处理烟草订单
Args:
input_file: 输入文件路径,如果为None则自动查找最新文件
Returns:
输出文件路径或None(如果处理失败)
"""
try:
# 如果没有指定输入文件,查找最新的文件
if input_file is None:
input_file = self.get_latest_tobacco_order()
if input_file is None:
logger.warning("未找到烟草公司订单明细文件")
logger.error("未找到可处理的烟草订单明细文件")
return None
logger.info(f"开始处理烟草公司订单: {input_file}")
# 读取订单时间和总金额
order_info = self._read_order_info(input_file)
if not order_info:
logger.error(f"读取订单信息失败: {input_file}")
return None
order_time, total_amount = order_info
# 读取订单数据
order_data = self._read_order_data(input_file)
if not order_data:
logger.error(f"读取订单数据失败: {input_file}")
return None
# 生成银豹采购单
output_file = self._generate_pospal_order(order_data, order_time)
if not output_file:
logger.error("生成银豹采购单失败")
return None
# 获取处理条目数
total_count = len(order_data)
# 输出处理结果
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")
logger.info(f"采购单已生成: {output_file}")
# 显示处理结果对话框
self.show_result_dialog(output_file, order_time, total_count, total_amount)
return output_file
except Exception as e:
logger.error(f"处理烟草公司订单时发生错误: {e}", exc_info=True)
return None
def _read_order_info(self, file_path: str) -> Optional[Tuple[str, float]]:
"""
读取订单信息(时间和总金额)
Args:
file_path: 文件路径
Returns:
包含订单时间和总金额的元组或None
"""
try:
wb_info = load_workbook(file_path, data_only=True)
ws_info = wb_info.active
order_time = ws_info["H1"].value or "(空)"
total_amount = ws_info["H3"].value or 0
return (order_time, total_amount)
except Exception as e:
logger.error(f"读取订单信息出错: {e}")
return None
def _read_order_data(self, file_path: str) -> Optional[pd.DataFrame]:
"""
读取订单数据
Args:
file_path: 文件路径
Returns:
订单数据DataFrame或None
"""
columns = ['商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额']
try:
# 读取Excel文件
df_old = pd.read_excel(file_path, header=None, skiprows=3, names=columns)
# 过滤订单量不为0的数据,并计算采购量和单价
df_filtered = df_old[df_old['订单量'] != 0].copy()
df_filtered['采购量'] = df_filtered['订单量'] * 10
df_filtered['采购单价'] = df_filtered['金额'] / df_filtered['采购量']
df_filtered = df_filtered.reset_index(drop=True)
return df_filtered
except Exception as e:
logger.error(f"读取订单数据失败: {e}")
return None
def _generate_pospal_order(self, order_data: pd.DataFrame, order_time: str) -> Optional[str]:
"""
生成银豹采购单
Args:
order_data: 订单数据
order_time: 订单时间
Returns:
输出文件路径或None
"""
try:
# 检查模板文件是否存在
if not os.path.exists(self.template_file):
logger.error(f"采购单模板文件不存在: {self.template_file}")
return None
# 打开模板,准备写入
template_rd = xlrd.open_workbook(self.template_file, formatting_info=True)
template_wb = copy(template_rd)
template_ws = template_wb.get_sheet(0)
# 获取模板中的表头列索引
header_row = template_rd.sheet_by_index(0).row_values(0)
barcode_col = header_row.index("条码(必填)")
amount_col = header_row.index("采购量(必填)")
gift_col = header_row.index("赠送量")
price_col = header_row.index("采购单价(必填)")
# 写入数据到模板
for i, row in order_data.iterrows():
template_ws.write(i + 1, barcode_col, row['盒码']) # 商品条码
template_ws.write(i + 1, amount_col, int(row['采购量'])) # 采购量
template_ws.write(i + 1, gift_col, "") # 赠送量为空
template_ws.write(i + 1, price_col, round(row['采购单价'], 2)) # 采购单价保留两位小数
# 确保输出目录存在
os.makedirs(os.path.dirname(self.output_file), exist_ok=True)
# 保存输出文件
template_wb.save(self.output_file)
logger.info(f"采购单生成成功: {self.output_file}")
return self.output_file
except Exception as e:
logger.error(f"生成银豹采购单失败: {e}")
return None
def show_result_dialog(self, output_file, order_time, total_count, total_amount):
"""
显示处理结果对话框
Args:
output_file: 输出文件路径
order_time: 订单时间
total_count: 总处理条目
total_amount: 总金额
"""
# 创建附加信息
additional_info = {
"订单来源": "烟草公司",
"处理时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 显示自定义对话框
show_custom_dialog(
title="烟草订单处理结果",
message="烟草订单处理完成",
result_file=output_file,
time_info=order_time,
count_info=f"{total_count}个商品",
amount_info=f"¥{total_amount:.2f}",
additional_info=additional_info
)
# 记录日志
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")