224 lines
9.4 KiB
Python
224 lines
9.4 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import re
|
||
import time
|
||
import pandas as pd
|
||
import logging
|
||
from typing import Optional, Callable
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class SpecialSuppliersService:
|
||
"""
|
||
处理特殊供应商逻辑的服务类,如蓉城易购等
|
||
"""
|
||
|
||
def __init__(self, config_manager=None):
|
||
self.config_manager = config_manager
|
||
|
||
def process_yang_biyue_only(self, src_path: str) -> Optional[str]:
|
||
"""
|
||
仅执行杨碧月订单的预处理,返回预处理后的文件路径
|
||
"""
|
||
try:
|
||
from app.core.utils.file_utils import smart_read_excel
|
||
# 读取原始数据
|
||
df = smart_read_excel(src_path)
|
||
|
||
# 检查是否包含“杨碧月”
|
||
handler_col = None
|
||
for col in df.columns:
|
||
if '经手人' in str(col):
|
||
handler_col = col
|
||
break
|
||
|
||
if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any():
|
||
return None
|
||
|
||
# 识别到杨碧月订单,执行专用清洗
|
||
logger.info("识别到杨碧月订单,正在执行专用清洗...")
|
||
|
||
# 定义列映射关系 (映射到 ExcelProcessor 期望的中文列名)
|
||
# 使用精确匹配优先,防止“结算单位”匹配到“单位”
|
||
column_map = {
|
||
'商品条码': '商品条码',
|
||
'商品名称': '商品名称',
|
||
'商品规格': '规格',
|
||
'单位': '单位',
|
||
'数量': '数量',
|
||
'含税单价': '单价',
|
||
'含税金额': '金额'
|
||
}
|
||
|
||
found_cols = {}
|
||
# 1. 第一遍:尝试精确匹配
|
||
for target_zh, std_name in column_map.items():
|
||
for col in df.columns:
|
||
if str(col).strip() == target_zh:
|
||
found_cols[col] = std_name
|
||
break
|
||
|
||
# 2. 第二遍:对未匹配成功的列尝试模糊匹配(但要排除特定干扰词)
|
||
for target_zh, std_name in column_map.items():
|
||
if std_name in found_cols.values():
|
||
continue
|
||
for col in df.columns:
|
||
col_str = str(col)
|
||
if target_zh in col_str:
|
||
# 排除干扰列
|
||
if target_zh == '单位' and '结算单位' in col_str:
|
||
continue
|
||
if target_zh == '数量' and '基本单位数量' in col_str:
|
||
continue
|
||
found_cols[col] = std_name
|
||
break
|
||
|
||
if len(found_cols) < 4:
|
||
logger.error(f"杨碧月订单列匹配不足: 找到 {list(found_cols.values())}")
|
||
return None
|
||
|
||
df_clean = df[list(found_cols.keys())].copy()
|
||
df_clean = df_clean.rename(columns=found_cols)
|
||
|
||
# 过滤掉空的条码行
|
||
df_clean = df_clean.dropna(subset=['商品条码'])
|
||
|
||
# 保存预处理文件
|
||
out_dir = os.path.dirname(src_path)
|
||
base = os.path.basename(src_path)
|
||
final_path = os.path.join(out_dir, f"预处理之后_{base}")
|
||
df_clean.to_excel(final_path, index=False)
|
||
|
||
return final_path
|
||
except Exception as e:
|
||
logger.error(f"预处理杨碧月订单出错: {e}")
|
||
return None
|
||
|
||
def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
|
||
"""
|
||
处理杨碧月经手的订单(预处理+处理)
|
||
"""
|
||
try:
|
||
if progress_cb: progress_cb(10, "正在进行杨碧月订单预处理...")
|
||
preprocessed_path = self.process_yang_biyue_only(src_path)
|
||
|
||
if not preprocessed_path:
|
||
return None
|
||
|
||
if progress_cb: progress_cb(60, "预处理文件已保存,开始标准转换流程...")
|
||
|
||
# 延迟导入以避免循环依赖
|
||
from app.services.order_service import OrderService
|
||
order_service = OrderService(self.config_manager)
|
||
result = order_service.process_excel(preprocessed_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None)
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理杨碧月订单出错: {e}")
|
||
return None
|
||
|
||
def preprocess_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
|
||
"""
|
||
蓉城易购订单预处理:按用户提供的 E, N, Q, S 列索引进行强制清洗
|
||
"""
|
||
try:
|
||
if progress_cb: progress_cb(10, "正在处理蓉城易购预处理...")
|
||
|
||
from app.core.utils.file_utils import smart_read_excel
|
||
# 蓉城易购格式:Row 0是单号,Row 1是联系人,Row 2是表头,Row 3开始是数据
|
||
df_raw = smart_read_excel(src_path, header=None)
|
||
|
||
# 检查数据行数
|
||
if len(df_raw) <= 3:
|
||
logger.error("蓉城易购文件数据行数不足")
|
||
return None
|
||
|
||
# 提取数据部分 (Row 3开始)
|
||
df_data = df_raw.iloc[3:].reset_index(drop=True)
|
||
|
||
# 用户指定列映射:
|
||
# E列 (Index 4) -> 商品条码
|
||
# N列 (Index 13) -> 数量
|
||
# Q列 (Index 16) -> 单价
|
||
# S列 (Index 18) -> 金额
|
||
# C列 (Index 2) -> 商品名称 (通用需求)
|
||
|
||
idx_map = {
|
||
2: '商品名称',
|
||
4: '商品条码',
|
||
13: '数量',
|
||
16: '单价',
|
||
18: '金额'
|
||
}
|
||
|
||
# 确保列索引不越界
|
||
available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]]
|
||
df2 = df_data.iloc[:, available_indices].copy()
|
||
df2.columns = [idx_map[i] for i in available_indices]
|
||
|
||
# 强制转换类型
|
||
for c in ['数量', '单价', '金额']:
|
||
if c in df2.columns:
|
||
df2[c] = pd.to_numeric(df2[c], errors='coerce').fillna(0)
|
||
|
||
# 过滤掉空的条码行
|
||
df2 = df2.dropna(subset=['商品条码'])
|
||
df2['商品条码'] = df2['商品条码'].astype(str).str.strip()
|
||
df2 = df2[df2['商品条码'] != '']
|
||
|
||
# 核心逻辑:分裂多条码行并均分数量
|
||
if '商品条码' in df2.columns and '数量' in df2.columns:
|
||
rows = []
|
||
for _, row in df2.iterrows():
|
||
bc_val = str(row.get('商品条码', '')).strip()
|
||
# 识别分隔符:/ , , 、
|
||
if any(sep in bc_val for sep in ['/', ',', ',', '、']):
|
||
parts = re.split(r'[/,,、]+', bc_val)
|
||
parts = [p.strip() for p in parts if p.strip()]
|
||
|
||
if len(parts) >= 2:
|
||
q_total = float(row.get('数量', 0) or 0)
|
||
if q_total > 0:
|
||
n = len(parts)
|
||
base_qty = int(q_total // n)
|
||
remainder = int(q_total % n)
|
||
|
||
for i, p_bc in enumerate(parts):
|
||
new_row = row.copy()
|
||
new_row['商品条码'] = p_bc
|
||
current_qty = base_qty + (1 if i < remainder else 0)
|
||
new_row['数量'] = current_qty
|
||
if '单价' in new_row:
|
||
try:
|
||
up = float(new_row['单价'] or 0)
|
||
new_row['金额'] = up * current_qty
|
||
except: pass
|
||
rows.append(new_row)
|
||
continue
|
||
rows.append(row)
|
||
df2 = pd.DataFrame(rows)
|
||
|
||
# 保存预处理文件
|
||
out_dir = os.path.dirname(src_path)
|
||
base = os.path.basename(src_path)
|
||
final_path = os.path.join(out_dir, f"预处理之后_{base}")
|
||
df2.to_excel(final_path, index=False)
|
||
|
||
if progress_cb: progress_cb(100, "蓉城易购预处理完成")
|
||
return final_path
|
||
|
||
except Exception as e:
|
||
logger.error(f"预处理蓉城易购订单出错: {e}")
|
||
return None
|
||
|
||
def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
|
||
"""
|
||
兼容性方法:处理蓉城易购订单并执行后续转换
|
||
"""
|
||
cleaned_path = self.preprocess_rongcheng_yigou(src_path, progress_cb)
|
||
if cleaned_path:
|
||
return self.order_service.process_excel(cleaned_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None)
|
||
return None
|