""" 订单服务模块 --------- 提供订单处理服务,协调Excel处理和订单合并流程。 """ from typing import Dict, List, Optional, Tuple, Union, Any, Callable from ..config.settings import ConfigManager from ..core.utils.log_utils import get_logger from ..core.excel.processor import ExcelProcessor from ..core.excel.merger import PurchaseOrderMerger logger = get_logger(__name__) class OrderService: """ 订单服务:协调Excel处理和订单合并流程 """ def __init__(self, config: Optional[ConfigManager] = None): """ 初始化订单服务 Args: config: 配置管理器,如果为None则创建新的 """ logger.info("初始化OrderService") self.config = config or ConfigManager() # 创建Excel处理器和采购单合并器 self.excel_processor = ExcelProcessor(self.config) self.order_merger = PurchaseOrderMerger(self.config) logger.info("OrderService初始化完成") def get_latest_excel(self) -> Optional[str]: """ 获取最新的Excel文件 Returns: 最新Excel文件路径,如果未找到则返回None """ return self.excel_processor.get_latest_excel() def process_excel(self, file_path: Optional[str] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 处理Excel文件,生成采购单 Args: file_path: Excel文件路径,如果为None则处理最新的文件 Returns: 输出采购单文件路径,如果处理失败则返回None """ if file_path: logger.info(f"OrderService开始处理指定Excel文件: {file_path}") return self.excel_processor.process_specific_file(file_path, progress_cb=progress_cb) else: logger.info("OrderService开始处理最新Excel文件") return self.excel_processor.process_latest_file(progress_cb=progress_cb) def get_purchase_orders(self) -> List[str]: """ 获取采购单文件列表 Returns: 采购单文件路径列表 """ return self.order_merger.get_purchase_orders() def merge_purchase_orders(self, file_paths: List[str], progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并指定的采购单文件 Args: file_paths: 采购单文件路径列表 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ logger.info(f"OrderService开始合并指定采购单: {file_paths}") return self.merge_orders(file_paths, progress_cb) def merge_all_purchase_orders(self, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并所有可用的采购单文件 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ logger.info("OrderService开始合并所有采购单") return self.merge_orders(None, progress_cb) def merge_orders(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]: """ 合并采购单 Args: file_paths: 采购单文件路径列表,如果为None则处理所有采购单 Returns: 合并后的采购单文件路径,如果合并失败则返回None """ if file_paths: logger.info(f"OrderService开始合并指定采购单: {file_paths}") else: logger.info("OrderService开始合并所有采购单") return self.order_merger.process(file_paths, progress_cb) def validate_unit_price(self, result_path: str) -> List[str]: """ 校验采购单单价与商品资料进货价的差异 Args: result_path: 待校验的采购单路径 Returns: 差异信息列表,无差异返回空列表 """ try: import pandas as pd import os from app.core.utils.file_utils import smart_read_excel item_path = os.path.join('templates', '商品资料.xlsx') if not os.path.exists(item_path): logger.warning(f"未找到商品资料文件: {item_path}") return [] df_item = smart_read_excel(item_path) df_res = smart_read_excel(result_path) def _find_col(df, candidates, contains=None): cols = list(df.columns) for c in candidates: if c in cols: return c if contains: for c in cols: if contains in str(c): return c return None item_barcode_col = _find_col(df_item, ['商品条码','商品条码(小条码)','条码','barcode'], contains='条码') item_price_col = _find_col(df_item, ['进货价','进货价(必填)'], contains='进货价') res_barcode_col = _find_col(df_res, ['条码','barcode'], contains='条码') res_price_col = _find_col(df_res, ['采购单价','unit_price','单价'], contains='单价') if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]): logger.warning("未能在文件和商品资料中找到完整的校验列(条码、单价)") return [] item_map = df_item[[item_barcode_col, item_price_col]].dropna() item_map[item_price_col] = pd.to_numeric(item_map[item_price_col], errors='coerce') item_map = item_map.dropna() imap = dict(zip(item_map[item_barcode_col].astype(str).str.strip(), item_map[item_price_col])) df_res['_bc_'] = df_res[res_barcode_col].astype(str).str.strip() df_res['_res_price_'] = pd.to_numeric(df_res[res_price_col], errors='coerce') df_res['_item_price_'] = df_res['_bc_'].map(imap) df_check = df_res.dropna(subset=['_res_price_','_item_price_']) df_check['_diff_'] = (df_check['_res_price_'] - df_check['_item_price_']).abs() bad = df_check[df_check['_diff_'] > 1.0] results = [] if not bad.empty: for i in range(len(bad)): r = bad.iloc[i] results.append(f"条码 {r['_bc_']}: 采购单价={r['_res_price_']} vs 进货价={r['_item_price_']} 差异={r['_diff_']:.2f}") return results except Exception as e: logger.error(f"单价校验过程中发生错误: {e}") return []