437 lines
18 KiB
Python
437 lines
18 KiB
Python
"""
|
||
订单合并模块
|
||
----------
|
||
提供采购单合并功能,将多个采购单合并为一个。
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import pandas as pd
|
||
import numpy as np
|
||
import xlrd
|
||
import xlwt
|
||
from xlutils.copy import copy as xlcopy
|
||
from typing import Dict, List, Optional, Tuple, Union, Any
|
||
from datetime import datetime
|
||
|
||
from ...config.settings import ConfigManager
|
||
from ..utils.log_utils import get_logger
|
||
from ..utils.file_utils import (
|
||
ensure_dir,
|
||
get_file_extension,
|
||
get_files_by_extensions,
|
||
load_json,
|
||
save_json
|
||
)
|
||
from ..utils.string_utils import (
|
||
clean_string,
|
||
clean_barcode,
|
||
format_barcode
|
||
)
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
class PurchaseOrderMerger:
|
||
"""
|
||
采购单合并器:将多个采购单Excel文件合并成一个文件
|
||
"""
|
||
|
||
def __init__(self, config: Optional[ConfigManager] = None):
|
||
"""
|
||
初始化采购单合并器
|
||
|
||
Args:
|
||
config: 配置管理器,如果为None则创建新的
|
||
"""
|
||
logger.info("初始化PurchaseOrderMerger")
|
||
self.config = config or ConfigManager()
|
||
|
||
# 获取配置
|
||
self.output_dir = self.config.get_path('Paths', 'output_folder', 'data/output', create=True)
|
||
|
||
# 获取模板文件路径
|
||
template_folder = self.config.get('Paths', 'template_folder', 'templates')
|
||
template_name = self.config.get('Templates', 'purchase_order', '银豹-采购单模板.xls')
|
||
|
||
self.template_path = os.path.join(template_folder, template_name)
|
||
|
||
# 检查模板文件是否存在
|
||
if not os.path.exists(self.template_path):
|
||
logger.error(f"模板文件不存在: {self.template_path}")
|
||
raise FileNotFoundError(f"模板文件不存在: {self.template_path}")
|
||
|
||
# 用于记录已合并的文件
|
||
self.cache_file = os.path.join(self.output_dir, "merged_files.json")
|
||
self.merged_files = self._load_merged_files()
|
||
|
||
logger.info(f"初始化完成,模板文件: {self.template_path}")
|
||
|
||
def _load_merged_files(self) -> Dict[str, str]:
|
||
"""
|
||
加载已合并文件的缓存
|
||
|
||
Returns:
|
||
合并记录字典
|
||
"""
|
||
return load_json(self.cache_file, {})
|
||
|
||
def _save_merged_files(self) -> None:
|
||
"""保存已合并文件的缓存"""
|
||
save_json(self.merged_files, self.cache_file)
|
||
|
||
def get_purchase_orders(self) -> List[str]:
|
||
"""
|
||
获取output目录下的采购单Excel文件
|
||
|
||
Returns:
|
||
采购单文件路径列表
|
||
"""
|
||
logger.info(f"搜索目录 {self.output_dir} 中的采购单Excel文件")
|
||
|
||
# 获取所有Excel文件
|
||
all_files = get_files_by_extensions(self.output_dir, ['.xls', '.xlsx'])
|
||
|
||
# 筛选采购单文件
|
||
purchase_orders = [
|
||
file for file in all_files
|
||
if os.path.basename(file).startswith('采购单_')
|
||
]
|
||
|
||
if not purchase_orders:
|
||
logger.warning(f"未在 {self.output_dir} 目录下找到采购单Excel文件")
|
||
return []
|
||
|
||
# 按修改时间排序,最新的在前
|
||
purchase_orders.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
||
|
||
logger.info(f"找到 {len(purchase_orders)} 个采购单Excel文件")
|
||
return purchase_orders
|
||
|
||
def read_purchase_order(self, file_path: str) -> Optional[pd.DataFrame]:
|
||
"""
|
||
读取采购单Excel文件
|
||
|
||
Args:
|
||
file_path: 采购单文件路径
|
||
|
||
Returns:
|
||
数据帧,如果读取失败则返回None
|
||
"""
|
||
try:
|
||
# 读取Excel文件
|
||
df = pd.read_excel(file_path)
|
||
logger.info(f"成功读取采购单文件: {file_path}")
|
||
|
||
# 打印列名,用于调试
|
||
logger.debug(f"Excel文件的列名: {df.columns.tolist()}")
|
||
|
||
# 处理特殊情况:检查是否需要读取指定行作为标题行
|
||
for header_row_idx in range(5): # 检查前5行
|
||
if len(df) <= header_row_idx:
|
||
continue
|
||
|
||
potential_header = df.iloc[header_row_idx].astype(str)
|
||
header_keywords = ['条码', '条形码', '商品条码', '商品名称', '规格', '单价', '数量', '金额', '单位', '必填']
|
||
matches = sum(1 for keyword in header_keywords if any(keyword in str(val) for val in potential_header.values))
|
||
|
||
if matches >= 3: # 如果至少匹配3个关键词,认为是表头
|
||
logger.info(f"检测到表头在第 {header_row_idx+1} 行")
|
||
|
||
# 使用此行作为列名,数据从下一行开始
|
||
header_row = potential_header
|
||
data_rows = df.iloc[header_row_idx+1:].reset_index(drop=True)
|
||
|
||
# 为每一列分配名称(避免重复的列名)
|
||
new_columns = []
|
||
for i, col in enumerate(header_row):
|
||
col_str = str(col)
|
||
if col_str == 'nan' or col_str == 'None' or pd.isna(col):
|
||
new_columns.append(f"Col_{i}")
|
||
else:
|
||
new_columns.append(col_str)
|
||
|
||
# 使用新列名创建新的DataFrame
|
||
data_rows.columns = new_columns
|
||
df = data_rows
|
||
logger.debug(f"重新构建的数据帧列名: {df.columns.tolist()}")
|
||
break
|
||
|
||
# 定义可能的列名映射
|
||
column_mapping = {
|
||
'条码': ['条码', '条形码', '商品条码', 'barcode', '商品条形码', '条形码', '商品条码', '商品编码', '商品编号', '条形码', '条码(必填)'],
|
||
'采购量': ['数量', '采购数量', '购买数量', '采购数量', '订单数量', '采购数量', '采购量(必填)', '采购量', '数量(必填)'],
|
||
'采购单价': ['单价', '价格', '采购单价', '销售价', '采购单价(必填)', '单价(必填)', '价格(必填)'],
|
||
'赠送量': ['赠送量', '赠品数量', '赠送数量', '赠品']
|
||
}
|
||
|
||
# 显示所有列名,用于调试
|
||
all_columns = df.columns.tolist()
|
||
logger.info(f"列名: {all_columns}")
|
||
|
||
# 映射实际的列名
|
||
mapped_columns = {}
|
||
for target_col, possible_names in column_mapping.items():
|
||
for col in all_columns:
|
||
# 清理列名以进行匹配
|
||
col_str = str(col).strip()
|
||
|
||
# 直接匹配整个列名
|
||
if col_str in possible_names:
|
||
mapped_columns[target_col] = col
|
||
logger.info(f"直接匹配列名: {col_str} -> {target_col}")
|
||
break
|
||
|
||
# 移除列名中的空白字符进行比较
|
||
clean_col = re.sub(r'\s+', '', col_str)
|
||
for name in possible_names:
|
||
clean_name = re.sub(r'\s+', '', name)
|
||
# 完全匹配
|
||
if clean_col == clean_name:
|
||
mapped_columns[target_col] = col
|
||
logger.info(f"清理后匹配列名: {col_str} -> {target_col}")
|
||
break
|
||
# 部分匹配(列名包含关键词)
|
||
elif clean_name in clean_col:
|
||
mapped_columns[target_col] = col
|
||
logger.info(f"部分匹配列名: {col_str} -> {target_col}")
|
||
break
|
||
|
||
if target_col in mapped_columns:
|
||
break
|
||
|
||
# 如果没有找到匹配,尝试模糊匹配
|
||
if target_col not in mapped_columns:
|
||
for col in all_columns:
|
||
col_str = str(col).strip().lower()
|
||
for name in possible_names:
|
||
name_lower = name.lower()
|
||
if name_lower in col_str:
|
||
mapped_columns[target_col] = col
|
||
logger.info(f"模糊匹配列名: {col} -> {target_col}")
|
||
break
|
||
if target_col in mapped_columns:
|
||
break
|
||
|
||
# 如果找到了必要的列,重命名列
|
||
if mapped_columns:
|
||
rename_dict = {mapped_columns[key]: key for key in mapped_columns}
|
||
logger.info(f"列名重命名映射: {rename_dict}")
|
||
df = df.rename(columns=rename_dict)
|
||
logger.info(f"重命名后的列名: {df.columns.tolist()}")
|
||
else:
|
||
logger.warning(f"未找到可映射的列名: {file_path}")
|
||
|
||
return df
|
||
|
||
except Exception as e:
|
||
logger.error(f"读取采购单文件失败: {file_path}, 错误: {str(e)}")
|
||
return None
|
||
|
||
def merge_purchase_orders(self, file_paths: List[str]) -> Optional[pd.DataFrame]:
|
||
"""
|
||
合并多个采购单文件
|
||
|
||
Args:
|
||
file_paths: 采购单文件路径列表
|
||
|
||
Returns:
|
||
合并后的数据帧,如果合并失败则返回None
|
||
"""
|
||
if not file_paths:
|
||
logger.warning("没有需要合并的采购单文件")
|
||
return None
|
||
|
||
# 读取所有采购单文件
|
||
dfs = []
|
||
for file_path in file_paths:
|
||
df = self.read_purchase_order(file_path)
|
||
if df is not None:
|
||
dfs.append(df)
|
||
|
||
if not dfs:
|
||
logger.warning("没有成功读取的采购单文件")
|
||
return None
|
||
|
||
# 合并数据
|
||
logger.info(f"开始合并 {len(dfs)} 个采购单文件")
|
||
|
||
# 首先,整理每个数据帧以确保它们有相同的结构
|
||
processed_dfs = []
|
||
for i, df in enumerate(dfs):
|
||
# 确保必要的列存在
|
||
required_columns = ['条码', '采购量', '采购单价']
|
||
missing_columns = [col for col in required_columns if col not in df.columns]
|
||
|
||
if missing_columns:
|
||
logger.warning(f"数据帧 {i} 缺少必要的列: {missing_columns}")
|
||
continue
|
||
|
||
# 处理赠送量列不存在的情况
|
||
if '赠送量' not in df.columns:
|
||
df['赠送量'] = 0
|
||
|
||
# 选择并清理需要的列
|
||
cleaned_df = pd.DataFrame()
|
||
|
||
# 清理条码 - 确保是字符串且无小数点
|
||
cleaned_df['条码'] = df['条码'].apply(lambda x: format_barcode(x) if pd.notna(x) else '')
|
||
|
||
# 清理采购量 - 确保是数字
|
||
cleaned_df['采购量'] = pd.to_numeric(df['采购量'], errors='coerce').fillna(0)
|
||
|
||
# 清理单价 - 确保是数字并保留4位小数
|
||
cleaned_df['采购单价'] = pd.to_numeric(df['采购单价'], errors='coerce').fillna(0).round(4)
|
||
|
||
# 清理赠送量 - 确保是数字
|
||
cleaned_df['赠送量'] = pd.to_numeric(df['赠送量'], errors='coerce').fillna(0)
|
||
|
||
# 过滤无效行 - 条码为空或采购量为0的行跳过
|
||
valid_df = cleaned_df[(cleaned_df['条码'] != '') & (cleaned_df['采购量'] > 0)]
|
||
|
||
if len(valid_df) > 0:
|
||
processed_dfs.append(valid_df)
|
||
logger.info(f"处理文件 {i+1}: 有效记录 {len(valid_df)} 行")
|
||
else:
|
||
logger.warning(f"处理文件 {i+1}: 没有有效记录")
|
||
|
||
if not processed_dfs:
|
||
logger.warning("没有有效的数据帧用于合并")
|
||
return None
|
||
|
||
# 将所有数据帧合并
|
||
merged_df = pd.concat(processed_dfs, ignore_index=True)
|
||
|
||
# 按条码和单价分组,合并相同商品
|
||
# 四舍五入到4位小数,避免浮点误差导致相同价格被当作不同价格
|
||
merged_df['采购单价'] = merged_df['采购单价'].round(4)
|
||
|
||
# 对于同一条码和单价的商品,合并数量和赠送量
|
||
result = merged_df.groupby(['条码', '采购单价'], as_index=False).agg({
|
||
'采购量': 'sum',
|
||
'赠送量': 'sum'
|
||
})
|
||
|
||
# 排序,按条码升序
|
||
result = result.sort_values('条码').reset_index(drop=True)
|
||
|
||
# 设置为0的赠送量设为空
|
||
result.loc[result['赠送量'] == 0, '赠送量'] = pd.NA
|
||
|
||
logger.info(f"合并完成,共 {len(result)} 条商品记录")
|
||
return result
|
||
|
||
def create_merged_purchase_order(self, df: pd.DataFrame) -> Optional[str]:
|
||
"""
|
||
创建合并的采购单文件,完全按照银豹格式要求
|
||
|
||
Args:
|
||
df: 合并后的数据帧
|
||
|
||
Returns:
|
||
输出文件路径,如果创建失败则返回None
|
||
"""
|
||
try:
|
||
# 打开模板文件
|
||
template_workbook = xlrd.open_workbook(self.template_path, formatting_info=True)
|
||
template_sheet = template_workbook.sheet_by_index(0)
|
||
|
||
# 首先分析模板结构,确定关键列的位置
|
||
logger.info(f"分析模板结构")
|
||
for i in range(min(5, template_sheet.nrows)):
|
||
row_values = [str(cell.value).strip() for cell in template_sheet.row(i)]
|
||
logger.debug(f"模板第{i+1}行: {row_values}")
|
||
|
||
# 银豹模板的标准列位置:
|
||
# 条码列(商品条码): B列(索引1)
|
||
barcode_col = 1
|
||
# 采购量列: C列(索引2)
|
||
quantity_col = 2
|
||
# 赠送量列: D列(索引3)
|
||
gift_col = 3
|
||
# 采购单价列: E列(索引4)
|
||
price_col = 4
|
||
|
||
# 找到数据开始行 - 通常是第二行(索引1)
|
||
data_start_row = 1
|
||
|
||
# 创建可写的副本
|
||
output_workbook = xlcopy(template_workbook)
|
||
output_sheet = output_workbook.get_sheet(0)
|
||
|
||
# 设置单价的格式样式(保留4位小数)
|
||
price_style = xlwt.XFStyle()
|
||
price_style.num_format_str = '0.0000'
|
||
|
||
# 数量格式
|
||
quantity_style = xlwt.XFStyle()
|
||
quantity_style.num_format_str = '0'
|
||
|
||
# 遍历数据并填充到Excel
|
||
for i, (_, row) in enumerate(df.iterrows()):
|
||
r = data_start_row + i
|
||
|
||
# 只填充银豹采购单格式要求的4个列:条码、采购量、赠送量、采购单价
|
||
|
||
# 条码(必填)- B列(1)
|
||
output_sheet.write(r, barcode_col, row['条码'])
|
||
|
||
# 采购量(必填)- C列(2)
|
||
output_sheet.write(r, quantity_col, float(row['采购量']), quantity_style)
|
||
|
||
# 赠送量 - D列(3)
|
||
if pd.notna(row['赠送量']) and float(row['赠送量']) > 0:
|
||
output_sheet.write(r, gift_col, float(row['赠送量']), quantity_style)
|
||
|
||
# 采购单价(必填)- E列(4)
|
||
output_sheet.write(r, price_col, float(row['采购单价']), price_style)
|
||
|
||
# 生成输出文件名
|
||
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
output_file = os.path.join(self.output_dir, f"合并采购单_{timestamp}.xls")
|
||
|
||
# 保存文件
|
||
output_workbook.save(output_file)
|
||
logger.info(f"合并采购单已保存到: {output_file},共{len(df)}条记录")
|
||
return output_file
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建合并采购单时出错: {e}")
|
||
return None
|
||
|
||
def process(self, file_paths: Optional[List[str]] = None) -> Optional[str]:
|
||
"""
|
||
处理采购单合并
|
||
|
||
Args:
|
||
file_paths: 指定要合并的文件路径列表,如果为None则自动获取
|
||
|
||
Returns:
|
||
合并后的文件路径,如果合并失败则返回None
|
||
"""
|
||
# 如果未指定文件路径,则获取所有采购单文件
|
||
if file_paths is None:
|
||
file_paths = self.get_purchase_orders()
|
||
|
||
# 检查是否有文件需要合并
|
||
if not file_paths:
|
||
logger.warning("没有找到可合并的采购单文件")
|
||
return None
|
||
|
||
# 合并采购单
|
||
merged_df = self.merge_purchase_orders(file_paths)
|
||
if merged_df is None:
|
||
logger.error("合并采购单失败")
|
||
return None
|
||
|
||
# 创建合并的采购单文件
|
||
output_file = self.create_merged_purchase_order(merged_df)
|
||
if output_file is None:
|
||
logger.error("创建合并采购单文件失败")
|
||
return None
|
||
|
||
# 记录已合并文件
|
||
for file_path in file_paths:
|
||
self.merged_files[file_path] = output_file
|
||
self._save_merged_files()
|
||
|
||
return output_file |