orc-order-v2/app/services/tobacco_service.py
houhuan 32d41244e5 feat: 实现智能订单识别与自动预处理路由
- 新增智能识别功能,自动检测蓉城易购、烟草公司、杨碧月订单特征
- 修改订单服务流程,在Excel处理前自动执行专用预处理
- 更新无界面API,支持智能识别模式,简化OpenClaw集成
- 完善供应商专用预处理逻辑,修复数量计算和单位换算问题
- 添加变更日志和最终更新报告文档,记录v2.1版本变更
2026-03-30 15:36:27 +08:00

339 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
烟草公司订单处理服务
----------------
处理烟草公司特定格式的订单明细文件,生成银豹采购单
"""
import os
import glob
import datetime
import pandas as pd
import xlrd
import xlwt
import re
from xlutils.copy import copy
from openpyxl import load_workbook
from typing import Optional, Dict, Any, List, Tuple
from app.core.utils.log_utils import get_logger
from app.core.utils.dialog_utils import show_custom_dialog # 导入自定义弹窗工具
from ..config.settings import ConfigManager
logger = get_logger(__name__)
class TobaccoService:
"""烟草公司订单处理服务"""
def __init__(self, config: Dict[str, Any]):
"""
初始化服务
Args:
config: 配置信息
"""
self.config = config
# 修复配置获取方式使用fallback机制
self.output_dir = config.get('Paths', 'output_folder', fallback='data/output')
self.template_file = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls')
# 将烟草订单保存到result目录
result_dir = "data/result"
os.makedirs(result_dir, exist_ok=True)
self.output_file = os.path.join(result_dir, '银豹采购单_烟草公司.xls')
def get_latest_tobacco_order(self) -> Optional[str]:
"""
获取最新的烟草订单明细文件
Returns:
文件路径或None
"""
# 获取今日开始时间戳
today = datetime.date.today()
today_start = datetime.datetime.combine(today, datetime.time.min).timestamp()
# 查找订单明细文件
file_pattern = os.path.join(self.output_dir, "订单明细*.xlsx")
candidates = glob.glob(file_pattern)
if not candidates:
logger.warning("未找到烟草公司订单明细文件")
return None
# 按创建时间排序
candidates.sort(key=os.path.getctime, reverse=True)
latest_file = candidates[0]
# 检查是否是今天的文件
if os.path.getctime(latest_file) >= today_start:
logger.info(f"找到最新烟草订单明细文件: {latest_file}")
return latest_file
else:
logger.warning(f"找到的烟草订单明细文件不是今天创建的: {latest_file}")
return latest_file # 仍然返回最新文件,但给出警告
def preprocess_tobacco_order(self, file_path: str) -> Optional[str]:
"""
烟草订单预处理:按用户提供的 B, E, G, H 列索引进行强制清洗
"""
try:
logger.info(f"执行烟草订单专用预处理: {file_path}")
from app.core.utils.file_utils import smart_read_excel
# 烟草格式Row 0是专卖证号Row 1是表头Row 2是合计Row 3开始是数据
df_raw = smart_read_excel(file_path, header=None)
if len(df_raw) <= 3:
logger.error("烟草订单文件数据行数不足")
return None
# 提取数据部分 (Row 3开始)
df_data = df_raw.iloc[3:].reset_index(drop=True)
# 用户指定列映射:
# A列 (Index 0) -> 商品名称
# B列 (Index 1) -> 商品条码 (盒码)
# E列 (Index 4) -> 批发价 (单价)
# G列 (Index 6) -> 订单量 (数量)
# H列 (Index 7) -> 金额
idx_map = {
0: '商品名称',
1: '商品条码',
4: '批发价',
6: '数量',
7: '金额'
}
available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]]
df = df_data.iloc[:, available_indices].copy()
df.columns = [idx_map[i] for i in available_indices]
# 1. 过滤订单量不为0的数据
df['数量'] = pd.to_numeric(df['数量'], errors='coerce').fillna(0)
df = df[df['数量'] != 0].copy()
if df.empty:
logger.warning("烟草订单无有效订单量记录")
return None
# 2. 核心清洗逻辑:
# 数量 = 订单量 * 10 (G列)
# 单价 = 批发价 / 10 (E列)
df['单价'] = pd.to_numeric(df['批发价'], errors='coerce').fillna(0) / 10
df['数量'] = df['数量'] * 10
# 3. 校验金额 (H列)
df['金额'] = pd.to_numeric(df['金额'], errors='coerce').fillna(0)
# 4. 只保留需要的列
final_cols = ['商品条码', '商品名称', '数量', '单价', '金额']
df_final = df[final_cols].copy()
# 保存预处理文件
out_dir = os.path.dirname(file_path)
base = os.path.basename(file_path)
final_path = os.path.join(out_dir, f"预处理之后_{base}")
df_final.to_excel(final_path, index=False)
logger.info(f"烟草订单预处理完成: {final_path}")
return final_path
except Exception as e:
logger.error(f"烟草订单预处理失败: {e}")
return None
def process_tobacco_order(self, input_file=None):
"""
处理烟草订单
Args:
input_file: 输入文件路径如果为None则自动查找最新文件
Returns:
输出文件路径或None如果处理失败
"""
try:
# 如果没有指定输入文件,查找最新的文件
if input_file is None:
input_file = self.get_latest_tobacco_order()
if input_file is None:
logger.warning("未找到烟草公司订单明细文件")
logger.error("未找到可处理的烟草订单明细文件")
return None
logger.info(f"开始处理烟草公司订单: {input_file}")
# 读取订单时间和总金额
order_info = self._read_order_info(input_file)
if not order_info:
logger.error(f"读取订单信息失败: {input_file}")
return None
order_time, total_amount = order_info
# 读取订单数据
order_data = self._read_order_data(input_file)
if order_data is None or order_data.empty:
logger.error(f"读取订单数据失败: {input_file}")
return None
# 生成银豹采购单
output_file = self._generate_pospal_order(order_data, order_time)
if not output_file:
logger.error("生成银豹采购单失败")
return None
# 获取处理条目数
total_count = len(order_data)
# 输出处理结果
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")
logger.info(f"采购单已生成: {output_file}")
# 显示处理结果对话框
self.show_result_dialog(output_file, order_time, total_count, total_amount)
return output_file
except Exception as e:
logger.error(f"处理烟草公司订单时发生错误: {e}", exc_info=True)
return None
def _read_order_info(self, file_path: str) -> Optional[Tuple[str, float]]:
"""
读取订单信息(时间和总金额)
Args:
file_path: 文件路径
Returns:
包含订单时间和总金额的元组或None
"""
try:
wb_info = load_workbook(file_path, data_only=True)
ws_info = wb_info.active
order_time = ws_info["H1"].value or "(空)"
total_amount = ws_info["H3"].value or 0
return (order_time, total_amount)
except Exception as e:
logger.error(f"读取订单信息出错: {e}")
return None
def _read_order_data(self, file_path: str) -> Optional[pd.DataFrame]:
"""
读取订单数据
Args:
file_path: 文件路径
Returns:
订单数据DataFrame或None
"""
columns = ['商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额']
try:
from app.core.utils.file_utils import smart_read_excel
# 读取Excel文件
df_old = smart_read_excel(file_path, header=None, skiprows=3, names=columns)
# 过滤订单量不为0的数据并计算采购量和单价
df_filtered = df_old[df_old['订单量'] != 0].copy()
df_filtered['采购量'] = df_filtered['订单量'] * 10
df_filtered['采购单价'] = df_filtered['金额'] / df_filtered['采购量']
df_filtered = df_filtered.reset_index(drop=True)
return df_filtered
except Exception as e:
logger.error(f"读取订单数据失败: {e}")
return None
def _generate_pospal_order(self, order_data: pd.DataFrame, order_time: str) -> Optional[str]:
"""
生成银豹采购单
Args:
order_data: 订单数据
order_time: 订单时间
Returns:
输出文件路径或None
"""
try:
# 检查模板文件是否存在
if not os.path.exists(self.template_file):
logger.error(f"采购单模板文件不存在: {self.template_file}")
return None
# 打开模板,准备写入
template_rd = xlrd.open_workbook(self.template_file, formatting_info=True)
template_wb = copy(template_rd)
template_ws = template_wb.get_sheet(0)
# 获取模板中的表头列索引
header_row = template_rd.sheet_by_index(0).row_values(0)
barcode_col = header_row.index("条码(必填)")
amount_col = header_row.index("采购量(必填)")
gift_col = header_row.index("赠送量")
price_col = header_row.index("采购单价(必填)")
# 写入数据到模板
for i, row in order_data.iterrows():
template_ws.write(i + 1, barcode_col, row['盒码']) # 商品条码
template_ws.write(i + 1, amount_col, int(row['采购量'])) # 采购量
template_ws.write(i + 1, gift_col, "") # 赠送量为空
template_ws.write(i + 1, price_col, round(row['采购单价'], 2)) # 采购单价保留两位小数
# 确保输出目录存在
os.makedirs(os.path.dirname(self.output_file), exist_ok=True)
# 保存输出文件
template_wb.save(self.output_file)
logger.info(f"采购单生成成功: {self.output_file}")
return self.output_file
except Exception as e:
logger.error(f"生成银豹采购单失败: {e}")
return None
def show_result_dialog(self, output_file, order_time, total_count, total_amount):
"""
显示处理结果对话框
Args:
output_file: 输出文件路径
order_time: 订单时间
total_count: 总处理条目
total_amount: 总金额
"""
# 创建附加信息
additional_info = {
"订单来源": "烟草公司",
"处理时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 确保 total_amount 是数字类型
try:
if isinstance(total_amount, str):
total_amount = float(total_amount.replace(',', ''))
amount_display = f"¥{total_amount:.2f}"
except (ValueError, TypeError):
# 如果转换失败,直接使用原始值
amount_display = f"¥{total_amount}"
# 显示自定义对话框
show_custom_dialog(
title="烟草订单处理结果",
message="烟草订单处理完成",
result_file=output_file,
time_info=order_time,
count_info=f"{total_count}个商品",
amount_info=amount_display,
additional_info=additional_info
)
# 记录日志
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")