orc-order-v2/app/services/order_service.py

185 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
订单服务模块
---------
提供订单处理服务协调Excel处理和订单合并流程。
"""
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from ..config.settings import ConfigManager
from ..core.utils.log_utils import get_logger
from ..core.excel.processor import ExcelProcessor
from ..core.excel.merger import PurchaseOrderMerger
logger = get_logger(__name__)
class OrderService:
"""
订单服务协调Excel处理和订单合并流程
"""
def __init__(self, config: Optional[ConfigManager] = None):
"""
初始化订单服务
Args:
config: 配置管理器如果为None则创建新的
"""
logger.info("初始化OrderService")
self.config = config or ConfigManager()
# 创建Excel处理器和采购单合并器
self.excel_processor = ExcelProcessor(self.config)
self.order_merger = PurchaseOrderMerger(self.config)
logger.info("OrderService初始化完成")
def get_latest_excel(self) -> Optional[str]:
"""
获取最新的Excel文件
Returns:
最新Excel文件路径如果未找到则返回None
"""
return self.excel_processor.get_latest_excel()
def process_excel(self, file_path: Optional[str] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理Excel文件生成采购单
Args:
file_path: Excel文件路径如果为None则处理最新的文件
Returns:
输出采购单文件路径如果处理失败则返回None
"""
if file_path:
logger.info(f"OrderService开始处理指定Excel文件: {file_path}")
return self.excel_processor.process_specific_file(file_path, progress_cb=progress_cb)
else:
logger.info("OrderService开始处理最新Excel文件")
return self.excel_processor.process_latest_file(progress_cb=progress_cb)
def get_purchase_orders(self) -> List[str]:
"""
获取采购单文件列表
Returns:
采购单文件路径列表
"""
return self.order_merger.get_purchase_orders()
def merge_purchase_orders(self, file_paths: List[str], progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并指定的采购单文件
Args:
file_paths: 采购单文件路径列表
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
logger.info(f"OrderService开始合并指定采购单: {file_paths}")
return self.merge_orders(file_paths, progress_cb)
def merge_all_purchase_orders(self, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并所有可用的采购单文件
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
logger.info("OrderService开始合并所有采购单")
return self.merge_orders(None, progress_cb)
def merge_orders(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
合并采购单
Args:
file_paths: 采购单文件路径列表如果为None则处理所有采购单
Returns:
合并后的采购单文件路径如果合并失败则返回None
"""
if file_paths:
logger.info(f"OrderService开始合并指定采购单: {file_paths}")
else:
logger.info("OrderService开始合并所有采购单")
return self.order_merger.process(file_paths, progress_cb)
def validate_unit_price(self, result_path: str) -> List[str]:
"""
校验采购单单价与商品资料进货价的差异
Args:
result_path: 待校验的采购单路径
Returns:
差异信息列表,无差异返回空列表
"""
try:
import pandas as pd
import os
def _read_df(path):
ap = os.path.abspath(path)
if ap.lower().endswith('.xlsx'):
return pd.read_excel(ap, engine='openpyxl')
else:
return pd.read_excel(ap, engine='xlrd')
item_path = os.path.join('templates', '商品资料.xlsx')
if not os.path.exists(item_path):
logger.warning(f"未找到商品资料文件: {item_path}")
return []
df_item = _read_df(item_path)
df_res = _read_df(result_path)
def _find_col(df, candidates, contains=None):
cols = list(df.columns)
for c in candidates:
if c in cols:
return c
if contains:
for c in cols:
if contains in str(c):
return c
return None
item_barcode_col = _find_col(df_item, ['商品条码','商品条码(小条码)','条码','barcode'], contains='条码')
item_price_col = _find_col(df_item, ['进货价','进货价(必填)'], contains='进货价')
res_barcode_col = _find_col(df_res, ['条码','barcode'], contains='条码')
res_price_col = _find_col(df_res, ['采购单价','unit_price','单价'], contains='单价')
if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]):
logger.warning("未能在文件和商品资料中找到完整的校验列(条码、单价)")
return []
item_map = df_item[[item_barcode_col, item_price_col]].dropna()
item_map[item_price_col] = pd.to_numeric(item_map[item_price_col], errors='coerce')
item_map = item_map.dropna()
imap = dict(zip(item_map[item_barcode_col].astype(str).str.strip(), item_map[item_price_col]))
df_res['_bc_'] = df_res[res_barcode_col].astype(str).str.strip()
df_res['_res_price_'] = pd.to_numeric(df_res[res_price_col], errors='coerce')
df_res['_item_price_'] = df_res['_bc_'].map(imap)
df_check = df_res.dropna(subset=['_res_price_','_item_price_'])
df_check['_diff_'] = (df_check['_res_price_'] - df_check['_item_price_']).abs()
bad = df_check[df_check['_diff_'] > 1.0]
results = []
if not bad.empty:
for i in range(len(bad)):
r = bad.iloc[i]
results.append(f"条码 {r['_bc_']}: 采购单价={r['_res_price_']} vs 进货价={r['_item_price_']} 差异={r['_diff_']:.2f}")
return results
except Exception as e:
logger.error(f"单价校验过程中发生错误: {e}")
return []