orc-order-v2/app/services/special_suppliers_service.py

224 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import time
import pandas as pd
import logging
from typing import Optional, Callable
logger = logging.getLogger(__name__)
class SpecialSuppliersService:
"""
处理特殊供应商逻辑的服务类,如蓉城易购等
"""
def __init__(self, config_manager=None):
self.config_manager = config_manager
def process_yang_biyue_only(self, src_path: str) -> Optional[str]:
"""
仅执行杨碧月订单的预处理,返回预处理后的文件路径
"""
try:
from app.core.utils.file_utils import smart_read_excel
# 读取原始数据
df = smart_read_excel(src_path)
# 检查是否包含“杨碧月”
handler_col = None
for col in df.columns:
if '经手人' in str(col):
handler_col = col
break
if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any():
return None
# 识别到杨碧月订单,执行专用清洗
logger.info("识别到杨碧月订单,正在执行专用清洗...")
# 定义列映射关系 (映射到 ExcelProcessor 期望的中文列名)
# 使用精确匹配优先,防止“结算单位”匹配到“单位”
column_map = {
'商品条码': '商品条码',
'商品名称': '商品名称',
'商品规格': '规格',
'单位': '单位',
'数量': '数量',
'含税单价': '单价',
'含税金额': '金额'
}
found_cols = {}
# 1. 第一遍:尝试精确匹配
for target_zh, std_name in column_map.items():
for col in df.columns:
if str(col).strip() == target_zh:
found_cols[col] = std_name
break
# 2. 第二遍:对未匹配成功的列尝试模糊匹配(但要排除特定干扰词)
for target_zh, std_name in column_map.items():
if std_name in found_cols.values():
continue
for col in df.columns:
col_str = str(col)
if target_zh in col_str:
# 排除干扰列
if target_zh == '单位' and '结算单位' in col_str:
continue
if target_zh == '数量' and '基本单位数量' in col_str:
continue
found_cols[col] = std_name
break
if len(found_cols) < 4:
logger.error(f"杨碧月订单列匹配不足: 找到 {list(found_cols.values())}")
return None
df_clean = df[list(found_cols.keys())].copy()
df_clean = df_clean.rename(columns=found_cols)
# 过滤掉空的条码行
df_clean = df_clean.dropna(subset=['商品条码'])
# 保存预处理文件
out_dir = os.path.dirname(src_path)
base = os.path.basename(src_path)
final_path = os.path.join(out_dir, f"预处理之后_{base}")
df_clean.to_excel(final_path, index=False)
return final_path
except Exception as e:
logger.error(f"预处理杨碧月订单出错: {e}")
return None
def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
"""
处理杨碧月经手的订单(预处理+处理)
"""
try:
if progress_cb: progress_cb(10, "正在进行杨碧月订单预处理...")
preprocessed_path = self.process_yang_biyue_only(src_path)
if not preprocessed_path:
return None
if progress_cb: progress_cb(60, "预处理文件已保存,开始标准转换流程...")
# 延迟导入以避免循环依赖
from app.services.order_service import OrderService
order_service = OrderService(self.config_manager)
result = order_service.process_excel(preprocessed_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None)
return result
except Exception as e:
logger.error(f"处理杨碧月订单出错: {e}")
return None
def preprocess_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
"""
蓉城易购订单预处理:按用户提供的 E, N, Q, S 列索引进行强制清洗
"""
try:
if progress_cb: progress_cb(10, "正在处理蓉城易购预处理...")
from app.core.utils.file_utils import smart_read_excel
# 蓉城易购格式Row 0是单号Row 1是联系人Row 2是表头Row 3开始是数据
df_raw = smart_read_excel(src_path, header=None)
# 检查数据行数
if len(df_raw) <= 3:
logger.error("蓉城易购文件数据行数不足")
return None
# 提取数据部分 (Row 3开始)
df_data = df_raw.iloc[3:].reset_index(drop=True)
# 用户指定列映射:
# E列 (Index 4) -> 商品条码
# N列 (Index 13) -> 数量
# Q列 (Index 16) -> 单价
# S列 (Index 18) -> 金额
# C列 (Index 2) -> 商品名称 (通用需求)
idx_map = {
2: '商品名称',
4: '商品条码',
13: '数量',
16: '单价',
18: '金额'
}
# 确保列索引不越界
available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]]
df2 = df_data.iloc[:, available_indices].copy()
df2.columns = [idx_map[i] for i in available_indices]
# 强制转换类型
for c in ['数量', '单价', '金额']:
if c in df2.columns:
df2[c] = pd.to_numeric(df2[c], errors='coerce').fillna(0)
# 过滤掉空的条码行
df2 = df2.dropna(subset=['商品条码'])
df2['商品条码'] = df2['商品条码'].astype(str).str.strip()
df2 = df2[df2['商品条码'] != '']
# 核心逻辑:分裂多条码行并均分数量
if '商品条码' in df2.columns and '数量' in df2.columns:
rows = []
for _, row in df2.iterrows():
bc_val = str(row.get('商品条码', '')).strip()
# 识别分隔符:/ ,
if any(sep in bc_val for sep in ['/', ',', '', '']):
parts = re.split(r'[/,,、]+', bc_val)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) >= 2:
q_total = float(row.get('数量', 0) or 0)
if q_total > 0:
n = len(parts)
base_qty = int(q_total // n)
remainder = int(q_total % n)
for i, p_bc in enumerate(parts):
new_row = row.copy()
new_row['商品条码'] = p_bc
current_qty = base_qty + (1 if i < remainder else 0)
new_row['数量'] = current_qty
if '单价' in new_row:
try:
up = float(new_row['单价'] or 0)
new_row['金额'] = up * current_qty
except: pass
rows.append(new_row)
continue
rows.append(row)
df2 = pd.DataFrame(rows)
# 保存预处理文件
out_dir = os.path.dirname(src_path)
base = os.path.basename(src_path)
final_path = os.path.join(out_dir, f"预处理之后_{base}")
df2.to_excel(final_path, index=False)
if progress_cb: progress_cb(100, "蓉城易购预处理完成")
return final_path
except Exception as e:
logger.error(f"预处理蓉城易购订单出错: {e}")
return None
def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
"""
兼容性方法:处理蓉城易购订单并执行后续转换
"""
cleaned_path = self.preprocess_rongcheng_yigou(src_path, progress_cb)
if cleaned_path:
return self.order_service.process_excel(cleaned_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None)
return None