Files
orc-order-v2/app/services/tobacco_service.py
T
houhuan 6f96bf50ac fix: 修复输出路径问题 — 路径解析改为基于应用目录而非CWD
当从外部目录(如D:\ccc)拖入文件时,输出文件会错误地写入源目录。
根因是所有路径使用相对路径 + os.getcwd() 解析,CWD不同则路径错误。

修复方案:
- ConfigManager.get_path() 改为使用 app_root (exe所在目录/脚本所在目录)
- 将 22 处裸硬编码 "data/result"/"data/output" 替换为 config.get_path()
- 添加 result_folder 到默认配置和 config.ini
- 修复 error_utils.py 中的路径匹配字符串

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-04 23:05:10 +08:00

336 lines
13 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
烟草公司订单处理服务
----------------
处理烟草公司特定格式的订单明细文件,生成银豹采购单
"""
import os
import glob
import datetime
import pandas as pd
import xlrd
import xlwt
import re
from xlutils.copy import copy
from openpyxl import load_workbook
from typing import Optional, Dict, Any, List, Tuple
from app.core.utils.log_utils import get_logger
from app.core.utils.string_utils import parse_monetary_string
from app.core.utils.dialog_utils import show_custom_dialog # 导入自定义弹窗工具
from ..config.settings import ConfigManager
logger = get_logger(__name__)
class TobaccoService:
"""烟草公司订单处理服务"""
def __init__(self, config: Dict[str, Any]):
"""
初始化服务
Args:
config: 配置信息
"""
self.config = config
# 修复配置获取方式,使用fallback机制
self.output_dir = config.get_path('Paths', 'output_folder', fallback='data/output', create=True) if hasattr(config, 'get_path') else os.path.abspath('data/output')
self.template_file = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls')
# 将烟草订单保存到result目录
result_dir = config.get_path('Paths', 'result_folder', fallback='data/result', create=True) if hasattr(config, 'get_path') else os.path.abspath('data/result')
os.makedirs(result_dir, exist_ok=True)
self.output_file = os.path.join(result_dir, '银豹采购单_烟草公司.xls')
def get_latest_tobacco_order(self) -> Optional[str]:
"""
获取最新的烟草订单明细文件
Returns:
文件路径或None
"""
# 获取今日开始时间戳
today = datetime.date.today()
today_start = datetime.datetime.combine(today, datetime.time.min).timestamp()
# 查找订单明细文件
file_pattern = os.path.join(self.output_dir, "订单明细*.xlsx")
candidates = glob.glob(file_pattern)
if not candidates:
logger.warning("未找到烟草公司订单明细文件")
return None
# 按创建时间排序
candidates.sort(key=os.path.getctime, reverse=True)
latest_file = candidates[0]
# 检查是否是今天的文件
if os.path.getctime(latest_file) >= today_start:
logger.info(f"找到最新烟草订单明细文件: {latest_file}")
return latest_file
else:
logger.warning(f"找到的烟草订单明细文件不是今天创建的: {latest_file}")
return latest_file # 仍然返回最新文件,但给出警告
def preprocess_tobacco_order(self, file_path: str) -> Optional[str]:
"""
烟草订单预处理:按用户提供的 B, E, G, H 列索引进行强制清洗
"""
try:
logger.info(f"执行烟草订单专用预处理: {file_path}")
from app.core.utils.file_utils import smart_read_excel
# 烟草格式:Row 0是专卖证号,Row 1是表头,Row 2是合计,Row 3开始是数据
df_raw = smart_read_excel(file_path, header=None)
if len(df_raw) <= 3:
logger.error("烟草订单文件数据行数不足")
return None
# 提取数据部分 (Row 3开始)
df_data = df_raw.iloc[3:].reset_index(drop=True)
# 用户指定列映射:
# A列 (Index 0) -> 商品名称
# B列 (Index 1) -> 商品条码 (盒码)
# E列 (Index 4) -> 批发价 (单价)
# G列 (Index 6) -> 订单量 (数量)
# H列 (Index 7) -> 金额
idx_map = {
0: '商品名称',
1: '商品条码',
4: '批发价',
6: '数量',
7: '金额'
}
available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]]
df = df_data.iloc[:, available_indices].copy()
df.columns = [idx_map[i] for i in available_indices]
# 1. 过滤订单量不为0的数据
df['数量'] = pd.to_numeric(df['数量'], errors='coerce').fillna(0)
df = df[df['数量'] != 0].copy()
if df.empty:
logger.warning("烟草订单无有效订单量记录")
return None
# 2. 核心清洗逻辑:
# 数量 = 订单量 * 10 (G列)
# 单价 = 批发价 / 10 (E列)
df['单价'] = pd.to_numeric(df['批发价'], errors='coerce').fillna(0) / 10
df['数量'] = df['数量'] * 10
# 3. 校验金额 (H列)
df['金额'] = pd.to_numeric(df['金额'], errors='coerce').fillna(0)
# 4. 只保留需要的列
final_cols = ['商品条码', '商品名称', '数量', '单价', '金额']
df_final = df[final_cols].copy()
# 保存预处理文件
out_dir = os.path.dirname(file_path)
base = os.path.basename(file_path)
final_path = os.path.join(out_dir, f"预处理之后_{base}")
df_final.to_excel(final_path, index=False)
logger.info(f"烟草订单预处理完成: {final_path}")
return final_path
except Exception as e:
logger.error(f"烟草订单预处理失败: {e}")
return None
def process_tobacco_order(self, input_file=None):
"""
处理烟草订单
Args:
input_file: 输入文件路径,如果为None则自动查找最新文件
Returns:
输出文件路径或None(如果处理失败)
"""
try:
# 如果没有指定输入文件,查找最新的文件
if input_file is None:
input_file = self.get_latest_tobacco_order()
if input_file is None:
logger.warning("未找到烟草公司订单明细文件")
logger.error("未找到可处理的烟草订单明细文件")
return None
logger.info(f"开始处理烟草公司订单: {input_file}")
# 读取订单时间和总金额
order_info = self._read_order_info(input_file)
if not order_info:
logger.error(f"读取订单信息失败: {input_file}")
return None
order_time, total_amount = order_info
# 读取订单数据
order_data = self._read_order_data(input_file)
if order_data is None or order_data.empty:
logger.error(f"读取订单数据失败: {input_file}")
return None
# 生成银豹采购单
output_file = self._generate_pospal_order(order_data, order_time)
if not output_file:
logger.error("生成银豹采购单失败")
return None
# 获取处理条目数
total_count = len(order_data)
# 输出处理结果
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")
logger.info(f"采购单已生成: {output_file}")
# 显示处理结果对话框
self.show_result_dialog(output_file, order_time, total_count, total_amount)
return output_file
except Exception as e:
logger.error(f"处理烟草公司订单时发生错误: {e}", exc_info=True)
return None
def _read_order_info(self, file_path: str) -> Optional[Tuple[str, float]]:
"""
读取订单信息(时间和总金额)
Args:
file_path: 文件路径
Returns:
包含订单时间和总金额的元组或None
"""
try:
wb_info = load_workbook(file_path, data_only=True)
ws_info = wb_info.active
order_time = ws_info["H1"].value or "(空)"
total_amount = ws_info["H3"].value or 0
return (order_time, total_amount)
except Exception as e:
logger.error(f"读取订单信息出错: {e}")
return None
def _read_order_data(self, file_path: str) -> Optional[pd.DataFrame]:
"""
读取订单数据
Args:
file_path: 文件路径
Returns:
订单数据DataFrame或None
"""
columns = ['商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额']
try:
from app.core.utils.file_utils import smart_read_excel
# 读取Excel文件
df_old = smart_read_excel(file_path, header=None, skiprows=3, names=columns)
# 过滤订单量不为0的数据,并计算采购量和单价
df_filtered = df_old[df_old['订单量'] != 0].copy()
df_filtered['采购量'] = df_filtered['订单量'] * 10
df_filtered['采购单价'] = df_filtered['金额'] / df_filtered['采购量']
df_filtered = df_filtered.reset_index(drop=True)
return df_filtered
except Exception as e:
logger.error(f"读取订单数据失败: {e}")
return None
def _generate_pospal_order(self, order_data: pd.DataFrame, order_time: str) -> Optional[str]:
"""
生成银豹采购单
Args:
order_data: 订单数据
order_time: 订单时间
Returns:
输出文件路径或None
"""
try:
# 检查模板文件是否存在
if not os.path.exists(self.template_file):
logger.error(f"采购单模板文件不存在: {self.template_file}")
return None
# 打开模板,准备写入
template_rd = xlrd.open_workbook(self.template_file, formatting_info=True)
template_wb = copy(template_rd)
template_ws = template_wb.get_sheet(0)
# 获取模板中的表头列索引
header_row = template_rd.sheet_by_index(0).row_values(0)
barcode_col = header_row.index("条码(必填)")
amount_col = header_row.index("采购量(必填)")
gift_col = header_row.index("赠送量")
price_col = header_row.index("采购单价(必填)")
# 写入数据到模板
for i, row in order_data.iterrows():
template_ws.write(i + 1, barcode_col, row['盒码']) # 商品条码
template_ws.write(i + 1, amount_col, int(row['采购量'])) # 采购量
template_ws.write(i + 1, gift_col, "") # 赠送量为空
template_ws.write(i + 1, price_col, round(row['采购单价'], 2)) # 采购单价保留两位小数
# 确保输出目录存在
os.makedirs(os.path.dirname(self.output_file), exist_ok=True)
# 保存输出文件
template_wb.save(self.output_file)
logger.info(f"采购单生成成功: {self.output_file}")
return self.output_file
except Exception as e:
logger.error(f"生成银豹采购单失败: {e}")
return None
def show_result_dialog(self, output_file, order_time, total_count, total_amount):
"""
显示处理结果对话框
Args:
output_file: 输出文件路径
order_time: 订单时间
total_count: 总处理条目
total_amount: 总金额
"""
# 创建附加信息
additional_info = {
"订单来源": "烟草公司",
"处理时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 确保 total_amount 是数字类型
parsed = parse_monetary_string(total_amount)
total_amount = parsed if parsed is not None else 0.0
amount_display = f"¥{total_amount:.2f}"
# 显示自定义对话框
show_custom_dialog(
title="烟草订单处理结果",
message="烟草订单处理完成",
result_file=output_file,
time_info=order_time,
count_info=f"{total_count}个商品",
amount_info=amount_display,
additional_info=additional_info
)
# 记录日志
logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")