orc-order-v2/app/services/order_service.py

252 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
订单服务模块
---------
提供订单处理服务协调Excel处理和订单合并流程。
"""
import os
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from ..config.settings import ConfigManager
from ..core.utils.log_utils import get_logger
from ..core.excel.processor import ExcelProcessor
from ..core.excel.merger import PurchaseOrderMerger
logger = get_logger(__name__)
class OrderService:
"""
订单服务协调Excel处理和订单合并流程
"""
def __init__(self, config: Optional[ConfigManager] = None):
"""
初始化订单服务
Args:
config: 配置管理器如果为None则创建新的
"""
logger.info("初始化OrderService")
self.config = config or ConfigManager()
# 创建Excel处理器和采购单合并器
self.excel_processor = ExcelProcessor(self.config)
self.order_merger = PurchaseOrderMerger(self.config)
logger.info("OrderService初始化完成")
def get_latest_excel(self) -> Optional[str]:
"""
获取最新的Excel文件
Returns:
最新Excel文件路径如果未找到则返回None
"""
return self.excel_processor.get_latest_excel()
def process_excel(self, file_path: Optional[str] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理Excel订单文件生成标准采购单
Args:
file_path: Excel文件路径如果为None则处理最新的文件
Returns:
输出采购单文件路径如果处理失败则返回None
"""
if not file_path:
file_path = self.excel_processor.get_latest_excel()
if not file_path:
logger.warning("未找到可处理的Excel文件")
return None
logger.info("OrderService开始处理最新Excel文件")
else:
logger.info(f"OrderService开始处理指定Excel文件: {file_path}")
# 检查是否需要特殊的供应商预处理(如杨碧月)
try:
from .special_suppliers_service import SpecialSuppliersService
special_service = SpecialSuppliersService(self.config)
# 尝试识别并预处理(注意:这里不再传入 progress_cb 避免无限递归或重复进度条,
# 或者我们在 special_service 内部逻辑中处理完后直接返回结果)
# 为了避免循环调用,我们在 SpecialSuppliersService 内部不再调用 process_excel
# 而是让 process_excel 识别后自己决定是否处理预处理后的文件。
# 我们新增一个 check_and_preprocess 方法
preprocessed_path = self._check_special_preprocess(file_path)
if preprocessed_path:
logger.info(f"检测到特殊供应商,已生成预处理文件: {preprocessed_path}")
file_path = preprocessed_path
except Exception as e:
logger.error(f"检查特殊预处理时出错: {e}")
return self.excel_processor.process_specific_file(file_path, progress_cb=progress_cb)
def _check_special_preprocess(self, file_path: str) -> Optional[str]:
"""检查并执行特殊的预处理(支持杨碧月、烟草公司、蓉城易购)"""
try:
from app.core.utils.file_utils import smart_read_excel
import pandas as pd
import re
# 仅读取前 50 行进行智能识别 (header=None 确保能读到第一行内容)
df_head = smart_read_excel(file_path, nrows=50, header=None)
df_str = df_head.astype(str)
# 1. 识别:烟草公司 (Tobacco)
# 特征内容中包含“专卖证号”或特定证号“510109104938”
is_tobacco = df_str.apply(lambda x: x.str.contains('专卖证号|510109104938')).any().any()
if is_tobacco:
logger.info("识别到烟草公司订单,执行专用预处理...")
from .tobacco_service import TobaccoService
tobacco_svc = TobaccoService(self.config)
return tobacco_svc.preprocess_tobacco_order(file_path)
# 2. 识别:蓉城易购 (Rongcheng Yigou)
# 特征内容中包含单号标识“RCDH”
is_rongcheng = df_str.apply(lambda x: x.str.contains('RCDH')).any().any()
if is_rongcheng:
logger.info("识别到蓉城易购订单,执行专用预处理...")
from .special_suppliers_service import SpecialSuppliersService
special_svc = SpecialSuppliersService(self.config)
return special_svc.preprocess_rongcheng_yigou(file_path)
# 3. 识别:杨碧月 (Yang Biyue)
# 特征:经手人列包含“杨碧月”
handler_col = None
for col in df_head.columns:
# 在前50行中搜索“经手人”关键字
if df_head[col].astype(str).str.contains('经手人').any():
handler_col = col
break
if handler_col is not None:
# 检查该列是否有“杨碧月”
if df_head[handler_col].astype(str).str.contains('杨碧月').any():
logger.info("识别到杨碧月订单,执行专用预处理...")
from .special_suppliers_service import SpecialSuppliersService
special_svc = SpecialSuppliersService(self.config)
return special_svc.process_yang_biyue_only(file_path)
except Exception as e:
logger.warning(f"智能预处理识别失败: {e}")
return None
def get_purchase_orders(self) -> List[str]:
"""
获取采购单文件列表
Returns:
采购单文件路径列表
"""
return self.order_merger.get_purchase_orders()
def merge_purchase_orders(self, file_paths: List[str], progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并指定的采购单文件
Args:
file_paths: 采购单文件路径列表
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
logger.info(f"OrderService开始合并指定采购单: {file_paths}")
return self.merge_orders(file_paths, progress_cb)
def merge_all_purchase_orders(self, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并所有可用的采购单文件
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
logger.info("OrderService开始合并所有采购单")
return self.merge_orders(None, progress_cb)
def merge_orders(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并采购单
Args:
file_paths: 采购单文件路径列表如果为None则处理所有采购单
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
if file_paths:
logger.info(f"OrderService开始合并指定采购单: {file_paths}")
else:
logger.info("OrderService开始合并所有采购单")
return self.order_merger.process(file_paths, progress_cb)
def validate_unit_price(self, result_path: str) -> List[str]:
"""
校验采购单单价与商品资料进货价的差异
Args:
result_path: 待校验的采购单路径
Returns:
差异信息列表,无差异返回空列表
"""
try:
import pandas as pd
import os
from app.core.utils.file_utils import smart_read_excel
item_path = os.path.join('templates', '商品资料.xlsx')
if not os.path.exists(item_path):
logger.warning(f"未找到商品资料文件: {item_path}")
return []
df_item = smart_read_excel(item_path)
df_res = smart_read_excel(result_path)
def _find_col(df, candidates, contains=None):
cols = list(df.columns)
for c in candidates:
if c in cols:
return c
if contains:
for c in cols:
if contains in str(c):
return c
return None
item_barcode_col = _find_col(df_item, ['商品条码','商品条码(小条码)','条码','barcode'], contains='条码')
item_price_col = _find_col(df_item, ['进货价','进货价(必填)'], contains='进货价')
res_barcode_col = _find_col(df_res, ['条码','barcode'], contains='条码')
res_price_col = _find_col(df_res, ['采购单价','unit_price','单价'], contains='单价')
if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]):
logger.warning("未能在文件和商品资料中找到完整的校验列(条码、单价)")
return []
item_map = df_item[[item_barcode_col, item_price_col]].dropna()
item_map[item_price_col] = pd.to_numeric(item_map[item_price_col], errors='coerce')
item_map = item_map.dropna()
imap = dict(zip(item_map[item_barcode_col].astype(str).str.strip(), item_map[item_price_col]))
df_res['_bc_'] = df_res[res_barcode_col].astype(str).str.strip()
df_res['_res_price_'] = pd.to_numeric(df_res[res_price_col], errors='coerce')
df_res['_item_price_'] = df_res['_bc_'].map(imap)
df_check = df_res.dropna(subset=['_res_price_','_item_price_'])
df_check['_diff_'] = (df_check['_res_price_'] - df_check['_item_price_']).abs()
bad = df_check[df_check['_diff_'] > 1.0]
results = []
if not bad.empty:
for i in range(len(bad)):
r = bad.iloc[i]
results.append(f"条码 {r['_bc_']}: 采购单价={r['_res_price_']} vs 进货价={r['_item_price_']} 差异={r['_diff_']:.2f}")
return results
except Exception as e:
logger.error(f"单价校验过程中发生错误: {e}")
return []