orc-order-v2/app/services/special_suppliers_service.py
houhuan 708402c7fb feat(订单处理): 添加杨碧月订单预处理功能
在特殊供应商服务中添加 process_yang_biyue 方法,用于处理经手人为"杨碧月"的订单。该方法能够自动识别相关列并进行数据清洗,生成标准格式的预处理文件。

同时优化订单服务的处理流程,在 process_excel 方法中集成特殊供应商预处理检查,通过 _check_special_preprocess 方法识别杨碧月订单并执行列映射转换,确保数据能够被后续标准流程正确处理。
2026-03-30 13:34:30 +08:00

214 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import time
import pandas as pd
import logging
from typing import Optional, Callable
from app.services.order_service import OrderService
logger = logging.getLogger(__name__)
class SpecialSuppliersService:
"""
处理特殊供应商逻辑的服务类,如蓉城易购等
"""
def __init__(self, config_manager=None):
self.config_manager = config_manager
self.order_service = OrderService(config_manager)
def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
"""
处理杨碧月经手的订单(预处理)
"""
try:
if progress_cb: progress_cb(10, "正在进行杨碧月订单预处理...")
from app.core.utils.file_utils import smart_read_excel
# 读取原始数据
df = smart_read_excel(src_path)
# 检查是否包含“杨碧月”
handler_col = None
for col in df.columns:
if '经手人' in str(col):
handler_col = col
break
if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any():
logger.info("未在订单中找到经手人'杨碧月',跳过特殊预处理")
return None
if progress_cb: progress_cb(30, "识别到杨碧月订单,正在清洗列数据...")
# 定义列映射关系
column_map = {
'商品条码': 'barcode',
'商品名称': 'name',
'商品规格': 'specification',
'单位': 'unit',
'数量': 'quantity',
'含税单价': 'unit_price',
'含税金额': 'total_price'
}
# 提取并重命名列
found_cols = {}
for target_zh, std_name in column_map.items():
for col in df.columns:
if target_zh in str(col):
found_cols[col] = std_name
break
if len(found_cols) < 4:
logger.error(f"杨碧月订单列匹配不足: 找到 {list(found_cols.values())}")
return None
df_clean = df[list(found_cols.keys())].copy()
df_clean = df_clean.rename(columns=found_cols)
# 过滤掉空的条码行
df_clean = df_clean.dropna(subset=['barcode'])
# 保存预处理文件
out_dir = os.path.dirname(src_path)
final_path = os.path.join(out_dir, "预处理之后.xlsx")
df_clean.to_excel(final_path, index=False)
if progress_cb: progress_cb(60, "预处理文件已保存,开始标准转换流程...")
# 调用标准处理流程
result = self.order_service.process_excel(final_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "生成采购单中...") if progress_cb else None)
return result
except Exception as e:
logger.error(f"处理杨碧月订单出错: {e}")
return None
def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
"""
处理蓉城易购订单
"""
try:
if progress_cb: progress_cb(10, "正在处理蓉城易购...")
def _pick_col(df, exact_list=None, contains_list=None):
cols = list(df.columns)
if exact_list:
for name in exact_list:
for c in cols:
if str(c).strip() == str(name).strip():
return c
if contains_list:
for kw in contains_list:
for c in cols:
if kw in str(c):
return c
return None
from app.core.utils.file_utils import smart_read_excel
try:
df_raw = smart_read_excel(src_path, header=2)
except Exception:
df_raw = smart_read_excel(src_path)
df_raw = df_raw.iloc[2:].reset_index(drop=True)
# 去除全空列与行
df_raw = df_raw.dropna(how='all', axis=1).dropna(how='all', axis=0)
# 选择关键列
col_no = _pick_col(df_raw, contains_list=['序号'])
col_name = _pick_col(df_raw, contains_list=['商品名称','品名','名称'])
col_bc = _pick_col(df_raw, contains_list=['商品条码','条码'])
col_unit = _pick_col(df_raw, exact_list=['单位(订购单位)'], contains_list=['订购单位','小单位','单位'])
col_qty = _pick_col(df_raw, contains_list=['订购数量','订货数量','数量'])
col_price= _pick_col(df_raw, exact_list=['优惠后金额(小单位)'], contains_list=['单价','销售价','进货价','优惠后金额'])
col_amt = _pick_col(df_raw, exact_list=['出库小计(元)'], contains_list=['金额','优惠后金额','小计','合计','出库小计'])
selected = [c for c in [col_no,col_name,col_bc,col_unit,col_qty,col_price,col_amt] if c]
if not selected or len(selected) < 4:
df = pd.read_excel(src_path)
df = df.iloc[2:].reset_index(drop=True)
keep_idx = [0, 2, 3, 9, 12, 15, 17]
keep_idx = [i for i in keep_idx if i < df.shape[1]]
df2 = df.iloc[:, keep_idx].copy()
target_cols = ['序号','商品名称','商品条码','单位','数量','单价','金额']
df2.columns = target_cols[:len(df2.columns)]
else:
df2 = df_raw[selected].copy()
rename_map = {}
if col_no: rename_map[col_no] = '序号'
if col_name: rename_map[col_name] = '商品名称'
if col_bc: rename_map[col_bc] = '商品条码(小条码)'
if col_unit: rename_map[col_unit] = '单位'
if col_qty: rename_map[col_qty] = '订购数量(小单位)'
if col_price: rename_map[col_price] = '单价(小单位)'
if col_amt: rename_map[col_amt] = '优惠后金额(小单位)'
df2 = df2.rename(columns=rename_map)
if '单位' in df2.columns:
df2['单位'] = df2['单位'].astype(str).str.strip().replace({'':''})
# 分裂多条码行并均分数量
bc_col = '商品条码(小条码)' if '商品条码(小条码)' in df2.columns else ('商品条码' if '商品条码' in df2.columns else ('条码' if '条码' in df2.columns else None))
qty_col = '订购数量(小单位)' if '订购数量(小单位)' in df2.columns else ('订购数量' if '订购数量' in df2.columns else ('数量' if '数量' in df2.columns else None))
up_col = '单价(小单位)' if '单价(小单位)' in df2.columns else ('单价' if '单价' in df2.columns else ('销售价' if '销售价' in df2.columns else None))
amt_col = '优惠后金额(小单位)' if '优惠后金额(小单位)' in df2.columns else ('金额' if '金额' in df2.columns else ('小计' if '小计' in df2.columns else None))
if bc_col and qty_col:
rows = []
for _, row in df2.iterrows():
bc_val = str(row.get(bc_col, '')).strip()
if bc_val and any(sep in bc_val for sep in [',','','','/',' ']):
parts = []
temp_bc = bc_val
for sep in [',','','','/',' ']:
temp_bc = temp_bc.replace(sep, ' ')
for token in temp_bc.split():
tok = ''.join([ch for ch in token if ch.isdigit()])
if tok: parts.append(tok)
parts = [p for p in parts if p]
if len(parts) >= 2:
try:
q_total = float(row.get(qty_col, 0) or 0)
except Exception:
q_total = 0
if q_total > 0:
n = len(parts)
base = int(q_total) // n if q_total.is_integer() else q_total / n
remainder = int(q_total) % n if q_total.is_integer() else 0
for i, bc in enumerate(parts):
new_row = row.copy()
new_row[bc_col] = bc
q_each = base + (1 if remainder > 0 and i < remainder else 0)
new_row[qty_col] = q_each
if up_col and amt_col:
try:
upv = float(new_row.get(up_col, 0) or 0)
new_row[amt_col] = upv * float(q_each)
except Exception: pass
rows.append(new_row)
else: rows.append(row)
else: rows.append(row)
else: rows.append(row)
df2 = pd.DataFrame(rows)
out_dir = os.path.dirname(src_path)
base = os.path.basename(src_path)
final_name = f"蓉城易购预处理-{base}"
final_path = os.path.join(out_dir, final_name)
df2.to_excel(final_path, index=False)
if progress_cb: progress_cb(60, "预处理完成,开始标准流程...")
result = self.order_service.process_excel(final_path, progress_cb=lambda p: progress_cb(60 + int(p*0.4), "Excel处理中...") if progress_cb else None)
return result
except Exception as e:
logger.error(f"处理蓉城易购订单出错: {e}")
return None