feat(ui): simplify interface by removing dedicated tobacco/rongcheng buttons and optimizing auto-routing
This commit is contained in:
@@ -1,5 +0,0 @@
|
||||
"""
|
||||
OCR订单处理系统 - 命令行接口
|
||||
-------------------------
|
||||
提供命令行工具,便于用户使用系统功能。
|
||||
"""
|
||||
@@ -1,138 +0,0 @@
|
||||
"""
|
||||
Excel处理命令行工具
|
||||
---------------
|
||||
提供Excel处理相关的命令行接口。
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from typing import List, Optional
|
||||
|
||||
from ..config.settings import ConfigManager
|
||||
from ..core.utils.log_utils import get_logger, close_logger
|
||||
from ..services.order_service import OrderService
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def create_parser() -> argparse.ArgumentParser:
|
||||
"""
|
||||
创建命令行参数解析器
|
||||
|
||||
Returns:
|
||||
参数解析器
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='Excel处理工具')
|
||||
|
||||
# 通用选项
|
||||
parser.add_argument('--config', type=str, help='配置文件路径')
|
||||
|
||||
# 子命令
|
||||
subparsers = parser.add_subparsers(dest='command', help='子命令')
|
||||
|
||||
# 处理Excel命令
|
||||
process_parser = subparsers.add_parser('process', help='处理Excel文件')
|
||||
process_parser.add_argument('--input', type=str, help='输入Excel文件路径,如果不指定则处理最新的文件')
|
||||
|
||||
# 查看命令
|
||||
list_parser = subparsers.add_parser('list', help='获取最新的Excel文件')
|
||||
|
||||
return parser
|
||||
|
||||
def process_excel(order_service: OrderService, input_file: Optional[str] = None) -> bool:
|
||||
"""
|
||||
处理Excel文件
|
||||
|
||||
Args:
|
||||
order_service: 订单服务
|
||||
input_file: 输入文件路径,如果为None则处理最新的文件
|
||||
|
||||
Returns:
|
||||
处理是否成功
|
||||
"""
|
||||
if input_file:
|
||||
if not os.path.exists(input_file):
|
||||
logger.error(f"输入文件不存在: {input_file}")
|
||||
return False
|
||||
|
||||
result = order_service.process_excel(input_file)
|
||||
else:
|
||||
latest_file = order_service.get_latest_excel()
|
||||
if not latest_file:
|
||||
logger.warning("未找到可处理的Excel文件")
|
||||
return False
|
||||
|
||||
logger.info(f"处理最新的Excel文件: {latest_file}")
|
||||
result = order_service.process_excel(latest_file)
|
||||
|
||||
if result:
|
||||
logger.info(f"处理成功,输出文件: {result}")
|
||||
return True
|
||||
else:
|
||||
logger.error("处理失败")
|
||||
return False
|
||||
|
||||
def list_latest_excel(order_service: OrderService) -> bool:
|
||||
"""
|
||||
获取最新的Excel文件
|
||||
|
||||
Args:
|
||||
order_service: 订单服务
|
||||
|
||||
Returns:
|
||||
是否找到Excel文件
|
||||
"""
|
||||
latest_file = order_service.get_latest_excel()
|
||||
|
||||
if latest_file:
|
||||
logger.info(f"最新的Excel文件: {latest_file}")
|
||||
return True
|
||||
else:
|
||||
logger.info("未找到Excel文件")
|
||||
return False
|
||||
|
||||
def main(args: Optional[List[str]] = None) -> int:
|
||||
"""
|
||||
Excel处理命令行主函数
|
||||
|
||||
Args:
|
||||
args: 命令行参数,如果为None则使用sys.argv
|
||||
|
||||
Returns:
|
||||
退出状态码
|
||||
"""
|
||||
parser = create_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
if parsed_args.command is None:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
try:
|
||||
# 创建配置管理器
|
||||
config = ConfigManager(parsed_args.config) if parsed_args.config else ConfigManager()
|
||||
|
||||
# 创建订单服务
|
||||
order_service = OrderService(config)
|
||||
|
||||
# 根据命令执行不同功能
|
||||
if parsed_args.command == 'process':
|
||||
success = process_excel(order_service, parsed_args.input)
|
||||
elif parsed_args.command == 'list':
|
||||
success = list_latest_excel(order_service)
|
||||
else:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
return 0 if success else 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"执行过程中发生错误: {e}")
|
||||
return 1
|
||||
|
||||
finally:
|
||||
# 关闭日志
|
||||
close_logger(__name__)
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
@@ -1,147 +0,0 @@
|
||||
"""
|
||||
订单合并命令行工具
|
||||
--------------
|
||||
提供订单合并相关的命令行接口。
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from typing import List, Optional
|
||||
|
||||
from ..config.settings import ConfigManager
|
||||
from ..core.utils.log_utils import get_logger, close_logger
|
||||
from ..services.order_service import OrderService
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def create_parser() -> argparse.ArgumentParser:
|
||||
"""
|
||||
创建命令行参数解析器
|
||||
|
||||
Returns:
|
||||
参数解析器
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='订单合并工具')
|
||||
|
||||
# 通用选项
|
||||
parser.add_argument('--config', type=str, help='配置文件路径')
|
||||
|
||||
# 子命令
|
||||
subparsers = parser.add_subparsers(dest='command', help='子命令')
|
||||
|
||||
# 合并命令
|
||||
merge_parser = subparsers.add_parser('merge', help='合并采购单')
|
||||
merge_parser.add_argument('--input', type=str, help='输入采购单文件路径列表,以逗号分隔,如果不指定则合并所有采购单')
|
||||
|
||||
# 列出采购单命令
|
||||
list_parser = subparsers.add_parser('list', help='列出采购单文件')
|
||||
|
||||
return parser
|
||||
|
||||
def merge_orders(order_service: OrderService, input_files: Optional[str] = None) -> bool:
|
||||
"""
|
||||
合并采购单
|
||||
|
||||
Args:
|
||||
order_service: 订单服务
|
||||
input_files: 输入文件路径列表,以逗号分隔,如果为None则合并所有采购单
|
||||
|
||||
Returns:
|
||||
合并是否成功
|
||||
"""
|
||||
if input_files:
|
||||
# 分割输入文件列表
|
||||
file_paths = [path.strip() for path in input_files.split(',')]
|
||||
|
||||
# 检查文件是否存在
|
||||
for path in file_paths:
|
||||
if not os.path.exists(path):
|
||||
logger.error(f"输入文件不存在: {path}")
|
||||
return False
|
||||
|
||||
result = order_service.merge_orders(file_paths)
|
||||
else:
|
||||
# 获取所有采购单文件
|
||||
file_paths = order_service.get_purchase_orders()
|
||||
if not file_paths:
|
||||
logger.warning("未找到采购单文件")
|
||||
return False
|
||||
|
||||
logger.info(f"合并 {len(file_paths)} 个采购单文件")
|
||||
result = order_service.merge_orders()
|
||||
|
||||
if result:
|
||||
logger.info(f"合并成功,输出文件: {result}")
|
||||
return True
|
||||
else:
|
||||
logger.error("合并失败")
|
||||
return False
|
||||
|
||||
def list_purchase_orders(order_service: OrderService) -> bool:
|
||||
"""
|
||||
列出采购单文件
|
||||
|
||||
Args:
|
||||
order_service: 订单服务
|
||||
|
||||
Returns:
|
||||
是否有采购单文件
|
||||
"""
|
||||
files = order_service.get_purchase_orders()
|
||||
|
||||
if not files:
|
||||
logger.info("未找到采购单文件")
|
||||
return False
|
||||
|
||||
logger.info(f"采购单文件 ({len(files)}):")
|
||||
for file in files:
|
||||
logger.info(f" {file}")
|
||||
|
||||
return True
|
||||
|
||||
def main(args: Optional[List[str]] = None) -> int:
|
||||
"""
|
||||
订单合并命令行主函数
|
||||
|
||||
Args:
|
||||
args: 命令行参数,如果为None则使用sys.argv
|
||||
|
||||
Returns:
|
||||
退出状态码
|
||||
"""
|
||||
parser = create_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
if parsed_args.command is None:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
try:
|
||||
# 创建配置管理器
|
||||
config = ConfigManager(parsed_args.config) if parsed_args.config else ConfigManager()
|
||||
|
||||
# 创建订单服务
|
||||
order_service = OrderService(config)
|
||||
|
||||
# 根据命令执行不同功能
|
||||
if parsed_args.command == 'merge':
|
||||
success = merge_orders(order_service, parsed_args.input)
|
||||
elif parsed_args.command == 'list':
|
||||
success = list_purchase_orders(order_service)
|
||||
else:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
return 0 if success else 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"执行过程中发生错误: {e}")
|
||||
return 1
|
||||
|
||||
finally:
|
||||
# 关闭日志
|
||||
close_logger(__name__)
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
@@ -1,164 +0,0 @@
|
||||
"""
|
||||
OCR命令行工具
|
||||
----------
|
||||
提供OCR识别相关的命令行接口。
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from typing import List, Optional
|
||||
|
||||
from ..config.settings import ConfigManager
|
||||
from ..core.utils.log_utils import get_logger, close_logger
|
||||
from ..services.ocr_service import OCRService
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
def create_parser() -> argparse.ArgumentParser:
|
||||
"""
|
||||
创建命令行参数解析器
|
||||
|
||||
Returns:
|
||||
参数解析器
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='OCR识别工具')
|
||||
|
||||
# 通用选项
|
||||
parser.add_argument('--config', type=str, help='配置文件路径')
|
||||
|
||||
# 子命令
|
||||
subparsers = parser.add_subparsers(dest='command', help='子命令')
|
||||
|
||||
# 单文件处理命令
|
||||
process_parser = subparsers.add_parser('process', help='处理单个文件')
|
||||
process_parser.add_argument('--input', type=str, required=True, help='输入图片文件路径')
|
||||
|
||||
# 批量处理命令
|
||||
batch_parser = subparsers.add_parser('batch', help='批量处理文件')
|
||||
batch_parser.add_argument('--batch-size', type=int, help='批处理大小')
|
||||
batch_parser.add_argument('--max-workers', type=int, help='最大线程数')
|
||||
|
||||
# 查看未处理文件命令
|
||||
list_parser = subparsers.add_parser('list', help='列出未处理的文件')
|
||||
|
||||
return parser
|
||||
|
||||
def process_file(ocr_service: OCRService, input_file: str) -> bool:
|
||||
"""
|
||||
处理单个文件
|
||||
|
||||
Args:
|
||||
ocr_service: OCR服务
|
||||
input_file: 输入文件路径
|
||||
|
||||
Returns:
|
||||
处理是否成功
|
||||
"""
|
||||
if not os.path.exists(input_file):
|
||||
logger.error(f"输入文件不存在: {input_file}")
|
||||
return False
|
||||
|
||||
if not ocr_service.validate_image(input_file):
|
||||
logger.error(f"输入文件无效: {input_file}")
|
||||
return False
|
||||
|
||||
result = ocr_service.process_image(input_file)
|
||||
|
||||
if result:
|
||||
logger.info(f"处理成功,输出文件: {result}")
|
||||
return True
|
||||
else:
|
||||
logger.error("处理失败")
|
||||
return False
|
||||
|
||||
def process_batch(ocr_service: OCRService, batch_size: Optional[int] = None, max_workers: Optional[int] = None) -> bool:
|
||||
"""
|
||||
批量处理文件
|
||||
|
||||
Args:
|
||||
ocr_service: OCR服务
|
||||
batch_size: 批处理大小
|
||||
max_workers: 最大线程数
|
||||
|
||||
Returns:
|
||||
处理是否成功
|
||||
"""
|
||||
total, success = ocr_service.process_images_batch(batch_size, max_workers)
|
||||
|
||||
if total == 0:
|
||||
logger.warning("没有找到需要处理的文件")
|
||||
return False
|
||||
|
||||
logger.info(f"批量处理完成,总计: {total},成功: {success}")
|
||||
return success > 0
|
||||
|
||||
def list_unprocessed(ocr_service: OCRService) -> bool:
|
||||
"""
|
||||
列出未处理的文件
|
||||
|
||||
Args:
|
||||
ocr_service: OCR服务
|
||||
|
||||
Returns:
|
||||
是否有未处理的文件
|
||||
"""
|
||||
files = ocr_service.get_unprocessed_images()
|
||||
|
||||
if not files:
|
||||
logger.info("没有未处理的文件")
|
||||
return False
|
||||
|
||||
logger.info(f"未处理的文件 ({len(files)}):")
|
||||
for file in files:
|
||||
logger.info(f" {file}")
|
||||
|
||||
return True
|
||||
|
||||
def main(args: Optional[List[str]] = None) -> int:
|
||||
"""
|
||||
OCR命令行主函数
|
||||
|
||||
Args:
|
||||
args: 命令行参数,如果为None则使用sys.argv
|
||||
|
||||
Returns:
|
||||
退出状态码
|
||||
"""
|
||||
parser = create_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
if parsed_args.command is None:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
try:
|
||||
# 创建配置管理器
|
||||
config = ConfigManager(parsed_args.config) if parsed_args.config else ConfigManager()
|
||||
|
||||
# 创建OCR服务
|
||||
ocr_service = OCRService(config)
|
||||
|
||||
# 根据命令执行不同功能
|
||||
if parsed_args.command == 'process':
|
||||
success = process_file(ocr_service, parsed_args.input)
|
||||
elif parsed_args.command == 'batch':
|
||||
success = process_batch(ocr_service, parsed_args.batch_size, parsed_args.max_workers)
|
||||
elif parsed_args.command == 'list':
|
||||
success = list_unprocessed(ocr_service)
|
||||
else:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
return 0 if success else 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"执行过程中发生错误: {e}")
|
||||
return 1
|
||||
|
||||
finally:
|
||||
# 关闭日志
|
||||
close_logger(__name__)
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
@@ -113,11 +113,21 @@ class OrderService:
|
||||
return special_svc.preprocess_rongcheng_yigou(file_path)
|
||||
|
||||
# 3. 识别:杨碧月 (Yang Biyue)
|
||||
from .special_suppliers_service import SpecialSuppliersService
|
||||
special_svc = SpecialSuppliersService(self.config)
|
||||
# 我们直接复用 SpecialSuppliersService 里的逻辑,但要确保它只返回路径
|
||||
# 修改 SpecialSuppliersService.process_yang_biyue 使其支持仅返回预处理路径
|
||||
return special_svc.process_yang_biyue_only(file_path)
|
||||
# 特征:经手人列包含“杨碧月”
|
||||
handler_col = None
|
||||
for col in df_head.columns:
|
||||
# 在前50行中搜索“经手人”关键字
|
||||
if df_head[col].astype(str).str.contains('经手人').any():
|
||||
handler_col = col
|
||||
break
|
||||
|
||||
if handler_col is not None:
|
||||
# 检查该列是否有“杨碧月”
|
||||
if df_head[handler_col].astype(str).str.contains('杨碧月').any():
|
||||
logger.info("识别到杨碧月订单,执行专用预处理...")
|
||||
from .special_suppliers_service import SpecialSuppliersService
|
||||
special_svc = SpecialSuppliersService(self.config)
|
||||
return special_svc.process_yang_biyue_only(file_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"智能预处理识别失败: {e}")
|
||||
|
||||
@@ -37,24 +37,41 @@ class SpecialSuppliersService:
|
||||
if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any():
|
||||
return None
|
||||
|
||||
# 识别到杨碧月订单,执行专用清洗
|
||||
logger.info("识别到杨碧月订单,正在执行专用清洗...")
|
||||
|
||||
# 定义列映射关系
|
||||
# 定义列映射关系 (映射到 ExcelProcessor 期望的中文列名)
|
||||
# 使用精确匹配优先,防止“结算单位”匹配到“单位”
|
||||
column_map = {
|
||||
'商品条码': 'barcode',
|
||||
'商品名称': 'name',
|
||||
'商品规格': 'specification',
|
||||
'单位': 'unit',
|
||||
'数量': 'quantity',
|
||||
'含税单价': 'unit_price',
|
||||
'含税金额': 'total_price'
|
||||
'商品条码': '商品条码',
|
||||
'商品名称': '商品名称',
|
||||
'商品规格': '规格',
|
||||
'单位': '单位',
|
||||
'数量': '数量',
|
||||
'含税单价': '单价',
|
||||
'含税金额': '金额'
|
||||
}
|
||||
|
||||
# 提取并重命名列
|
||||
found_cols = {}
|
||||
# 1. 第一遍:尝试精确匹配
|
||||
for target_zh, std_name in column_map.items():
|
||||
for col in df.columns:
|
||||
if target_zh in str(col):
|
||||
if str(col).strip() == target_zh:
|
||||
found_cols[col] = std_name
|
||||
break
|
||||
|
||||
# 2. 第二遍:对未匹配成功的列尝试模糊匹配(但要排除特定干扰词)
|
||||
for target_zh, std_name in column_map.items():
|
||||
if std_name in found_cols.values():
|
||||
continue
|
||||
for col in df.columns:
|
||||
col_str = str(col)
|
||||
if target_zh in col_str:
|
||||
# 排除干扰列
|
||||
if target_zh == '单位' and '结算单位' in col_str:
|
||||
continue
|
||||
if target_zh == '数量' and '基本单位数量' in col_str:
|
||||
continue
|
||||
found_cols[col] = std_name
|
||||
break
|
||||
|
||||
@@ -66,7 +83,7 @@ class SpecialSuppliersService:
|
||||
df_clean = df_clean.rename(columns=found_cols)
|
||||
|
||||
# 过滤掉空的条码行
|
||||
df_clean = df_clean.dropna(subset=['barcode'])
|
||||
df_clean = df_clean.dropna(subset=['商品条码'])
|
||||
|
||||
# 保存预处理文件
|
||||
out_dir = os.path.dirname(src_path)
|
||||
|
||||
Reference in New Issue
Block a user