""" 订单服务模块 --------- 提供订单处理服务,协调Excel处理和订单合并流程。 """ import os from typing import Dict, List, Optional, Tuple, Union, Any, Callable from ..config.settings import ConfigManager from ..core.utils.log_utils import get_logger from ..core.excel.processor import ExcelProcessor from ..core.excel.merger import PurchaseOrderMerger logger = get_logger(__name__) class OrderService: """ 订单服务:协调Excel处理和订单合并流程 """ def __init__(self, config: Optional[ConfigManager] = None): """ 初始化订单服务 Args: config: 配置管理器,如果为None则创建新的 """ logger.info("初始化OrderService") self.config = config or ConfigManager() # 创建Excel处理器和采购单合并器 self.excel_processor = ExcelProcessor(self.config) self.order_merger = PurchaseOrderMerger(self.config) logger.info("OrderService初始化完成") def get_latest_excel(self) -> Optional[str]: """ 获取最新的Excel文件 Returns: 最新Excel文件路径,如果未找到则返回None """ return self.excel_processor.get_latest_excel() def process_excel(self, file_path: Optional[str] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 处理Excel订单文件,生成标准采购单 Args: file_path: Excel文件路径,如果为None则处理最新的文件 Returns: 输出采购单文件路径,如果处理失败则返回None """ if not file_path: file_path = self.excel_processor.get_latest_excel() if not file_path: logger.warning("未找到可处理的Excel文件") return None logger.info("OrderService开始处理最新Excel文件") else: logger.info(f"OrderService开始处理指定Excel文件: {file_path}") # 检查是否需要特殊的供应商预处理(如杨碧月) try: from .special_suppliers_service import SpecialSuppliersService special_service = SpecialSuppliersService(self.config) # 尝试识别并预处理(注意:这里不再传入 progress_cb 避免无限递归或重复进度条, # 或者我们在 special_service 内部逻辑中处理完后直接返回结果) # 为了避免循环调用,我们在 SpecialSuppliersService 内部不再调用 process_excel, # 而是让 process_excel 识别后自己决定是否处理预处理后的文件。 # 我们新增一个 check_and_preprocess 方法 preprocessed_path = self._check_special_preprocess(file_path) if preprocessed_path: logger.info(f"检测到特殊供应商,已生成预处理文件: {preprocessed_path}") file_path = preprocessed_path except Exception as e: logger.error(f"检查特殊预处理时出错: {e}") return self.excel_processor.process_specific_file(file_path, progress_cb=progress_cb) def _check_special_preprocess(self, file_path: str) -> Optional[str]: """检查并执行特殊的预处理(支持杨碧月、烟草公司、蓉城易购)""" try: from app.core.utils.file_utils import smart_read_excel import pandas as pd import re # 仅读取前 50 行进行智能识别 df_head = smart_read_excel(file_path, nrows=50) df_str = df_head.astype(str) # 1. 识别:烟草公司 (Tobacco) # 特征:内容中包含“专卖证号”或特定证号“510109104938” is_tobacco = df_str.apply(lambda x: x.str.contains('专卖证号|510109104938')).any().any() if is_tobacco: logger.info("识别到烟草公司订单,执行专用预处理...") from .tobacco_service import TobaccoService tobacco_svc = TobaccoService(self.config) return tobacco_svc.process_tobacco_order(file_path) # 2. 识别:蓉城易购 (Rongcheng Yigou) # 特征:内容中包含单号标识“RCDH” is_rongcheng = df_str.apply(lambda x: x.str.contains('RCDH')).any().any() if is_rongcheng: logger.info("识别到蓉城易购订单,执行专用预处理...") from .special_suppliers_service import SpecialSuppliersService special_svc = SpecialSuppliersService(self.config) return special_svc.process_rongcheng_yigou(file_path) # 3. 识别:杨碧月 (Yang Biyue) handler_col = None for col in df_head.columns: if '经手人' in str(col): handler_col = col break if handler_col is not None and df_head[handler_col].astype(str).str.contains('杨碧月').any(): logger.info("识别到杨碧月订单,执行通用预处理...") df = smart_read_excel(file_path) column_map = { '商品条码': '商品条码', '商品名称': '商品名称', '规格': '规格', '单位': '单位', '数量': '数量', '单价': '单价', '金额': '金额' } found_cols = {} for target_zh, std_name in column_map.items(): for col in df.columns: col_str = str(col) if target_zh == col_str: found_cols[col] = std_name; break if std_name not in found_cols.values(): for col in df.columns: if target_zh in str(col): found_cols[col] = std_name; break if len(found_cols) >= 4: df_clean = df[list(found_cols.keys())].copy() df_clean = df_clean.rename(columns=found_cols) for c in ['数量', '单价', '金额']: if c in df_clean.columns: df_clean[c] = pd.to_numeric(df_clean[c], errors='coerce').fillna(0) df_clean = df_clean.dropna(subset=['商品条码']) out_dir = os.path.dirname(file_path) final_path = os.path.join(out_dir, "预处理之后.xlsx") df_clean.to_excel(final_path, index=False) return final_path except Exception as e: logger.warning(f"智能预处理识别失败: {e}") return None def get_purchase_orders(self) -> List[str]: """ 获取采购单文件列表 Returns: 采购单文件路径列表 """ return self.order_merger.get_purchase_orders() def merge_purchase_orders(self, file_paths: List[str], progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并指定的采购单文件 Args: file_paths: 采购单文件路径列表 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ logger.info(f"OrderService开始合并指定采购单: {file_paths}") return self.merge_orders(file_paths, progress_cb) def merge_all_purchase_orders(self, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并所有可用的采购单文件 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ logger.info("OrderService开始合并所有采购单") return self.merge_orders(None, progress_cb) def merge_orders(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并采购单 Args: file_paths: 采购单文件路径列表,如果为None则处理所有采购单 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ if file_paths: logger.info(f"OrderService开始合并指定采购单: {file_paths}") else: logger.info("OrderService开始合并所有采购单") return self.order_merger.process(file_paths, progress_cb) def validate_unit_price(self, result_path: str) -> List[str]: """ 校验采购单单价与商品资料进货价的差异 Args: result_path: 待校验的采购单路径 Returns: 差异信息列表,无差异返回空列表 """ try: import pandas as pd import os from app.core.utils.file_utils import smart_read_excel item_path = os.path.join('templates', '商品资料.xlsx') if not os.path.exists(item_path): logger.warning(f"未找到商品资料文件: {item_path}") return [] df_item = smart_read_excel(item_path) df_res = smart_read_excel(result_path) def _find_col(df, candidates, contains=None): cols = list(df.columns) for c in candidates: if c in cols: return c if contains: for c in cols: if contains in str(c): return c return None item_barcode_col = _find_col(df_item, ['商品条码','商品条码(小条码)','条码','barcode'], contains='条码') item_price_col = _find_col(df_item, ['进货价','进货价(必填)'], contains='进货价') res_barcode_col = _find_col(df_res, ['条码','barcode'], contains='条码') res_price_col = _find_col(df_res, ['采购单价','unit_price','单价'], contains='单价') if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]): logger.warning("未能在文件和商品资料中找到完整的校验列(条码、单价)") return [] item_map = df_item[[item_barcode_col, item_price_col]].dropna() item_map[item_price_col] = pd.to_numeric(item_map[item_price_col], errors='coerce') item_map = item_map.dropna() imap = dict(zip(item_map[item_barcode_col].astype(str).str.strip(), item_map[item_price_col])) df_res['_bc_'] = df_res[res_barcode_col].astype(str).str.strip() df_res['_res_price_'] = pd.to_numeric(df_res[res_price_col], errors='coerce') df_res['_item_price_'] = df_res['_bc_'].map(imap) df_check = df_res.dropna(subset=['_res_price_','_item_price_']) df_check['_diff_'] = (df_check['_res_price_'] - df_check['_item_price_']).abs() bad = df_check[df_check['_diff_'] > 1.0] results = [] if not bad.empty: for i in range(len(bad)): r = bad.iloc[i] results.append(f"条码 {r['_bc_']}: 采购单价={r['_res_price_']} vs 进货价={r['_item_price_']} 差异={r['_diff_']:.2f}") return results except Exception as e: logger.error(f"单价校验过程中发生错误: {e}") return []