feat: 实现智能订单识别与自动预处理路由
- 新增智能识别功能,自动检测蓉城易购、烟草公司、杨碧月订单特征
- 修改订单服务流程,在Excel处理前自动执行专用预处理
- 更新无界面API,支持智能识别模式,简化OpenClaw集成
- 完善供应商专用预处理逻辑,修复数量计算和单位换算问题
- 添加变更日志和最终更新报告文档,记录v2.1版本变更
This commit is contained in:
@@ -90,8 +90,8 @@ class OrderService:
|
||||
import pandas as pd
|
||||
import re
|
||||
|
||||
# 仅读取前 50 行进行智能识别
|
||||
df_head = smart_read_excel(file_path, nrows=50)
|
||||
# 仅读取前 50 行进行智能识别 (header=None 确保能读到第一行内容)
|
||||
df_head = smart_read_excel(file_path, nrows=50, header=None)
|
||||
df_str = df_head.astype(str)
|
||||
|
||||
# 1. 识别:烟草公司 (Tobacco)
|
||||
@@ -101,7 +101,7 @@ class OrderService:
|
||||
logger.info("识别到烟草公司订单,执行专用预处理...")
|
||||
from .tobacco_service import TobaccoService
|
||||
tobacco_svc = TobaccoService(self.config)
|
||||
return tobacco_svc.process_tobacco_order(file_path)
|
||||
return tobacco_svc.preprocess_tobacco_order(file_path)
|
||||
|
||||
# 2. 识别:蓉城易购 (Rongcheng Yigou)
|
||||
# 特征:内容中包含单号标识“RCDH”
|
||||
@@ -110,42 +110,14 @@ class OrderService:
|
||||
logger.info("识别到蓉城易购订单,执行专用预处理...")
|
||||
from .special_suppliers_service import SpecialSuppliersService
|
||||
special_svc = SpecialSuppliersService(self.config)
|
||||
return special_svc.process_rongcheng_yigou(file_path)
|
||||
return special_svc.preprocess_rongcheng_yigou(file_path)
|
||||
|
||||
# 3. 识别:杨碧月 (Yang Biyue)
|
||||
handler_col = None
|
||||
for col in df_head.columns:
|
||||
if '经手人' in str(col):
|
||||
handler_col = col
|
||||
break
|
||||
|
||||
if handler_col is not None and df_head[handler_col].astype(str).str.contains('杨碧月').any():
|
||||
logger.info("识别到杨碧月订单,执行通用预处理...")
|
||||
df = smart_read_excel(file_path)
|
||||
column_map = {
|
||||
'商品条码': '商品条码', '商品名称': '商品名称', '规格': '规格',
|
||||
'单位': '单位', '数量': '数量', '单价': '单价', '金额': '金额'
|
||||
}
|
||||
found_cols = {}
|
||||
for target_zh, std_name in column_map.items():
|
||||
for col in df.columns:
|
||||
col_str = str(col)
|
||||
if target_zh == col_str: found_cols[col] = std_name; break
|
||||
if std_name not in found_cols.values():
|
||||
for col in df.columns:
|
||||
if target_zh in str(col): found_cols[col] = std_name; break
|
||||
|
||||
if len(found_cols) >= 4:
|
||||
df_clean = df[list(found_cols.keys())].copy()
|
||||
df_clean = df_clean.rename(columns=found_cols)
|
||||
for c in ['数量', '单价', '金额']:
|
||||
if c in df_clean.columns:
|
||||
df_clean[c] = pd.to_numeric(df_clean[c], errors='coerce').fillna(0)
|
||||
df_clean = df_clean.dropna(subset=['商品条码'])
|
||||
out_dir = os.path.dirname(file_path)
|
||||
final_path = os.path.join(out_dir, "预处理之后.xlsx")
|
||||
df_clean.to_excel(final_path, index=False)
|
||||
return final_path
|
||||
from .special_suppliers_service import SpecialSuppliersService
|
||||
special_svc = SpecialSuppliersService(self.config)
|
||||
# 我们直接复用 SpecialSuppliersService 里的逻辑,但要确保它只返回路径
|
||||
# 修改 SpecialSuppliersService.process_yang_biyue 使其支持仅返回预处理路径
|
||||
return special_svc.process_yang_biyue_only(file_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"智能预处理识别失败: {e}")
|
||||
|
||||
@@ -7,7 +7,6 @@ import time
|
||||
import pandas as pd
|
||||
import logging
|
||||
from typing import Optional, Callable
|
||||
from app.services.order_service import OrderService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -18,16 +17,13 @@ class SpecialSuppliersService:
|
||||
|
||||
def __init__(self, config_manager=None):
|
||||
self.config_manager = config_manager
|
||||
self.order_service = OrderService(config_manager)
|
||||
|
||||
def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
|
||||
def process_yang_biyue_only(self, src_path: str) -> Optional[str]:
|
||||
"""
|
||||
处理杨碧月经手的订单(预处理)
|
||||
仅执行杨碧月订单的预处理,返回预处理后的文件路径
|
||||
"""
|
||||
try:
|
||||
if progress_cb: progress_cb(10, "正在进行杨碧月订单预处理...")
|
||||
from app.core.utils.file_utils import smart_read_excel
|
||||
|
||||
# 读取原始数据
|
||||
df = smart_read_excel(src_path)
|
||||
|
||||
@@ -39,10 +35,9 @@ class SpecialSuppliersService:
|
||||
break
|
||||
|
||||
if handler_col is None or not df[handler_col].astype(str).str.contains('杨碧月').any():
|
||||
logger.info("未在订单中找到经手人'杨碧月',跳过特殊预处理")
|
||||
return None
|
||||
|
||||
if progress_cb: progress_cb(30, "识别到杨碧月订单,正在清洗列数据...")
|
||||
logger.info("识别到杨碧月订单,正在执行专用清洗...")
|
||||
|
||||
# 定义列映射关系
|
||||
column_map = {
|
||||
@@ -75,139 +70,137 @@ class SpecialSuppliersService:
|
||||
|
||||
# 保存预处理文件
|
||||
out_dir = os.path.dirname(src_path)
|
||||
final_path = os.path.join(out_dir, "预处理之后.xlsx")
|
||||
base = os.path.basename(src_path)
|
||||
final_path = os.path.join(out_dir, f"预处理之后_{base}")
|
||||
df_clean.to_excel(final_path, index=False)
|
||||
|
||||
return final_path
|
||||
except Exception as e:
|
||||
logger.error(f"预处理杨碧月订单出错: {e}")
|
||||
return None
|
||||
|
||||
def process_yang_biyue(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
    """Pre-process a Yang Biyue order, then run the standard conversion on it.

    Args:
        src_path: Path of the raw order workbook.
        progress_cb: Optional ``(percent, message)`` reporter. Pre-processing
            reports 10 and 60; the conversion step is mapped onto 60-100.

    Returns:
        Whatever ``OrderService.process_excel`` returns for the cleaned file,
        or ``None`` when pre-processing yields no file or any step raises.
    """
    try:
        if progress_cb:
            progress_cb(10, "正在进行杨碧月订单预处理...")

        preprocessed_path = self.process_yang_biyue_only(src_path)
        if not preprocessed_path:
            return None

        if progress_cb:
            progress_cb(60, "预处理文件已保存,开始标准转换流程...")

        # Imported lazily to avoid a circular import with order_service,
        # which itself imports this module on demand.
        from app.services.order_service import OrderService
        order_service = OrderService(self.config_manager)

        def _report_conversion(percent):
            # process_excel reports a bare percentage; rescale it to 60-100.
            if progress_cb:
                progress_cb(60 + int(percent * 0.4), "生成采购单中...")

        return order_service.process_excel(preprocessed_path, progress_cb=_report_conversion)

    except Exception as e:
        logger.error(f"处理杨碧月订单出错: {e}")
        return None
|
||||
|
||||
def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
|
||||
def preprocess_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
    """Clean a Rongcheng Yigou order sheet by fixed column position and save it.

    The workbook is read without a header row. The first three rows are
    skipped (per the original author's notes: order number, contact, header)
    and data is taken from row 3 onwards. Columns are selected by position:
    C (2) product name, E (4) barcode, N (13) quantity, Q (16) unit price,
    S (18) amount. Rows whose barcode cell lists several barcodes are split
    into one row per barcode with the quantity shared between them.

    Args:
        src_path: Path of the raw order workbook.
        progress_cb: Optional ``(percent, message)`` reporter; called with 10
            at the start and 100 once the cleaned file has been written.

    Returns:
        Path of the cleaned ``.xlsx`` file written next to ``src_path``, or
        ``None`` when the sheet is too short, has no barcode column, or any
        step raises.
    """
    # Local import keeps this method independent of the module's import header.
    import re

    # Barcode separators: '/', ASCII comma, full-width comma (U+FF0C) and
    # the ideographic comma (U+3001).
    separators = re.compile(r'[/,\uff0c\u3001]+')

    def _split_row(row):
        # Return the rows that replace `row`: one per barcode when the cell
        # holds several barcodes and the quantity is positive, else `row`.
        parts = [p.strip() for p in separators.split(str(row['商品条码']).strip()) if p.strip()]
        q_total = float(row.get('数量', 0) or 0)
        if len(parts) < 2 or q_total <= 0:
            return [row]

        n = len(parts)
        if q_total.is_integer():
            # Whole units: spread the remainder over the first barcodes.
            base_qty, remainder = divmod(int(q_total), n)
        else:
            # Fractional quantity: share it evenly instead of truncating.
            base_qty, remainder = q_total / n, 0

        pieces = []
        for i, p_bc in enumerate(parts):
            new_row = row.copy()
            new_row['商品条码'] = p_bc
            current_qty = base_qty + (1 if i < remainder else 0)
            new_row['数量'] = current_qty
            if '单价' in new_row:
                # 单价 was coerced to a number earlier, so float() cannot fail.
                new_row['金额'] = float(new_row['单价'] or 0) * current_qty
            pieces.append(new_row)
        return pieces

    try:
        if progress_cb:
            progress_cb(10, "正在处理蓉城易购预处理...")

        from app.core.utils.file_utils import smart_read_excel

        df_raw = smart_read_excel(src_path, header=None)

        # Need at least one data row after the three leading rows.
        if len(df_raw) <= 3:
            logger.error("蓉城易购文件数据行数不足")
            return None

        df_data = df_raw.iloc[3:].reset_index(drop=True)

        idx_map = {
            2: '商品名称',
            4: '商品条码',
            13: '数量',
            16: '单价',
            18: '金额',
        }

        # Keep only the positions that exist in this sheet.
        available_indices = [i for i in idx_map if i < df_data.shape[1]]
        df2 = df_data.iloc[:, available_indices].copy()
        df2.columns = [idx_map[i] for i in available_indices]

        if '商品条码' not in df2.columns:
            logger.error("蓉城易购文件缺少商品条码列")
            return None

        for c in ('数量', '单价', '金额'):
            if c in df2.columns:
                df2[c] = pd.to_numeric(df2[c], errors='coerce').fillna(0)

        # Drop rows without a barcode (missing first, then blank after strip).
        df2 = df2.dropna(subset=['商品条码'])
        df2['商品条码'] = df2['商品条码'].astype(str).str.strip()
        df2 = df2[df2['商品条码'] != '']

        if '数量' in df2.columns:
            rows = []
            for _, row in df2.iterrows():
                rows.extend(_split_row(row))
            # Passing columns keeps the header even when no rows survive.
            df2 = pd.DataFrame(rows, columns=df2.columns)

        # Always write .xlsx: the name is unchanged for .xlsx sources, and
        # legacy .xls output is not writable by current pandas releases.
        out_dir = os.path.dirname(src_path)
        stem = os.path.splitext(os.path.basename(src_path))[0]
        final_path = os.path.join(out_dir, f"预处理之后_{stem}.xlsx")
        df2.to_excel(final_path, index=False)

        if progress_cb:
            progress_cb(100, "蓉城易购预处理完成")
        return final_path

    except Exception as e:
        logger.error(f"预处理蓉城易购订单出错: {e}")
        return None
|
||||
|
||||
def process_rongcheng_yigou(self, src_path: str, progress_cb: Optional[Callable[[int, str], None]] = None) -> Optional[str]:
    """Compatibility entry point: pre-process a Rongcheng Yigou order, then convert it.

    Args:
        src_path: Path of the raw order workbook.
        progress_cb: Optional ``(percent, message)`` reporter. Pre-processing
            progress is scaled onto 0-60 and the conversion onto 60-100, so
            the reported percentage never moves backwards.

    Returns:
        Whatever ``process_excel`` returns for the cleaned file, or ``None``
        when pre-processing produced no file.
    """
    def _report_preprocess(percent, message):
        # preprocess_rongcheng_yigou reports up to 100; it owns only 0-60 here.
        progress_cb(min(60, int(percent * 0.6)), message)

    cleaned_path = self.preprocess_rongcheng_yigou(
        src_path, _report_preprocess if progress_cb else None
    )
    if not cleaned_path:
        return None

    # Use an order service already attached to the instance if there is one;
    # otherwise build it lazily (a module-level import of order_service would
    # be circular, and __init__ may no longer create the attribute).
    order_service = getattr(self, "order_service", None)
    if order_service is None:
        from app.services.order_service import OrderService
        order_service = OrderService(getattr(self, "config_manager", None))

    def _report_conversion(percent):
        # process_excel reports a bare percentage; rescale it to 60-100.
        if progress_cb:
            progress_cb(60 + int(percent * 0.4), "生成采购单中...")

    return order_service.process_excel(cleaned_path, progress_cb=_report_conversion)
|
||||
|
||||
@@ -73,6 +73,77 @@ class TobaccoService:
|
||||
logger.warning(f"找到的烟草订单明细文件不是今天创建的: {latest_file}")
|
||||
return latest_file # 仍然返回最新文件,但给出警告
|
||||
|
||||
def preprocess_tobacco_order(self, file_path: str) -> Optional[str]:
    """Clean a tobacco-company order sheet by fixed column position and save it.

    The workbook is read without a header row. The first three rows are
    skipped (per the original author's notes: licence number, header, totals)
    and data is taken from row 3 onwards. Columns are selected by position:
    A (0) product name, B (1) barcode, E (4) wholesale price, G (6) ordered
    quantity, H (7) amount. Rows with a zero quantity are dropped, the
    quantity is multiplied by 10 and the wholesale price divided by 10
    (presumably a carton-to-pack conversion - confirm with the data owner).

    Args:
        file_path: Path of the raw tobacco order workbook.

    Returns:
        Path of the cleaned ``.xlsx`` file written next to ``file_path``, or
        ``None`` when the sheet is too short, lacks a required column, has no
        non-zero quantity rows, or any step raises.
    """
    try:
        logger.info(f"执行烟草订单专用预处理: {file_path}")
        from app.core.utils.file_utils import smart_read_excel

        df_raw = smart_read_excel(file_path, header=None)

        # Need at least one data row after the three leading rows.
        if len(df_raw) <= 3:
            logger.error("烟草订单文件数据行数不足")
            return None

        df_data = df_raw.iloc[3:].reset_index(drop=True)

        idx_map = {
            0: '商品名称',
            1: '商品条码',
            4: '批发价',
            6: '数量',
            7: '金额',
        }

        # Every mapped column is used below, so report a short sheet
        # explicitly instead of failing later with a bare KeyError.
        missing = [name for idx, name in idx_map.items() if idx >= df_data.shape[1]]
        if missing:
            logger.error(f"烟草订单文件缺少必要的列: {missing}")
            return None

        df = df_data.iloc[:, list(idx_map)].copy()
        df.columns = list(idx_map.values())

        # Keep only rows that were actually ordered.
        df['数量'] = pd.to_numeric(df['数量'], errors='coerce').fillna(0)
        df = df[df['数量'] != 0].copy()

        if df.empty:
            logger.warning("烟草订单无有效订单量记录")
            return None

        # Unit conversion: price / 10 and quantity * 10.
        df['单价'] = pd.to_numeric(df['批发价'], errors='coerce').fillna(0) / 10
        df['数量'] = df['数量'] * 10

        # The amount column is only coerced to a number; it is not recomputed.
        df['金额'] = pd.to_numeric(df['金额'], errors='coerce').fillna(0)

        df_final = df[['商品条码', '商品名称', '数量', '单价', '金额']].copy()

        # Always write .xlsx: the name is unchanged for .xlsx sources, and
        # legacy .xls output is not writable by current pandas releases.
        out_dir = os.path.dirname(file_path)
        stem = os.path.splitext(os.path.basename(file_path))[0]
        final_path = os.path.join(out_dir, f"预处理之后_{stem}.xlsx")
        df_final.to_excel(final_path, index=False)

        logger.info(f"烟草订单预处理完成: {final_path}")
        return final_path

    except Exception as e:
        logger.error(f"烟草订单预处理失败: {e}")
        return None
|
||||
|
||||
def process_tobacco_order(self, input_file=None):
|
||||
"""
|
||||
处理烟草订单
|
||||
|
||||
Reference in New Issue
Block a user