#!/usr/bin/env python # -*- coding: utf-8 -*- import os import re import time import pandas as pd import logging from typing import Optional, Callable logger = logging.getLogger(__name__) class SpecialSuppliersService: """ 处理特殊供应商逻辑的服务类,如蓉城易购等 """ def __init__(self, config_manager=None): self.config_manager = config_manager def process_yang_biyue_only(self, src_path: str) -> Optional[str]: """ 仅执行杨碧月订单的预处理,返回预处理后的文件路径 """ try: from app.core.utils.file_utils import smart_read_excel # 读取原始数据 df = smart_read_excel(src_path) # 检查是否包含“杨碧月” handler_col = None for col in df.columns: if '经手人' in str(col): handler_col = col break if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any(): return None # 识别到杨碧月订单,执行专用清洗 logger.info("识别到杨碧月订单,正在执行专用清洗...") # 定义列映射关系 (映射到 ExcelProcessor 期望的中文列名) # 使用精确匹配优先,防止“结算单位”匹配到“单位” column_map = { '商品条码': '商品条码', '商品名称': '商品名称', '商品规格': '规格', '单位': '单位', '数量': '数量', '含税单价': '单价', '含税金额': '金额' } found_cols = {} # 1. 第一遍:尝试精确匹配 for target_zh, std_name in column_map.items(): for col in df.columns: if str(col).strip() == target_zh: found_cols[col] = std_name break # 2. 第二遍:对未匹配成功的列尝试模糊匹配(但要排除特定干扰词) for target_zh, std_name in column_map.items(): if std_name in found_cols.values(): continue for col in df.columns: col_str = str(col) if target_zh in col_str: # 排除干扰列 if target_zh == '单位' and '结算单位' in col_str: continue if target_zh == '数量' and '基本单位数量' in col_str: continue found_cols[col] = std_name break if len(found_cols) < 4: logger.error(f"杨碧月订单列匹配不足: 找到 {list(found_cols.values())}") return None df_clean = df[list(found_cols.keys())].copy() df_clean = df_clean.rename(columns=found_cols) # 过滤掉空的条码行 df_clean = df_clean.dropna(subset=['商品条码']) # 保存预处理文件 out_dir = os.path.dirname(src_path) base = os.path.basename(src_path) final_path = os.path.join(out_dir, f"预处理之后_{base}") df_clean.to_excel(final_path, index=False) return final_path except Exception as e: logger.error(f"预处理杨碧月订单出错: {e}") return None def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]: """ 处理杨碧月经手的订单(预处理+处理) """ try: if progress_cb: progress_cb(10, "正在进行杨碧月订单预处理...") preprocessed_path = self.process_yang_biyue_only(src_path) if not preprocessed_path: return None if progress_cb: progress_cb(60, "预处理文件已保存,开始标准转换流程...") # 延迟导入以避免循环依赖 from app.services.order_service import OrderService order_service = OrderService(self.config_manager) result = order_service.process_excel(preprocessed_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None) return result except Exception as e: logger.error(f"处理杨碧月订单出错: {e}") return None def preprocess_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]: """ 蓉城易购订单预处理:按用户提供的 E, N, Q, S 列索引进行强制清洗 """ try: if progress_cb: progress_cb(10, "正在处理蓉城易购预处理...") from app.core.utils.file_utils import smart_read_excel # 蓉城易购格式:Row 0是单号,Row 1是联系人,Row 2是表头,Row 3开始是数据 df_raw = smart_read_excel(src_path, header=None) # 检查数据行数 if len(df_raw) <= 3: logger.error("蓉城易购文件数据行数不足") return None # 提取数据部分 (Row 3开始) df_data = df_raw.iloc[3:].reset_index(drop=True) # 用户指定列映射: # E列 (Index 4) -> 商品条码 # N列 (Index 13) -> 数量 # Q列 (Index 16) -> 单价 # S列 (Index 18) -> 金额 # C列 (Index 2) -> 商品名称 (通用需求) idx_map = { 2: '商品名称', 4: '商品条码', 13: '数量', 16: '单价', 18: '金额' } # 确保列索引不越界 available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]] df2 = df_data.iloc[:, available_indices].copy() df2.columns = [idx_map[i] for i in available_indices] # 强制转换类型 for c in ['数量', '单价', '金额']: if c in df2.columns: df2[c] = pd.to_numeric(df2[c], errors='coerce').fillna(0) # 过滤掉空的条码行 df2 = df2.dropna(subset=['商品条码']) df2['商品条码'] = df2['商品条码'].astype(str).str.strip() df2 = df2[df2['商品条码'] != ''] # 核心逻辑:分裂多条码行并均分数量 if '商品条码' in df2.columns and '数量' in df2.columns: rows = [] for _, row in df2.iterrows(): bc_val = str(row.get('商品条码', '')).strip() # 识别分隔符:/ , , 、 if any(sep in bc_val for sep in ['/', ',', ',', '、']): parts = re.split(r'[/,,、]+', bc_val) parts = [p.strip() for p in parts if p.strip()] if len(parts) >= 2: q_total = float(row.get('数量', 0) or 0) if q_total > 0: n = len(parts) base_qty = int(q_total // n) remainder = int(q_total % n) for i, p_bc in enumerate(parts): new_row = row.copy() new_row['商品条码'] = p_bc current_qty = base_qty + (1 if i < remainder else 0) new_row['数量'] = current_qty if '单价' in new_row: try: up = float(new_row['单价'] or 0) new_row['金额'] = up * current_qty except: pass rows.append(new_row) continue rows.append(row) df2 = pd.DataFrame(rows) # 保存预处理文件 out_dir = os.path.dirname(src_path) base = os.path.basename(src_path) final_path = os.path.join(out_dir, f"预处理之后_{base}") df2.to_excel(final_path, index=False) if progress_cb: progress_cb(100, "蓉城易购预处理完成") return final_path except Exception as e: logger.error(f"预处理蓉城易购订单出错: {e}") return None def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]: """ 兼容性方法:处理蓉城易购订单并执行后续转换 """ cleaned_path = self.preprocess_rongcheng_yigou(src_path, progress_cb) if cleaned_path: return self.order_service.process_excel(cleaned_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None) return None