#!/usr/bin/env python # -*- coding: utf-8 -*- """ 烟草公司订单处理服务 ---------------- 处理烟草公司特定格式的订单明细文件,生成银豹采购单 """ import os import glob import datetime import pandas as pd import xlrd import xlwt import re from xlutils.copy import copy from openpyxl import load_workbook from typing import Optional, Dict, Any, List, Tuple from app.core.utils.log_utils import get_logger from app.core.utils.dialog_utils import show_custom_dialog # 导入自定义弹窗工具 from ..config.settings import ConfigManager logger = get_logger(__name__) class TobaccoService: """烟草公司订单处理服务""" def __init__(self, config: Dict[str, Any]): """ 初始化服务 Args: config: 配置信息 """ self.config = config # 修复配置获取方式,使用fallback机制 self.output_dir = config.get('Paths', 'output_folder', fallback='data/output') self.template_file = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls') # 将烟草订单保存到result目录 result_dir = "data/result" os.makedirs(result_dir, exist_ok=True) self.output_file = os.path.join(result_dir, '银豹采购单_烟草公司.xls') def get_latest_tobacco_order(self) -> Optional[str]: """ 获取最新的烟草订单明细文件 Returns: 文件路径或None """ # 获取今日开始时间戳 today = datetime.date.today() today_start = datetime.datetime.combine(today, datetime.time.min).timestamp() # 查找订单明细文件 file_pattern = os.path.join(self.output_dir, "订单明细*.xlsx") candidates = glob.glob(file_pattern) if not candidates: logger.warning("未找到烟草公司订单明细文件") return None # 按创建时间排序 candidates.sort(key=os.path.getctime, reverse=True) latest_file = candidates[0] # 检查是否是今天的文件 if os.path.getctime(latest_file) >= today_start: logger.info(f"找到最新烟草订单明细文件: {latest_file}") return latest_file else: logger.warning(f"找到的烟草订单明细文件不是今天创建的: {latest_file}") return latest_file # 仍然返回最新文件,但给出警告 def preprocess_tobacco_order(self, file_path: str) -> Optional[str]: """ 烟草订单预处理:按用户提供的 B, E, G, H 列索引进行强制清洗 """ try: logger.info(f"执行烟草订单专用预处理: {file_path}") from app.core.utils.file_utils import smart_read_excel # 烟草格式:Row 0是专卖证号,Row 1是表头,Row 2是合计,Row 3开始是数据 df_raw = smart_read_excel(file_path, header=None) if len(df_raw) <= 3: logger.error("烟草订单文件数据行数不足") return None # 提取数据部分 (Row 3开始) df_data = df_raw.iloc[3:].reset_index(drop=True) # 用户指定列映射: # A列 (Index 0) -> 商品名称 # B列 (Index 1) -> 商品条码 (盒码) # E列 (Index 4) -> 批发价 (单价) # G列 (Index 6) -> 订单量 (数量) # H列 (Index 7) -> 金额 idx_map = { 0: '商品名称', 1: '商品条码', 4: '批发价', 6: '数量', 7: '金额' } available_indices = [i for i in idx_map.keys() if i < df_data.shape[1]] df = df_data.iloc[:, available_indices].copy() df.columns = [idx_map[i] for i in available_indices] # 1. 过滤订单量不为0的数据 df['数量'] = pd.to_numeric(df['数量'], errors='coerce').fillna(0) df = df[df['数量'] != 0].copy() if df.empty: logger.warning("烟草订单无有效订单量记录") return None # 2. 核心清洗逻辑: # 数量 = 订单量 * 10 (G列) # 单价 = 批发价 / 10 (E列) df['单价'] = pd.to_numeric(df['批发价'], errors='coerce').fillna(0) / 10 df['数量'] = df['数量'] * 10 # 3. 校验金额 (H列) df['金额'] = pd.to_numeric(df['金额'], errors='coerce').fillna(0) # 4. 只保留需要的列 final_cols = ['商品条码', '商品名称', '数量', '单价', '金额'] df_final = df[final_cols].copy() # 保存预处理文件 out_dir = os.path.dirname(file_path) base = os.path.basename(file_path) final_path = os.path.join(out_dir, f"预处理之后_{base}") df_final.to_excel(final_path, index=False) logger.info(f"烟草订单预处理完成: {final_path}") return final_path except Exception as e: logger.error(f"烟草订单预处理失败: {e}") return None def process_tobacco_order(self, input_file=None): """ 处理烟草订单 Args: input_file: 输入文件路径,如果为None则自动查找最新文件 Returns: 输出文件路径或None(如果处理失败) """ try: # 如果没有指定输入文件,查找最新的文件 if input_file is None: input_file = self.get_latest_tobacco_order() if input_file is None: logger.warning("未找到烟草公司订单明细文件") logger.error("未找到可处理的烟草订单明细文件") return None logger.info(f"开始处理烟草公司订单: {input_file}") # 读取订单时间和总金额 order_info = self._read_order_info(input_file) if not order_info: logger.error(f"读取订单信息失败: {input_file}") return None order_time, total_amount = order_info # 读取订单数据 order_data = self._read_order_data(input_file) if order_data is None or order_data.empty: logger.error(f"读取订单数据失败: {input_file}") return None # 生成银豹采购单 output_file = self._generate_pospal_order(order_data, order_time) if not output_file: logger.error("生成银豹采购单失败") return None # 获取处理条目数 total_count = len(order_data) # 输出处理结果 logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}") logger.info(f"采购单已生成: {output_file}") # 显示处理结果对话框 self.show_result_dialog(output_file, order_time, total_count, total_amount) return output_file except Exception as e: logger.error(f"处理烟草公司订单时发生错误: {e}", exc_info=True) return None def _read_order_info(self, file_path: str) -> Optional[Tuple[str, float]]: """ 读取订单信息(时间和总金额) Args: file_path: 文件路径 Returns: 包含订单时间和总金额的元组或None """ try: wb_info = load_workbook(file_path, data_only=True) ws_info = wb_info.active order_time = ws_info["H1"].value or "(空)" total_amount = ws_info["H3"].value or 0 return (order_time, total_amount) except Exception as e: logger.error(f"读取订单信息出错: {e}") return None def _read_order_data(self, file_path: str) -> Optional[pd.DataFrame]: """ 读取订单数据 Args: file_path: 文件路径 Returns: 订单数据DataFrame或None """ columns = ['商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额'] try: from app.core.utils.file_utils import smart_read_excel # 读取Excel文件 df_old = smart_read_excel(file_path, header=None, skiprows=3, names=columns) # 过滤订单量不为0的数据,并计算采购量和单价 df_filtered = df_old[df_old['订单量'] != 0].copy() df_filtered['采购量'] = df_filtered['订单量'] * 10 df_filtered['采购单价'] = df_filtered['金额'] / df_filtered['采购量'] df_filtered = df_filtered.reset_index(drop=True) return df_filtered except Exception as e: logger.error(f"读取订单数据失败: {e}") return None def _generate_pospal_order(self, order_data: pd.DataFrame, order_time: str) -> Optional[str]: """ 生成银豹采购单 Args: order_data: 订单数据 order_time: 订单时间 Returns: 输出文件路径或None """ try: # 检查模板文件是否存在 if not os.path.exists(self.template_file): logger.error(f"采购单模板文件不存在: {self.template_file}") return None # 打开模板,准备写入 template_rd = xlrd.open_workbook(self.template_file, formatting_info=True) template_wb = copy(template_rd) template_ws = template_wb.get_sheet(0) # 获取模板中的表头列索引 header_row = template_rd.sheet_by_index(0).row_values(0) barcode_col = header_row.index("条码(必填)") amount_col = header_row.index("采购量(必填)") gift_col = header_row.index("赠送量") price_col = header_row.index("采购单价(必填)") # 写入数据到模板 for i, row in order_data.iterrows(): template_ws.write(i + 1, barcode_col, row['盒码']) # 商品条码 template_ws.write(i + 1, amount_col, int(row['采购量'])) # 采购量 template_ws.write(i + 1, gift_col, "") # 赠送量为空 template_ws.write(i + 1, price_col, round(row['采购单价'], 2)) # 采购单价保留两位小数 # 确保输出目录存在 os.makedirs(os.path.dirname(self.output_file), exist_ok=True) # 保存输出文件 template_wb.save(self.output_file) logger.info(f"采购单生成成功: {self.output_file}") return self.output_file except Exception as e: logger.error(f"生成银豹采购单失败: {e}") return None def show_result_dialog(self, output_file, order_time, total_count, total_amount): """ 显示处理结果对话框 Args: output_file: 输出文件路径 order_time: 订单时间 total_count: 总处理条目 total_amount: 总金额 """ # 创建附加信息 additional_info = { "订单来源": "烟草公司", "处理时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # 确保 total_amount 是数字类型 try: if isinstance(total_amount, str): total_amount = float(total_amount.replace(',', '')) amount_display = f"¥{total_amount:.2f}" except (ValueError, TypeError): # 如果转换失败,直接使用原始值 amount_display = f"¥{total_amount}" # 显示自定义对话框 show_custom_dialog( title="烟草订单处理结果", message="烟草订单处理完成", result_file=output_file, time_info=order_time, count_info=f"{total_count}个商品", amount_info=amount_display, additional_info=additional_info ) # 记录日志 logger.info(f"烟草公司订单处理成功,订单时间: {order_time}, 总金额: {total_amount}, 处理条目: {total_count}")