orc-order-v2/app/core/excel/merger.py
2025-11-15 18:46:03 +08:00

469 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
订单合并模块
----------
提供采购单合并功能,将多个采购单合并为一个。
"""
import os
import re
import pandas as pd
import numpy as np
import xlrd
import xlwt
from xlutils.copy import copy as xlcopy
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from datetime import datetime
from ...config.settings import ConfigManager
from ..utils.log_utils import get_logger
from ..utils.file_utils import (
ensure_dir,
get_file_extension,
get_files_by_extensions,
load_json,
save_json
)
from ..utils.string_utils import (
clean_string,
clean_barcode,
format_barcode
)
logger = get_logger(__name__)
class PurchaseOrderMerger:
"""
采购单合并器将多个采购单Excel文件合并成一个文件
"""
def __init__(self, config):
"""
初始化采购单合并器
Args:
config: 配置信息
"""
self.config = config
# 修复ConfigParser对象没有get_path方法的问题
try:
# 获取输出目录
self.output_dir = config.get('Paths', 'output_folder', fallback='data/output')
# 确保目录存在
os.makedirs(self.output_dir, exist_ok=True)
# 记录实际路径
logger.info(f"使用输出目录: {os.path.abspath(self.output_dir)}")
# 获取模板文件路径
template_folder = config.get('Paths', 'template_folder', fallback='templates')
template_name = config.get('Templates', 'purchase_order', fallback='银豹-采购单模板.xls')
self.template_path = os.path.join(template_folder, template_name)
# 检查模板文件是否存在
if not os.path.exists(self.template_path):
logger.warning(f"模板文件不存在: {self.template_path}")
# 用于记录已合并的文件
self.merged_files_json = os.path.join(self.output_dir, "merged_files.json")
self.merged_files = self._load_merged_files()
logger.info(f"初始化PurchaseOrderMerger完成模板文件: {self.template_path}")
except Exception as e:
logger.error(f"初始化PurchaseOrderMerger失败: {e}")
raise
def _load_merged_files(self) -> Dict[str, str]:
"""
加载已合并文件的缓存
Returns:
合并记录字典
"""
return load_json(self.merged_files_json, {})
def _save_merged_files(self) -> None:
"""保存已合并文件的缓存"""
save_json(self.merged_files, self.merged_files_json)
def get_purchase_orders(self) -> List[str]:
"""
获取result目录下的采购单Excel文件
Returns:
采购单文件路径列表
"""
# 采购单文件保存在data/result目录
result_dir = "data/result"
logger.info(f"搜索目录 {result_dir} 中的采购单Excel文件")
# 确保目录存在
os.makedirs(result_dir, exist_ok=True)
# 获取所有Excel文件
all_files = get_files_by_extensions(result_dir, ['.xls', '.xlsx'])
# 筛选采购单文件
purchase_orders = [
file for file in all_files
if os.path.basename(file).startswith('采购单_')
]
if not purchase_orders:
logger.warning(f"未在 {result_dir} 目录下找到采购单Excel文件")
return []
# 按修改时间排序,最新的在前
purchase_orders.sort(key=lambda x: os.path.getmtime(x), reverse=True)
logger.info(f"找到 {len(purchase_orders)} 个采购单Excel文件")
return purchase_orders
def read_purchase_order(self, file_path: str) -> Optional[pd.DataFrame]:
"""
读取采购单Excel文件
Args:
file_path: 采购单文件路径
Returns:
数据帧如果读取失败则返回None
"""
try:
# 读取Excel文件
df = pd.read_excel(file_path)
logger.info(f"成功读取采购单文件: {file_path}")
# 打印列名,用于调试
logger.debug(f"Excel文件的列名: {df.columns.tolist()}")
# 处理特殊情况:检查是否需要读取指定行作为标题行
for header_row_idx in range(5): # 检查前5行
if len(df) <= header_row_idx:
continue
potential_header = df.iloc[header_row_idx].astype(str)
header_keywords = ['条码', '条形码', '商品条码', '商品名称', '规格', '单价', '数量', '金额', '单位', '必填']
matches = sum(1 for keyword in header_keywords if any(keyword in str(val) for val in potential_header.values))
if matches >= 3: # 如果至少匹配3个关键词认为是表头
logger.info(f"检测到表头在第 {header_row_idx+1}")
# 使用此行作为列名,数据从下一行开始
header_row = potential_header
data_rows = df.iloc[header_row_idx+1:].reset_index(drop=True)
# 为每一列分配名称(避免重复的列名)
new_columns = []
for i, col in enumerate(header_row):
col_str = str(col)
if col_str == 'nan' or col_str == 'None' or pd.isna(col):
new_columns.append(f"Col_{i}")
else:
new_columns.append(col_str)
# 使用新列名创建新的DataFrame
data_rows.columns = new_columns
df = data_rows
logger.debug(f"重新构建的数据帧列名: {df.columns.tolist()}")
break
# 定义可能的列名映射
column_mapping = {
'条码': ['条码', '条形码', '商品条码', 'barcode', '商品条形码', '条形码', '商品条码', '商品编码', '商品编号', '条形码', '条码(必填)'],
'采购量': ['数量', '采购数量', '购买数量', '采购数量', '订单数量', '采购数量', '采购量(必填)', '采购量', '数量(必填)'],
'采购单价': ['单价', '价格', '采购单价', '销售价', '采购单价(必填)', '单价(必填)', '价格(必填)'],
'赠送量': ['赠送量', '赠品数量', '赠送数量', '赠品']
}
# 显示所有列名,用于调试
all_columns = df.columns.tolist()
logger.info(f"列名: {all_columns}")
# 映射实际的列名
mapped_columns = {}
for target_col, possible_names in column_mapping.items():
for col in all_columns:
# 清理列名以进行匹配
col_str = str(col).strip()
# 直接匹配整个列名
if col_str in possible_names:
mapped_columns[target_col] = col
logger.info(f"直接匹配列名: {col_str} -> {target_col}")
break
# 移除列名中的空白字符进行比较
clean_col = re.sub(r'\s+', '', col_str)
for name in possible_names:
clean_name = re.sub(r'\s+', '', name)
# 完全匹配
if clean_col == clean_name:
mapped_columns[target_col] = col
logger.info(f"清理后匹配列名: {col_str} -> {target_col}")
break
# 部分匹配(列名包含关键词)
elif clean_name in clean_col:
mapped_columns[target_col] = col
logger.info(f"部分匹配列名: {col_str} -> {target_col}")
break
if target_col in mapped_columns:
break
# 如果没有找到匹配,尝试模糊匹配
if target_col not in mapped_columns:
for col in all_columns:
col_str = str(col).strip().lower()
for name in possible_names:
name_lower = name.lower()
if name_lower in col_str:
mapped_columns[target_col] = col
logger.info(f"模糊匹配列名: {col} -> {target_col}")
break
if target_col in mapped_columns:
break
# 如果找到了必要的列,重命名列
if mapped_columns:
rename_dict = {mapped_columns[key]: key for key in mapped_columns}
logger.info(f"列名重命名映射: {rename_dict}")
df = df.rename(columns=rename_dict)
logger.info(f"重命名后的列名: {df.columns.tolist()}")
else:
logger.warning(f"未找到可映射的列名: {file_path}")
return df
except Exception as e:
logger.error(f"读取采购单文件失败: {file_path}, 错误: {str(e)}")
return None
def merge_purchase_orders(self, file_paths: List[str]) -> Optional[pd.DataFrame]:
"""
合并多个采购单文件
Args:
file_paths: 采购单文件路径列表
Returns:
合并后的数据帧如果合并失败则返回None
"""
if not file_paths:
logger.warning("没有需要合并的采购单文件")
return None
# 读取所有采购单文件
dfs = []
for file_path in file_paths:
df = self.read_purchase_order(file_path)
if df is not None:
dfs.append(df)
if not dfs:
logger.warning("没有成功读取的采购单文件")
return None
# 合并数据
logger.info(f"开始合并 {len(dfs)} 个采购单文件")
# 首先,整理每个数据帧以确保它们有相同的结构
processed_dfs = []
for i, df in enumerate(dfs):
# 确保必要的列存在
required_columns = ['条码', '采购量', '采购单价']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
logger.warning(f"数据帧 {i} 缺少必要的列: {missing_columns}")
continue
# 处理赠送量列不存在的情况
if '赠送量' not in df.columns:
df['赠送量'] = 0
# 选择并清理需要的列
cleaned_df = pd.DataFrame()
# 清理条码 - 确保是字符串且无小数点
cleaned_df['条码'] = df['条码'].apply(lambda x: format_barcode(x) if pd.notna(x) else '')
# 清理采购量 - 确保是数字
cleaned_df['采购量'] = pd.to_numeric(df['采购量'], errors='coerce').fillna(0)
# 清理单价 - 确保是数字并保留4位小数
cleaned_df['采购单价'] = pd.to_numeric(df['采购单价'], errors='coerce').fillna(0).round(4)
# 清理赠送量 - 确保是数字
cleaned_df['赠送量'] = pd.to_numeric(df['赠送量'], errors='coerce').fillna(0)
# 过滤无效行 - 条码为空或采购量为0的行跳过
valid_df = cleaned_df[(cleaned_df['条码'] != '') & (cleaned_df['采购量'] > 0)]
if len(valid_df) > 0:
processed_dfs.append(valid_df)
logger.info(f"处理文件 {i+1}: 有效记录 {len(valid_df)}")
else:
logger.warning(f"处理文件 {i+1}: 没有有效记录")
if not processed_dfs:
logger.warning("没有有效的数据帧用于合并")
return None
# 将所有数据帧合并
merged_df = pd.concat(processed_dfs, ignore_index=True)
# 按条码和单价分组,合并相同商品
# 四舍五入到4位小数避免浮点误差导致相同价格被当作不同价格
merged_df['采购单价'] = merged_df['采购单价'].round(4)
# 对于同一条码和单价的商品,合并数量和赠送量
result = merged_df.groupby(['条码', '采购单价'], as_index=False).agg({
'采购量': 'sum',
'赠送量': 'sum'
})
# 排序,按条码升序
result = result.sort_values('条码').reset_index(drop=True)
# 设置为0的赠送量设为空
result.loc[result['赠送量'] == 0, '赠送量'] = pd.NA
logger.info(f"合并完成,共 {len(result)} 条商品记录")
return result
def create_merged_purchase_order(self, df: pd.DataFrame) -> Optional[str]:
"""
创建合并的采购单文件,完全按照银豹格式要求
Args:
df: 合并后的数据帧
Returns:
输出文件路径如果创建失败则返回None
"""
try:
# 打开模板文件
template_workbook = xlrd.open_workbook(self.template_path, formatting_info=True)
template_sheet = template_workbook.sheet_by_index(0)
# 首先分析模板结构,确定关键列的位置
logger.info(f"分析模板结构")
for i in range(min(5, template_sheet.nrows)):
row_values = [str(cell.value).strip() for cell in template_sheet.row(i)]
logger.debug(f"模板第{i+1}行: {row_values}")
# 银豹模板的标准列位置:
# 条码列(商品条码): B列(索引1)
barcode_col = 1
# 采购量列: C列(索引2)
quantity_col = 2
# 赠送量列: D列(索引3)
gift_col = 3
# 采购单价列: E列(索引4)
price_col = 4
# 找到数据开始行 - 通常是第二行(索引1)
data_start_row = 1
# 创建可写的副本
output_workbook = xlcopy(template_workbook)
output_sheet = output_workbook.get_sheet(0)
# 设置单价的格式样式保留4位小数
price_style = xlwt.XFStyle()
price_style.num_format_str = '0.0000'
# 数量格式
quantity_style = xlwt.XFStyle()
quantity_style.num_format_str = '0'
# 遍历数据并填充到Excel
for i, (_, row) in enumerate(df.iterrows()):
r = data_start_row + i
# 只填充银豹采购单格式要求的4个列条码、采购量、赠送量、采购单价
# 条码(必填)- B列(1)
output_sheet.write(r, barcode_col, row['条码'])
# 采购量(必填)- C列(2)
output_sheet.write(r, quantity_col, float(row['采购量']), quantity_style)
# 赠送量 - D列(3)
if pd.notna(row['赠送量']) and float(row['赠送量']) > 0:
output_sheet.write(r, gift_col, float(row['赠送量']), quantity_style)
# 采购单价(必填)- E列(4)
output_sheet.write(r, price_col, float(row['采购单价']), price_style)
# 生成输出文件名保存到data/result目录
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
result_dir = "data/result"
os.makedirs(result_dir, exist_ok=True)
output_file = os.path.join(result_dir, f"合并采购单_{timestamp}.xls")
# 保存文件
output_workbook.save(output_file)
logger.info(f"合并采购单已保存到: {output_file},共{len(df)}条记录")
return output_file
except Exception as e:
logger.error(f"创建合并采购单时出错: {e}")
return None
def process(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理采购单合并
Args:
file_paths: 指定要合并的文件路径列表如果为None则自动获取
Returns:
合并后的文件路径如果合并失败则返回None
"""
# 如果未指定文件路径,则获取所有采购单文件
if file_paths is None:
file_paths = self.get_purchase_orders()
try:
if progress_cb:
progress_cb(97)
except Exception:
pass
# 检查是否有文件需要合并
if not file_paths:
logger.warning("没有找到可合并的采购单文件")
return None
# 合并采购单
merged_df = self.merge_purchase_orders(file_paths)
if merged_df is None:
logger.error("合并采购单失败")
return None
try:
if progress_cb:
progress_cb(98)
except Exception:
pass
# 创建合并的采购单文件
output_file = self.create_merged_purchase_order(merged_df)
if output_file is None:
logger.error("创建合并采购单文件失败")
return None
try:
if progress_cb:
progress_cb(100)
except Exception:
pass
# 记录已合并文件
for file_path in file_paths:
self.merged_files[file_path] = output_file
self._save_merged_files()
return output_file