新版本

This commit is contained in:
2025-11-15 18:46:03 +08:00
parent 9f97ac3f21
commit 73d17836d7
68 changed files with 49834 additions and 69055 deletions
+11 -1
View File
@@ -285,6 +285,16 @@ class UnitConverter:
logger.debug(f"解析规格: {spec}")
# 新增:处理“1件=12桶/袋/盒...”等等式规格,统一为1*12
eq_match = re.match(r'(\d+(?:\.\d+)?)\s*(?:件|箱|提|盒)\s*[=]\s*(\d+)\s*(?:瓶|桶|盒|支|个|袋|罐|包|卷)', spec)
if eq_match:
try:
level2 = int(eq_match.group(2))
logger.info(f"解析等式规格: {spec} -> 1*{level2}")
return 1, level2, None
except ValueError:
pass
# 处理三级包装,如1*5*12
three_level_match = re.match(r'(\d+)[*](\d+)[*](\d+)', spec)
if three_level_match:
@@ -522,4 +532,4 @@ class UnitConverter:
更新是否成功
"""
self.special_barcodes = new_mappings
return self.save_barcode_mappings(new_mappings)
return self.save_barcode_mappings(new_mappings)
+18 -3
View File
@@ -11,7 +11,7 @@ import numpy as np
import xlrd
import xlwt
from xlutils.copy import copy as xlcopy
from typing import Dict, List, Optional, Tuple, Union, Any
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from datetime import datetime
from ...config.settings import ConfigManager
@@ -414,7 +414,7 @@ class PurchaseOrderMerger:
logger.error(f"创建合并采购单时出错: {e}")
return None
def process(self, file_paths: Optional[List[str]] = None) -> Optional[str]:
def process(self, file_paths: Optional[List[str]] = None, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理采购单合并
@@ -427,6 +427,11 @@ class PurchaseOrderMerger:
# 如果未指定文件路径,则获取所有采购单文件
if file_paths is None:
file_paths = self.get_purchase_orders()
try:
if progress_cb:
progress_cb(97)
except Exception:
pass
# 检查是否有文件需要合并
if not file_paths:
@@ -438,16 +443,26 @@ class PurchaseOrderMerger:
if merged_df is None:
logger.error("合并采购单失败")
return None
try:
if progress_cb:
progress_cb(98)
except Exception:
pass
# 创建合并的采购单文件
output_file = self.create_merged_purchase_order(merged_df)
if output_file is None:
logger.error("创建合并采购单文件失败")
return None
try:
if progress_cb:
progress_cb(100)
except Exception:
pass
# 记录已合并文件
for file_path in file_paths:
self.merged_files[file_path] = output_file
self._save_merged_files()
return output_file
return output_file
+61 -6
View File
@@ -11,7 +11,7 @@ import numpy as np
import xlrd
import xlwt
from xlutils.copy import copy as xlcopy
from typing import Dict, List, Optional, Tuple, Union, Any
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from datetime import datetime
from ...config.settings import ConfigManager
@@ -281,6 +281,36 @@ class ExcelProcessor:
product['amount'] = row['小计']
elif column_mapping.get('amount') and not pd.isna(row[column_mapping['amount']]):
product['amount'] = row[column_mapping['amount']]
# 根据金额判断赠品:金额为0、为空、或为o/O
amt = product.get('amount', None)
try:
is_amt_gift = False
if amt is None:
is_amt_gift = True
elif isinstance(amt, str):
s = amt.strip()
if s == '' or s.lower() == 'o' or s == '0' or s == '':
is_amt_gift = True
else:
amt_clean = re.sub(r'[^\d\.,]', '', s)
if ',' in amt_clean and '.' not in amt_clean:
amt_clean = amt_clean.replace(',', '.')
elif ',' in amt_clean and '.' in amt_clean:
amt_clean = amt_clean.replace(',', '')
if amt_clean:
try:
is_amt_gift = float(amt_clean) == 0.0
except ValueError:
pass
else:
try:
is_amt_gift = float(amt) == 0.0
except (ValueError, TypeError):
pass
if is_amt_gift:
product['is_gift'] = True
except Exception:
pass
# 提取数量
if '数量' in df.columns and not pd.isna(row['数量']):
@@ -472,7 +502,7 @@ class ExcelProcessor:
logger.warning(f"通过金额和单价计算数量失败: {e}")
# 判断是否为赠品(价格为0
is_gift = price == 0
is_gift = bool(product.get('is_gift', False)) or (price == 0)
logger.info(f"处理商品: 条码={barcode}, 数量={quantity}, 单价={price}, 是否赠品={is_gift}")
@@ -631,7 +661,7 @@ class ExcelProcessor:
logger.warning("无法识别表头行")
return None
def process_specific_file(self, file_path: str) -> Optional[str]:
def process_specific_file(self, file_path: str, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理指定的Excel文件
@@ -649,6 +679,11 @@ class ExcelProcessor:
try:
# 读取Excel文件时不立即指定表头
if progress_cb:
try:
progress_cb(92)
except Exception:
pass
df = pd.read_excel(file_path, header=None)
logger.info(f"成功读取Excel文件: {file_path}, 共 {len(df)}")
@@ -661,10 +696,20 @@ class ExcelProcessor:
logger.info(f"识别到表头在第 {header_row+1}")
# 重新读取Excel,正确指定表头行
if progress_cb:
try:
progress_cb(94)
except Exception:
pass
df = pd.read_excel(file_path, header=header_row)
logger.info(f"使用表头行重新读取数据,共 {len(df)} 行有效数据")
# 提取商品信息
if progress_cb:
try:
progress_cb(96)
except Exception:
pass
products = self.extract_product_info(df)
if not products:
@@ -685,6 +730,11 @@ class ExcelProcessor:
# 不再自动打开输出目录
logger.info(f"采购单已保存到: {output_file}")
if progress_cb:
try:
progress_cb(100)
except Exception:
pass
return output_file
@@ -694,7 +744,7 @@ class ExcelProcessor:
logger.error(f"处理Excel文件时出错: {file_path}, 错误: {e}")
return None
def process_latest_file(self) -> Optional[str]:
def process_latest_file(self, progress_cb: Optional[Callable[[int], None]] = None) -> Optional[str]:
"""
处理最新的Excel文件
@@ -708,7 +758,7 @@ class ExcelProcessor:
return None
# 处理文件
return self.process_specific_file(latest_file)
return self.process_specific_file(latest_file, progress_cb=progress_cb)
def _detect_column_mapping(self, df: pd.DataFrame) -> Dict[str, str]:
"""
@@ -889,6 +939,11 @@ class ExcelProcessor:
logger.debug(f"清理后的规格字符串: {spec_str}")
# 新增:匹配“1件=12桶/袋/盒…”等等式规格,取右侧数量作为包装数量
eq_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:件|箱|提|盒)\s*[=]\s*(\d+)\s*(?:瓶|桶|盒|支|个|袋|罐|包|卷)', spec_str)
if eq_match:
return int(eq_match.group(2))
# 匹配带单位的格式,如"5kg*6"、"450g*15"、"450ml*15"
weight_pattern = r'(\d+(?:\.\d+)?)\s*(?:kg|KG|千克|公斤)[*×](\d+)'
match = re.search(weight_pattern, spec_str)
@@ -946,4 +1001,4 @@ class ExcelProcessor:
except Exception as e:
logger.warning(f"解析规格'{spec_str}'时出错: {e}")
return None
return None
-355
View File
@@ -1,355 +0,0 @@
"""
单位转换器测试模块
---------------
测试单位转换和条码映射逻辑
"""
import os
import sys
import unittest
from typing import Dict, Any
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')))
from app.core.excel.converter import UnitConverter
from app.core.excel.validators import ProductValidator
class TestUnitConverter(unittest.TestCase):
    """
    Tests for UnitConverter.process_unit_conversion.

    Each test builds a product dict, runs it through the converter and checks
    the resulting quantity, price, unit and (where relevant) barcode.
    """
    # NOTE(review): several 'unit' literals below are empty strings in this
    # view of the file — possibly lost during extraction; confirm against the
    # original test module before relying on them.

    def setUp(self):
        """
        Create a fresh UnitConverter before each test.
        """
        self.converter = UnitConverter()

    def test_jian_unit_conversion(self):
        """
        Test conversion of the "jian" unit (per the method name).
        """
        # Prepare test data
        product = {
            'barcode': '6954767400129',
            'name': '美汁源果粒橙1.8L*8瓶',
            'specification': '1.8L*8',
            'quantity': 1.0,
            'unit': '',
            'price': 65.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: quantity is multiplied by 8 and the price divided by 8
        self.assertEqual(result['quantity'], 8.0)
        self.assertEqual(result['price'], 8.125)
        self.assertEqual(result['unit'], '')

    def test_box_unit_conversion(self):
        """
        Test conversion of the "box" unit (per the method name).
        """
        # Prepare test data
        product = {
            'barcode': '6925303721244',
            'name': '统一鲜橙多2L*6瓶',
            'specification': '2L*6',
            'quantity': 1.0,
            'unit': '',
            'price': 43.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: quantity is multiplied by 6 and the price divided by 6
        self.assertEqual(result['quantity'], 6.0)
        self.assertEqual(result['price'], 7.1666666666666667)
        self.assertEqual(result['unit'], '')

    def test_tihe_unit_conversion_level3(self):
        """
        Test conversion of the "ti/he" unit with a three-level specification.
        """
        # Prepare test data (three-level spec 1*6*4: 1 row of 6 packs, 4 bottles per pack)
        product = {
            'barcode': '6921168509347',
            'name': '农夫山泉550ml*24瓶',
            'specification': '1*6*4',
            'quantity': 2.0,
            'unit': '',
            'price': 16.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: with a three-level spec the pack unit is special-cased —
        # quantity is multiplied by the last level
        self.assertEqual(result['quantity'], 8.0)  # 2 packs * 4 bottles per pack
        self.assertEqual(result['price'], 4.0)  # 16 per pack / 4 bottles per pack
        self.assertEqual(result['unit'], '')

    def test_tihe_unit_conversion_level2(self):
        """
        Test conversion of the "ti/he" unit with a two-level specification.
        """
        # Prepare test data (two-level spec 1*4: 4 packs per case)
        product = {
            'barcode': '6921168509347',
            'name': '农夫山泉550ml*4瓶',
            'specification': '1*4',
            'quantity': 5.0,
            'unit': '',
            'price': 10.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: with a two-level spec the pack unit is left unchanged
        self.assertEqual(result['quantity'], 5.0)
        self.assertEqual(result['price'], 10.0)
        self.assertEqual(result['unit'], '')

    def test_barcode_mapping(self):
        """
        Test barcode mapping.
        """
        # Prepare test data (uses a barcode that is expected to be remapped)
        product = {
            'barcode': '6920584471055',  # this barcode should map to 6920584471017
            'name': '测试映射条码商品',
            'specification': '1*12',
            'quantity': 1.0,
            'unit': '',
            'price': 60.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: the barcode must have been remapped
        self.assertEqual(result['barcode'], '6920584471017')
        self.assertEqual(result['quantity'], 12.0)  # the case-unit conversion is applied as well
        self.assertEqual(result['price'], 5.0)  # 60 per case / 12 bottles per case
        self.assertEqual(result['unit'], '')

    def test_special_barcode_multiplier(self):
        """
        Test the multiplier handling for special barcodes.
        """
        # Prepare test data (uses a special barcode)
        product = {
            'barcode': '6925019900087',  # special barcode: quantity * 10, unit converted to bottles
            'name': '特殊条码商品',
            'specification': '1*10',
            'quantity': 2.0,
            'unit': '',
            'price': 100.0
        }
        # Run the conversion
        result = self.converter.process_unit_conversion(product)
        # Verify: the special-barcode multiplier must take effect
        self.assertEqual(result['quantity'], 20.0)  # 2 boxes * multiplier 10
        self.assertEqual(result['price'], 5.0)  # 100 per box / multiplier 10
        self.assertEqual(result['unit'], '')
class TestProductValidator(unittest.TestCase):
    """
    Tests for the ProductValidator barcode, quantity, price and product checks.
    """

    def setUp(self):
        """
        Create a fresh ProductValidator before each test.
        """
        self.validator = ProductValidator()

    def test_validate_barcode(self):
        """
        Test barcode validation.
        """
        # A valid barcode
        is_valid, barcode, error = self.validator.validate_barcode('6925303721244')
        self.assertTrue(is_valid)
        self.assertEqual(barcode, '6925303721244')
        self.assertIsNone(error)
        # A barcode containing a non-digit character
        is_valid, barcode, error = self.validator.validate_barcode('6925303-721244')
        self.assertTrue(is_valid)
        self.assertEqual(barcode, '6925303721244')
        self.assertIsNone(error)
        # Correction of a barcode starting with 5
        is_valid, barcode, error = self.validator.validate_barcode('5925303721244')
        self.assertTrue(is_valid)
        self.assertEqual(barcode, '6925303721244')
        self.assertIsNone(error)
        # A barcode that is too short
        is_valid, barcode, error = self.validator.validate_barcode('12345')
        self.assertFalse(is_valid)
        self.assertEqual(barcode, '12345')
        self.assertIn("条码长度异常", error)
        # The warehouse marker
        is_valid, barcode, error = self.validator.validate_barcode('仓库')
        self.assertFalse(is_valid)
        self.assertEqual(barcode, '仓库')
        self.assertEqual(error, "条码为仓库标识")
        # An empty value
        is_valid, barcode, error = self.validator.validate_barcode(None)
        self.assertFalse(is_valid)
        self.assertEqual(barcode, "")
        self.assertEqual(error, "条码为空")

    def test_validate_quantity(self):
        """
        Test quantity validation.
        """
        # A valid quantity
        is_valid, quantity, error = self.validator.validate_quantity(10)
        self.assertTrue(is_valid)
        self.assertEqual(quantity, 10.0)
        self.assertIsNone(error)
        # A quantity given as a string
        is_valid, quantity, error = self.validator.validate_quantity("25.5")
        self.assertTrue(is_valid)
        self.assertEqual(quantity, 25.5)
        self.assertIsNone(error)
        # A quantity with a unit suffix
        is_valid, quantity, error = self.validator.validate_quantity("30瓶")
        self.assertTrue(is_valid)
        self.assertEqual(quantity, 30.0)
        self.assertIsNone(error)
        # A zero quantity
        is_valid, quantity, error = self.validator.validate_quantity(0)
        self.assertFalse(is_valid)
        self.assertEqual(quantity, 0.0)
        self.assertIn("数量必须大于0", error)
        # A negative quantity
        is_valid, quantity, error = self.validator.validate_quantity(-5)
        self.assertFalse(is_valid)
        self.assertEqual(quantity, 0.0)
        self.assertIn("数量必须大于0", error)
        # A non-numeric value
        is_valid, quantity, error = self.validator.validate_quantity("abc")
        self.assertFalse(is_valid)
        self.assertEqual(quantity, 0.0)
        self.assertIn("数量不包含数字", error)
        # An empty value
        is_valid, quantity, error = self.validator.validate_quantity(None)
        self.assertFalse(is_valid)
        self.assertEqual(quantity, 0.0)
        self.assertEqual(error, "数量为空")

    def test_validate_price(self):
        """
        Test unit-price validation.
        """
        # A valid unit price
        is_valid, price, is_gift, error = self.validator.validate_price(12.5)
        self.assertTrue(is_valid)
        self.assertEqual(price, 12.5)
        self.assertFalse(is_gift)
        self.assertIsNone(error)
        # A unit price given as a string
        is_valid, price, is_gift, error = self.validator.validate_price("8.0")
        self.assertTrue(is_valid)
        self.assertEqual(price, 8.0)
        self.assertFalse(is_gift)
        self.assertIsNone(error)
        # A zero unit price (gift)
        is_valid, price, is_gift, error = self.validator.validate_price(0)
        self.assertTrue(is_valid)
        self.assertEqual(price, 0.0)
        self.assertTrue(is_gift)
        self.assertIsNone(error)
        # The literal gift marker
        is_valid, price, is_gift, error = self.validator.validate_price("赠品")
        self.assertTrue(is_valid)
        self.assertEqual(price, 0.0)
        self.assertTrue(is_gift)
        self.assertIsNone(error)
        # A negative unit price
        is_valid, price, is_gift, error = self.validator.validate_price(-5)
        self.assertFalse(is_valid)
        self.assertEqual(price, 0.0)
        self.assertTrue(is_gift)
        self.assertIn("单价不能为负数", error)
        # An empty value
        is_valid, price, is_gift, error = self.validator.validate_price(None)
        self.assertFalse(is_valid)
        self.assertEqual(price, 0.0)
        self.assertTrue(is_gift)
        self.assertEqual(error, "单价为空,视为赠品")

    def test_validate_product(self):
        """
        Test validation of a whole product record.
        """
        # Prepare test data (a valid product)
        product = {
            'barcode': '6954767400129',
            'name': '测试商品',
            'specification': '1*12',
            'quantity': 3.0,
            'price': 36.0,
            'unit': '',
            'is_gift': False
        }
        # Validate the valid product
        result = self.validator.validate_product(product)
        self.assertEqual(result['barcode'], '6954767400129')
        self.assertEqual(result['quantity'], 3.0)
        self.assertEqual(result['price'], 36.0)
        self.assertFalse(result['is_gift'])
        # Validate a gift product
        gift_product = product.copy()
        gift_product['price'] = 0
        result = self.validator.validate_product(gift_product)
        self.assertEqual(result['price'], 0.0)
        self.assertTrue(result['is_gift'])
        # Validate a product that needs repairing
        invalid_product = {
            'barcode': '5954767-400129',  # prefix must be fixed and the non-digit removed
            'name': '测试商品',
            'specification': '1*12',
            'quantity': '2件',  # the number must be extracted
            'price': '赠品',  # gift marker
            'unit': '',
            'is_gift': False
        }
        result = self.validator.validate_product(invalid_product)
        self.assertEqual(result['barcode'], '6954767400129')  # 5 -> 6, '-' removed
        self.assertEqual(result['quantity'], 2.0)  # number extracted
        self.assertEqual(result['price'], 0.0)  # a gift has price 0
        self.assertTrue(result['is_gift'])  # flagged as a gift
# Run the test suite when this module is executed directly.
if __name__ == '__main__':
    unittest.main()
+31 -1
View File
@@ -225,6 +225,36 @@ class ProductValidator:
validated_product['is_gift'] = True
if error_msg:
logger.info(error_msg)
amount = product.get('amount', None)
try:
is_amount_gift = False
if amount is None:
is_amount_gift = True
elif isinstance(amount, str):
s = amount.strip()
if s == '' or s.lower() == 'o' or s == '0':
is_amount_gift = True
else:
amt_clean = re.sub(r'[^\d\.,]', '', s)
if ',' in amt_clean and '.' not in amt_clean:
amt_clean = amt_clean.replace(',', '.')
elif ',' in amt_clean and '.' in amt_clean:
amt_clean = amt_clean.replace(',', '')
if amt_clean:
try:
is_amount_gift = float(amt_clean) == 0.0
except ValueError:
pass
else:
try:
is_amount_gift = float(amount) == 0.0
except (ValueError, TypeError):
pass
if is_amount_gift:
validated_product['is_gift'] = True
except Exception:
pass
# 验证数量
quantity = product.get('quantity', None)
@@ -268,4 +298,4 @@ class ProductValidator:
logger.warning(f"数量验证失败: {error_msg}")
validated_product['quantity'] = 0.0
return validated_product
return validated_product
+9
View File
@@ -0,0 +1,9 @@
"""
数据处理handlers模块初始化文件
"""
from .data_cleaner import DataCleaner
from .column_mapper import ColumnMapper
from .calculator import DataCalculator
__all__ = ['DataCleaner', 'ColumnMapper', 'DataCalculator']
+378
View File
@@ -0,0 +1,378 @@
"""
数据计算处理器
提供各种数据计算功能,如数量计算、价格计算、汇总统计等
"""
import pandas as pd
import numpy as np
from typing import Dict, Any, Optional, List, Union
from ...core.utils.log_utils import get_logger
logger = get_logger(__name__)
class DataCalculator:
    """Rule-driven calculator for pandas DataFrames.

    Rules are queued with :meth:`add_rule` or with the fluent helpers
    (:meth:`multiply`, :meth:`divide`, ...) and are applied in insertion
    order by :meth:`calculate`, which works on a copy of its input.
    """

    # Maps a rule ``type`` to the name of the method that implements it.
    _RULE_HANDLERS = {
        'multiply': '_multiply',
        'divide': '_divide',
        'add': '_add',
        'subtract': '_subtract',
        'formula': '_formula',
        'round': '_round',
        'sum': '_sum',
        'aggregate': '_aggregate',
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create a calculator with an empty rule list.

        Args:
            config: Optional calculation configuration; stored as-is.
        """
        self.config = config or {}
        self.calculation_rules: List[Dict[str, Any]] = []

    def add_rule(self, rule_type: str, **kwargs) -> None:
        """Append a calculation rule.

        Args:
            rule_type: Rule type (one of the keys of ``_RULE_HANDLERS``).
            **kwargs: Rule parameters, stored alongside the type.
        """
        rule = {'type': rule_type, **kwargs}
        self.calculation_rules.append(rule)
        logger.debug(f"添加计算规则: {rule_type}")

    def calculate(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply every queued rule, in order, to a copy of ``df``.

        A rule that raises is logged and skipped so that one bad rule does
        not abort the whole pipeline.

        Args:
            df: Input data; it is not modified.

        Returns:
            The calculated data.
        """
        logger.info(f"开始数据计算,原始数据形状: {df.shape}")
        result_df = df.copy()
        total = len(self.calculation_rules)
        for position, rule in enumerate(self.calculation_rules, start=1):
            try:
                logger.debug(f"执行计算规则 {position}/{total}: {rule['type']}")
                result_df = self._apply_rule(result_df, rule)
                logger.debug(f"规则执行完成,数据形状: {result_df.shape}")
            except Exception as e:
                # Deliberate best-effort: log and move on to the next rule.
                logger.error(f"计算规则执行失败: {rule}, 错误: {e}")
        logger.info(f"数据计算完成,最终数据形状: {result_df.shape}")
        return result_df

    def _apply_rule(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Dispatch a single rule to its handler.

        Args:
            df: Data to operate on.
            rule: Rule configuration; ``rule['type']`` selects the handler.

        Returns:
            The processed data; ``df`` unchanged for an unknown rule type.
        """
        rule_type = rule.get('type')
        # Only strings can be valid keys; this also keeps unhashable values
        # on the "unknown type" path instead of raising.
        handler_name = (
            self._RULE_HANDLERS.get(rule_type) if isinstance(rule_type, str) else None
        )
        if handler_name is None:
            logger.warning(f"未知的计算规则类型: {rule_type}")
            return df
        return getattr(self, handler_name)(df, rule)

    def _multiply(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Write ``source_column * factor`` into ``target_column``.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``source_column``, ``target_column`` and ``factor``
                (default 1).

        Returns:
            The processed data.
        """
        source_column = rule.get('source_column')
        target_column = rule.get('target_column')
        factor = rule.get('factor', 1)
        if source_column and target_column:
            if source_column in df.columns:
                df[target_column] = df[source_column] * factor
                logger.debug(f"乘法计算: {source_column} * {factor} -> {target_column}")
            else:
                logger.warning(f"源列不存在: {source_column}")
        return df

    def _divide(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Write ``source_column / divisor`` into ``target_column``.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``source_column``, ``target_column`` and ``divisor``
                (default 1). A zero divisor is logged and the rule skipped.

        Returns:
            The processed data.
        """
        source_column = rule.get('source_column')
        target_column = rule.get('target_column')
        divisor = rule.get('divisor', 1)
        if divisor == 0:
            logger.error("除数不能为0")
            return df
        if source_column and target_column:
            if source_column in df.columns:
                df[target_column] = df[source_column] / divisor
                logger.debug(f"除法计算: {source_column} / {divisor} -> {target_column}")
            else:
                logger.warning(f"源列不存在: {source_column}")
        return df

    def _add(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Add columns row-wise (plus a constant) into ``target_column``.

        With no ``columns`` the constant is added to the existing
        ``target_column`` instead.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``columns``, ``target_column`` and ``constant``
                (default 0).

        Returns:
            The processed data.
        """
        columns = rule.get('columns', [])
        target_column = rule.get('target_column')
        constant = rule.get('constant', 0)
        if not target_column:
            return df
        if isinstance(columns, str):
            columns = [columns]
        if columns:
            # Sum the listed columns that actually exist.
            valid_columns = [col for col in columns if col in df.columns]
            if valid_columns:
                df[target_column] = df[valid_columns].sum(axis=1) + constant
                logger.debug(f"加法计算: {valid_columns} + {constant} -> {target_column}")
            else:
                logger.warning(f"没有有效的列用于加法计算: {columns}")
        elif target_column in df.columns:
            # No columns given: only add the constant.
            df[target_column] = df[target_column] + constant
            logger.debug(f"加法计算: {target_column} + {constant}")
        else:
            logger.warning(f"目标列不存在: {target_column}")
        return df

    def _subtract(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Write ``minuend - subtrahend - constant`` into ``target_column``.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``minuend`` (column), optional ``subtrahend``
                (column), ``target_column`` and ``constant`` (default 0).

        Returns:
            The processed data.
        """
        minuend = rule.get('minuend')  # column to subtract from
        subtrahend = rule.get('subtrahend')  # column to subtract
        target_column = rule.get('target_column')
        constant = rule.get('constant', 0)
        if target_column and minuend and minuend in df.columns:
            if subtrahend and subtrahend in df.columns:
                df[target_column] = df[minuend] - df[subtrahend] - constant
                logger.debug(f"减法计算: {minuend} - {subtrahend} - {constant} -> {target_column}")
            else:
                if subtrahend:
                    # A configured but missing column would otherwise be
                    # ignored silently and yield a misleading result.
                    logger.warning(f"减数列不存在: {subtrahend},仅减去常数")
                df[target_column] = df[minuend] - constant
                logger.debug(f"减法计算: {minuend} - {constant} -> {target_column}")
        else:
            logger.warning("减法计算参数不完整或列不存在")
        return df

    def _formula(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Evaluate ``formula`` with ``DataFrame.eval`` into ``target_column``.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``formula`` and ``target_column``.

        Returns:
            The processed data; unchanged if evaluation fails.
        """
        formula = rule.get('formula')
        target_column = rule.get('target_column')
        if formula and target_column:
            try:
                # NOTE(security): the formula is evaluated as an expression;
                # it must come from trusted configuration, never from
                # end-user input.
                df[target_column] = df.eval(formula)
                logger.debug(f"公式计算: {formula} -> {target_column}")
            except Exception as e:
                logger.error(f"公式计算失败: {formula}, 错误: {e}")
        else:
            logger.warning("公式计算缺少公式或目标列")
        return df

    def _round(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Round numeric columns.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``columns`` (default: every numeric column) and
                ``decimals`` (default 0).

        Returns:
            The processed data.
        """
        columns = rule.get('columns', [])
        decimals = rule.get('decimals', 0)
        if isinstance(columns, str):
            columns = [columns]
        target_columns = columns or df.select_dtypes(include=[np.number]).columns
        for col in target_columns:
            if col in df.columns and pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].round(decimals)
                logger.debug(f"四舍五入: {col} 保留 {decimals} 位小数")
        return df

    def _sum(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Sum columns row-wise, optionally totalled per group.

        Without ``group_by`` the row-wise sum of the selected columns is
        written to ``target_column``. With ``group_by`` that row-wise sum is
        totalled per group and the group total is written to every row of the
        group, so the frame keeps its shape.

        Args:
            df: Data to operate on (modified in place).
            rule: Uses ``columns`` (default: every numeric column),
                ``target_column`` and ``group_by``.

        Returns:
            The processed data.
        """
        columns = rule.get('columns') or []
        target_column = rule.get('target_column')
        group_by = rule.get('group_by')
        if isinstance(columns, str):
            columns = [columns]
        if columns:
            selected = [col for col in columns if col in df.columns]
        else:
            selected = list(df.select_dtypes(include=[np.number]).columns)

        if group_by and group_by in df.columns:
            # Never add the grouping key into its own total.
            selected = [col for col in selected if col != group_by]
            if not target_column:
                # Nowhere to store the result; say so instead of computing
                # a value and discarding it.
                logger.warning("分组求和缺少目标列,结果未写入")
                return df
            if selected:
                row_totals = df[selected].sum(axis=1)
                df[target_column] = row_totals.groupby(df[group_by]).transform('sum')
                if columns:
                    for col in selected:
                        logger.debug(f"分组求和: {col} 按 {group_by} 分组")
                else:
                    logger.debug(f"分组求和: 所有数值列 按 {group_by} 分组")
        elif selected and target_column:
            df[target_column] = df[selected].sum(axis=1)
            logger.debug(f"求和计算: {selected} -> {target_column}")
        return df

    def _aggregate(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
        """Group by a column and aggregate other columns.

        Args:
            df: Data to operate on.
            rule: Uses ``group_by`` and ``aggregations`` (column -> function
                name or list of function names).

        Returns:
            The aggregated frame with the group key restored as a column, or
            ``df`` unchanged when nothing can be aggregated.
        """
        group_by = rule.get('group_by')
        aggregations = rule.get('aggregations', {})
        if group_by and group_by in df.columns:
            # Keep only existing columns with a supported spec (str or list).
            agg_dict = {
                column: func
                for column, func in aggregations.items()
                if column in df.columns and isinstance(func, (str, list))
            }
            if agg_dict:
                result = df.groupby(group_by).agg(agg_dict)
                logger.debug(f"聚合计算: 按 {group_by} 分组, 聚合: {agg_dict}")
                return result.reset_index()
        return df

    # Fluent helpers: each queues one rule and returns ``self`` for chaining.
    def multiply(self, source_column: str, target_column: str, factor: float) -> "DataCalculator":
        """Queue a multiplication rule."""
        self.add_rule('multiply', source_column=source_column,
                      target_column=target_column, factor=factor)
        return self

    def divide(self, source_column: str, target_column: str, divisor: float) -> "DataCalculator":
        """Queue a division rule."""
        self.add_rule('divide', source_column=source_column,
                      target_column=target_column, divisor=divisor)
        return self

    def add(self, columns: Union[str, List[str]], target_column: str,
            constant: float = 0) -> "DataCalculator":
        """Queue an addition rule."""
        self.add_rule('add', columns=columns, target_column=target_column, constant=constant)
        return self

    def subtract(self, minuend: str, target_column: str,
                 subtrahend: Optional[str] = None, constant: float = 0) -> "DataCalculator":
        """Queue a subtraction rule."""
        self.add_rule('subtract', minuend=minuend, target_column=target_column,
                      subtrahend=subtrahend, constant=constant)
        return self

    def formula(self, formula: str, target_column: str) -> "DataCalculator":
        """Queue a formula rule."""
        self.add_rule('formula', formula=formula, target_column=target_column)
        return self

    def round_columns(self, columns: Optional[Union[str, List[str]]] = None,
                      decimals: int = 0) -> "DataCalculator":
        """Queue a rounding rule."""
        self.add_rule('round', columns=columns, decimals=decimals)
        return self

    def sum_columns(self, columns: Optional[Union[str, List[str]]] = None,
                    target_column: Optional[str] = None,
                    group_by: Optional[str] = None) -> "DataCalculator":
        """Queue a sum rule."""
        self.add_rule('sum', columns=columns, target_column=target_column, group_by=group_by)
        return self

    def aggregate(self, group_by: str,
                  aggregations: Dict[str, Union[str, List[str]]]) -> "DataCalculator":
        """Queue an aggregation rule."""
        self.add_rule('aggregate', group_by=group_by, aggregations=aggregations)
        return self
+276
View File
@@ -0,0 +1,276 @@
"""
列映射处理器
提供列名映射和转换功能,支持不同供应商的列名标准化
"""
import pandas as pd
from typing import Dict, Any, Optional, List, Union
from ...core.utils.log_utils import get_logger
logger = get_logger(__name__)
class ColumnMapper:
    """Maps supplier-specific column names onto standard column names.

    Matching is case-insensitive. Exact matches always win over fuzzy
    (substring) matches, and a source column is assigned to at most one
    standard column.
    """

    # Standard column name -> accepted spellings of that column.
    STANDARD_COLUMNS = {
        'barcode': ['条码', '条形码', '商品条码', '产品条码', '条码(必填)', 'barcode', 'code'],
        'name': ['商品名称', '产品名称', '名称', '商品', '产品', 'name', 'product_name'],
        'specification': ['规格', '规格型号', '型号', 'specification', 'spec', 'model'],
        'quantity': ['数量', '采购量', '订货数量', '订单量', '需求量', 'quantity', 'qty', '采购量(必填)'],
        'unit': ['单位', '计量单位', 'unit', 'units'],
        'unit_price': ['单价', '价格', '采购单价', '进货价', 'unit_price', 'price', '采购单价(必填)'],
        'total_price': ['总价', '金额', '小计', 'total_price', 'total', 'amount'],
        'category': ['类别', '分类', '商品类别', 'category', 'type'],
        'brand': ['品牌', '商标', 'brand'],
        'supplier': ['供应商', '供货商', 'supplier', 'vendor']
    }

    def __init__(self, mapping_config: Optional[Dict[str, Any]] = None):
        """Create a mapper.

        Args:
            mapping_config: Optional custom mapping, standard name -> custom
                name or list of custom names. It is copied so that later
                calls to :meth:`add_custom_mapping` do not mutate the
                caller's dict.
        """
        self.mapping_config = dict(mapping_config) if mapping_config else {}
        self.custom_mappings = {}
        self._build_reverse_mapping()

    def _build_reverse_mapping(self):
        """Build the lower-cased "spelling -> standard name" lookup tables."""
        self.reverse_mapping = {}
        for standard_name, variations in self.STANDARD_COLUMNS.items():
            for variation in variations:
                self.reverse_mapping[variation.lower()] = standard_name
        # Custom names are registered after (and so override) the built-ins.
        for standard_name, custom_names in self.mapping_config.items():
            if isinstance(custom_names, str):
                custom_names = [custom_names]
            for custom_name in custom_names:
                key = str(custom_name).lower()
                self.reverse_mapping[key] = standard_name
                self.custom_mappings[key] = standard_name

    def map_columns(self, df: pd.DataFrame, target_columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Rename columns of ``df`` to standard names.

        Exact matches are resolved for every target first; fuzzy matching is
        then tried for the remaining targets on columns not yet claimed, so
        one source column can never be assigned to two targets.

        Args:
            df: Input data; it is not modified.
            target_columns: Standard names wanted in the result; all standard
                columns when None.

        Returns:
            A frame holding exactly the target columns (missing ones are
            filled with a default), or ``df`` itself when nothing matched.
        """
        if target_columns is None:
            target_columns = list(self.STANDARD_COLUMNS.keys())
        logger.info(f"开始列名映射,目标列: {target_columns}")
        logger.info(f"原始列名: {list(df.columns)}")

        column_mapping = {}
        used_columns = set()
        pending = []
        # Pass 1: exact matches only.
        for target_col in target_columns:
            matched = self._find_matching_column(
                df.columns, target_col, exclude=used_columns, allow_fuzzy=False)
            if matched is None:
                pending.append(target_col)
                continue
            column_mapping[matched] = target_col
            used_columns.add(matched)
            logger.debug(f"列名映射: {matched} -> {target_col}")
        # Pass 2: fuzzy matches on columns that are still unclaimed.
        for target_col in pending:
            matched = self._find_matching_column(
                df.columns, target_col, exclude=used_columns)
            if matched is None:
                continue
            column_mapping[matched] = target_col
            used_columns.add(matched)
            logger.debug(f"列名映射: {matched} -> {target_col}")

        if not column_mapping:
            logger.warning("没有找到可映射的列名")
            return df

        df_mapped = df.rename(columns=column_mapping)
        # Add targets that had no source column.
        for target_col in target_columns:
            if target_col not in df_mapped.columns:
                df_mapped[target_col] = self._get_default_value(target_col)
                logger.debug(f"添加缺失列: {target_col}")
        # Keep only the target columns; copy so the result is independent.
        existing_target_columns = [col for col in target_columns if col in df_mapped.columns]
        df_result = df_mapped[existing_target_columns].copy()
        logger.info(f"列名映射完成,结果列名: {list(df_result.columns)}")
        return df_result

    def _candidate_names(self, target_column: str) -> List[str]:
        """Return every accepted spelling (built-in + custom) of a target."""
        names = list(self.STANDARD_COLUMNS.get(target_column, []))
        custom_names = self.mapping_config.get(target_column)
        if custom_names:
            if isinstance(custom_names, str):
                names.append(custom_names)
            else:
                names.extend(custom_names)
        return names

    def _find_matching_column(self, columns: List[str], target_column: str,
                              exclude: Optional[Any] = None,
                              allow_fuzzy: bool = True) -> Optional[str]:
        """Find the source column that corresponds to a standard column.

        Args:
            columns: Original column labels (non-string labels are compared
                by their ``str()`` form).
            target_column: Standard column name to look for.
            exclude: Optional collection of labels that must not be returned.
            allow_fuzzy: When False only exact (case-insensitive) matches
                are considered.

        Returns:
            The matching original label, or None.
        """
        excluded = set(exclude) if exclude else set()
        names = [str(name).lower() for name in self._candidate_names(target_column) if name]
        available = [(col, str(col).lower()) for col in columns if col not in excluded]

        # Exact matches over all spellings take priority over any fuzzy hit.
        for name in names:
            for col, lowered in available:
                if lowered == name:
                    return col
        if not allow_fuzzy:
            return None
        for name in names:
            for col, lowered in available:
                # An empty label is a substring of everything; ignore it.
                if lowered and (name in lowered or lowered in name):
                    return col
        return None

    def _get_default_value(self, column_name: str) -> Any:
        """Return the fill value used for a missing standard column.

        Args:
            column_name: Standard column name.

        Returns:
            0 for numeric columns, '' for text columns, None otherwise.
        """
        if column_name in ['quantity', 'unit_price', 'total_price']:
            return 0
        elif column_name in ['barcode', 'name', 'specification', 'unit', 'category', 'brand', 'supplier']:
            return ''
        else:
            return None

    def add_custom_mapping(self, standard_name: str, custom_names: Union[str, List[str]]):
        """Register additional custom spellings for a standard column.

        Names already configured for ``standard_name`` are kept.

        Args:
            standard_name: Standard column name.
            custom_names: Custom column name or list of names.
        """
        if isinstance(custom_names, str):
            custom_names = [custom_names]
        existing = self.mapping_config.get(standard_name) or []
        if isinstance(existing, str):
            existing = [existing]
        # Merge, de-duplicated, keeping first-seen order.
        self.mapping_config[standard_name] = list(dict.fromkeys([*existing, *custom_names]))
        for custom_name in custom_names:
            key = str(custom_name).lower()
            self.reverse_mapping[key] = standard_name
            self.custom_mappings[key] = standard_name
        logger.info(f"添加自定义映射: {standard_name} <- {custom_names}")

    def detect_column_types(self, df: pd.DataFrame) -> Dict[str, str]:
        """Classify each column as boolean, numeric, datetime or text.

        Args:
            df: Data to inspect.

        Returns:
            Mapping of column label to type name.
        """
        column_types = {}
        for column in df.columns:
            series = df[column]
            # Bool must be tested first: pandas treats bool as numeric.
            if pd.api.types.is_bool_dtype(series):
                column_types[column] = 'boolean'
            elif pd.api.types.is_numeric_dtype(series):
                column_types[column] = 'numeric'
            elif pd.api.types.is_datetime64_any_dtype(series):
                column_types[column] = 'datetime'
            else:
                column_types[column] = 'text'
        return column_types

    def suggest_column_mapping(self, df: pd.DataFrame) -> Dict[str, List[str]]:
        """Suggest standard names for each column by substring matching.

        Args:
            df: Data to inspect.

        Returns:
            Mapping of column label to suggested standard names (in a stable
            order); columns without suggestions are omitted.
        """
        suggestions = {}
        for column in df.columns:
            column_lower = str(column).lower()
            if not column_lower:
                # An empty label would "match" every spelling.
                continue
            matches = []
            for standard_name, variations in self.STANDARD_COLUMNS.items():
                for variation in variations:
                    if column_lower in variation.lower() or variation.lower() in column_lower:
                        matches.append(standard_name)
            for custom_name, standard_name in self.custom_mappings.items():
                if column_lower in custom_name or custom_name in column_lower:
                    matches.append(standard_name)
            unique = list(dict.fromkeys(matches))
            if unique:
                suggestions[column] = unique
        return suggestions

    def validate_mapping(self, df: pd.DataFrame, required_columns: List[str]) -> Dict[str, Any]:
        """Check a mapped frame for missing, empty and non-numeric columns.

        Args:
            df: Mapped data.
            required_columns: Column names that must be present.

        Returns:
            Dict with ``valid``, ``missing_columns``, ``empty_columns`` and
            ``warnings``. Only missing columns make the result invalid.
        """
        result = {
            'valid': True,
            'missing_columns': [],
            'empty_columns': [],
            'warnings': []
        }
        for col in required_columns:
            if col not in df.columns:
                result['missing_columns'].append(col)
                result['valid'] = False
        for col in df.columns:
            if df[col].isnull().all():
                result['empty_columns'].append(col)
                result['warnings'].append(f"'{col}' 全部为空值")
        numeric_columns = ['quantity', 'unit_price', 'total_price']
        for col in numeric_columns:
            if col in df.columns and not pd.api.types.is_numeric_dtype(df[col]):
                result['warnings'].append(f"'{col}' 不是数值类型")
        return result
+401
View File
@@ -0,0 +1,401 @@
"""
数据清洗处理器
提供各种数据清洗功能,如空值处理、重复项处理、数据类型转换等
"""
import pandas as pd
from typing import Dict, Any, Optional, List, Union
from ...core.utils.log_utils import get_logger
logger = get_logger(__name__)
class DataCleaner:
"""数据清洗处理器
提供标准化的数据清洗功能,支持链式调用和规则配置
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
    """Set up the cleaner with an empty rule list.

    Args:
        config: Optional cleaning configuration; an empty dict is used when
            it is missing or empty.
    """
    self.config = config if config else {}
    self.cleaning_rules = list()
def add_rule(self, rule_type: str, **kwargs):
    """Queue one cleaning rule.

    Args:
        rule_type: Rule type.
        **kwargs: Rule parameters, stored next to the type.
    """
    entry = dict(type=rule_type)
    entry.update(kwargs)
    self.cleaning_rules.append(entry)
    logger.debug(f"添加清洗规则: {rule_type}")
def clean(self, df: pd.DataFrame) -> pd.DataFrame:
    """Run every queued cleaning rule, in order, on a copy of ``df``.

    A rule that raises is logged and skipped; the remaining rules still run.

    Args:
        df: Input data.

    Returns:
        The cleaned data.
    """
    logger.info(f"开始数据清洗,原始数据形状: {df.shape}")
    working = df.copy()
    for index, rule in enumerate(self.cleaning_rules):
        step = index + 1
        try:
            logger.debug(f"执行清洗规则 {step}/{len(self.cleaning_rules)}: {rule['type']}")
            working = self._apply_rule(working, rule)
            logger.debug(f"规则执行完成,数据形状: {working.shape}")
        except Exception as e:
            # Best-effort pipeline: report the failure, then carry on.
            logger.error(f"清洗规则执行失败: {rule}, 错误: {e}")
    logger.info(f"数据清洗完成,最终数据形状: {working.shape}")
    return working
def _apply_rule(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
    """Route one cleaning rule to the method that implements it.

    Args:
        df: Data to operate on.
        rule: Rule configuration; its ``type`` entry selects the handler.

    Returns:
        The processed data; ``df`` unchanged for an unknown rule type.
    """
    kind = rule.get('type')
    # (rule type, handler method name) pairs, checked in order.
    routes = (
        ('remove_duplicates', '_remove_duplicates'),
        ('fill_na', '_fill_na'),
        ('remove_rows', '_remove_rows'),
        ('convert_type', '_convert_type'),
        ('strip_whitespace', '_strip_whitespace'),
        ('normalize_text', '_normalize_text'),
        ('validate_data', '_validate_data'),
    )
    for expected, handler_name in routes:
        if kind == expected:
            return getattr(self, handler_name)(df, rule)
    logger.warning(f"未知的清洗规则类型: {kind}")
    return df
def _remove_duplicates(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""移除重复项
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
subset = rule.get('subset') # 用于判断重复的列
keep = rule.get('keep', 'first') # 保留哪个重复项
before_count = len(df)
df_cleaned = df.drop_duplicates(subset=subset, keep=keep)
after_count = len(df_cleaned)
logger.info(f"移除重复项: {before_count - after_count} 行被移除")
return df_cleaned
def _fill_na(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""填充空值
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
columns = rule.get('columns') # 要处理的列
value = rule.get('value', 0) # 填充值
method = rule.get('method') # 填充方法('ffill', 'bfill', 'mean', 'median'
if columns:
# 处理指定列
if isinstance(columns, str):
columns = [columns]
for col in columns:
if col in df.columns:
if method == 'ffill':
df[col] = df[col].fillna(method='ffill')
elif method == 'bfill':
df[col] = df[col].fillna(method='bfill')
elif method == 'mean':
df[col] = df[col].fillna(df[col].mean())
elif method == 'median':
df[col] = df[col].fillna(df[col].median())
else:
df[col] = df[col].fillna(value)
logger.debug(f"填充列 {col} 的空值: {method or value}")
else:
# 处理所有列
if method == 'ffill':
df = df.fillna(method='ffill')
elif method == 'bfill':
df = df.fillna(method='bfill')
else:
df = df.fillna(value)
logger.debug(f"填充所有列的空值: {method or value}")
return df
def _remove_rows(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""移除行
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
condition = rule.get('condition') # 条件表达式
columns = rule.get('columns') # 要检查的列
values = rule.get('values') # 要移除的值
if condition:
# 使用条件表达式
try:
before_count = len(df)
df_filtered = df.query(condition)
after_count = len(df_filtered)
logger.info(f"条件过滤: {condition}, 移除了 {before_count - after_count}")
return df_filtered
except Exception as e:
logger.error(f"条件表达式执行失败: {condition}, 错误: {e}")
return df
if columns and values:
# 基于列值过滤
if isinstance(columns, str):
columns = [columns]
if not isinstance(values, list):
values = [values]
df_filtered = df.copy()
for col in columns:
if col in df_filtered.columns:
mask = ~df_filtered[col].isin(values)
df_filtered = df_filtered[mask]
logger.debug(f"{col} 过滤值 {values}")
return df_filtered
logger.warning("移除行规则缺少条件或列配置")
return df
def _convert_type(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""类型转换
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
columns = rule.get('columns')
target_type = rule.get('target_type', 'float')
errors = rule.get('errors', 'coerce') # 错误处理方式
if isinstance(columns, str):
columns = [columns]
for col in columns:
if col in df.columns:
try:
if target_type == 'int':
df[col] = pd.to_numeric(df[col], errors=errors).astype('Int64')
elif target_type == 'float':
df[col] = pd.to_numeric(df[col], errors=errors)
elif target_type == 'datetime':
df[col] = pd.to_datetime(df[col], errors=errors)
elif target_type == 'string':
df[col] = df[col].astype(str)
else:
df[col] = df[col].astype(target_type)
logger.debug(f"{col} 类型转换: {target_type}")
except Exception as e:
logger.error(f"{col} 类型转换失败: {e}")
return df
def _strip_whitespace(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""去除空白字符
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
columns = rule.get('columns')
if columns:
if isinstance(columns, str):
columns = [columns]
for col in columns:
if col in df.columns and df[col].dtype == 'object':
df[col] = df[col].str.strip()
logger.debug(f"{col} 去除空白字符")
else:
# 处理所有文本列
text_columns = df.select_dtypes(include=['object']).columns
for col in text_columns:
df[col] = df[col].str.strip()
logger.debug(f"所有文本列去除空白字符: {list(text_columns)}")
return df
def _normalize_text(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""文本标准化
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
columns = rule.get('columns')
lowercase = rule.get('lowercase', False)
uppercase = rule.get('uppercase', False)
replace_map = rule.get('replace_map', {}) # 替换映射
if isinstance(columns, str):
columns = [columns]
target_columns = columns or df.select_dtypes(include=['object']).columns
for col in target_columns:
if col in df.columns and df[col].dtype == 'object':
if lowercase:
df[col] = df[col].str.lower()
elif uppercase:
df[col] = df[col].str.upper()
# 应用替换映射
for old, new in replace_map.items():
df[col] = df[col].str.replace(old, new)
logger.debug(f"{col} 文本标准化完成")
return df
def _validate_data(self, df: pd.DataFrame, rule: Dict[str, Any]) -> pd.DataFrame:
"""数据验证
Args:
df: 数据
rule: 规则配置
Returns:
处理后的数据
"""
columns = rule.get('columns')
min_value = rule.get('min_value')
max_value = rule.get('max_value')
required = rule.get('required', False)
if isinstance(columns, str):
columns = [columns]
validation_results = []
for col in columns:
if col in df.columns:
# 检查必需值
if required:
null_count = df[col].isnull().sum()
if null_count > 0:
validation_results.append(f"{col}: {null_count} 个空值")
# 检查数值范围
if min_value is not None or max_value is not None:
if pd.api.types.is_numeric_dtype(df[col]):
invalid_mask = pd.Series(False, index=df.index)
if min_value is not None:
invalid_mask |= df[col] < min_value
if max_value is not None:
invalid_mask |= df[col] > max_value
invalid_count = invalid_mask.sum()
if invalid_count > 0:
validation_results.append(f"{col}: {invalid_count} 个值超出范围")
if validation_results:
logger.warning(f"数据验证发现问题: {', '.join(validation_results)}")
else:
logger.debug("数据验证通过")
return df
# 便捷方法
def remove_duplicates(self, subset: Optional[List[str]] = None, keep: str = 'first'):
"""移除重复项"""
self.add_rule('remove_duplicates', subset=subset, keep=keep)
return self
def fill_na(self, columns: Optional[Union[str, List[str]]] = None,
value: Any = 0, method: Optional[str] = None):
"""填充空值"""
self.add_rule('fill_na', columns=columns, value=value, method=method)
return self
def remove_rows(self, condition: Optional[str] = None,
columns: Optional[Union[str, List[str]]] = None,
values: Optional[Any] = None):
"""移除行"""
self.add_rule('remove_rows', condition=condition, columns=columns, values=values)
return self
def convert_type(self, columns: Union[str, List[str]], target_type: str, errors: str = 'coerce'):
"""类型转换"""
self.add_rule('convert_type', columns=columns, target_type=target_type, errors=errors)
return self
def strip_whitespace(self, columns: Optional[Union[str, List[str]]] = None):
"""去除空白字符"""
self.add_rule('strip_whitespace', columns=columns)
return self
def normalize_text(self, columns: Optional[Union[str, List[str]]] = None,
lowercase: bool = False, uppercase: bool = False,
replace_map: Optional[Dict[str, str]] = None):
"""文本标准化"""
self.add_rule('normalize_text', columns=columns, lowercase=lowercase,
uppercase=uppercase, replace_map=replace_map or {})
return self
def validate_data(self, columns: Union[str, List[str]],
min_value: Optional[float] = None,
max_value: Optional[float] = None,
required: bool = False):
"""数据验证"""
self.add_rule('validate_data', columns=columns, min_value=min_value,
max_value=max_value, required=required)
return self
+14 -2
View File
@@ -11,7 +11,7 @@ import json
import base64
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple, Union, Any
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from ...config.settings import ConfigManager
from ..utils.log_utils import get_logger
@@ -332,7 +332,7 @@ class OCRProcessor:
logger.error(f"处理图片时出错: {image_path}, 错误: {e}")
return None
def process_images_batch(self, batch_size: int = None, max_workers: int = None) -> Tuple[int, int]:
def process_images_batch(self, batch_size: int = None, max_workers: int = None, progress_cb: Optional[Callable[[int], None]] = None) -> Tuple[int, int]:
"""
批量处理图片
@@ -369,6 +369,13 @@ class OCRProcessor:
for i in range(0, total, batch_size):
batch = unprocessed_images[i:i+batch_size]
logger.info(f"处理批次 {i//batch_size+1}/{(total+batch_size-1)//batch_size}: {len(batch)} 个文件")
try:
if progress_cb:
# 以批次为单位估算进度(0-90%),保留10%给后续阶段
percent = int(10 + (i / max(total, 1)) * 80)
progress_cb(min(percent, 90))
except Exception:
pass
# 使用多线程处理批次
with ThreadPoolExecutor(max_workers=max_workers) as executor:
@@ -378,4 +385,9 @@ class OCRProcessor:
success_count += sum(1 for result in results if result is not None)
logger.info(f"所有图片处理完成, 总计: {total}, 成功: {success_count}")
try:
if progress_cb:
progress_cb(90)
except Exception:
pass
return total, success_count
+9
View File
@@ -0,0 +1,9 @@
"""
处理器模块初始化文件
"""
from .base import BaseProcessor
from .ocr_processor import OCRProcessor
from .tobacco_processor import TobaccoProcessor
__all__ = ['BaseProcessor', 'OCRProcessor', 'TobaccoProcessor']
+139
View File
@@ -0,0 +1,139 @@
"""
基础处理器接口模块
定义所有处理器的基类,提供统一的处理接口
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class BaseProcessor(ABC):
    """Base interface shared by all processors.

    Designed as a strategy: each concrete processor handles one particular
    kind of file.
    """
def __init__(self, config: Dict[str, Any]):
"""初始化处理器
Args:
config: 处理器配置字典
"""
self.config = config
self.name = self.__class__.__name__
self.description = ""
self._setup_logging()
def _setup_logging(self):
"""设置处理器日志"""
self.logger = logging.getLogger(f"{__name__}.{self.name}")
@abstractmethod
def can_process(self, file_path: Path) -> bool:
"""判断是否能处理该文件
Args:
file_path: 文件路径
Returns:
是否能处理该文件
"""
pass
@abstractmethod
def process(self, input_file: Path, output_dir: Path) -> Optional[Path]:
"""处理文件,返回输出文件路径
Args:
input_file: 输入文件路径
output_dir: 输出目录路径
Returns:
输出文件路径,处理失败返回None
"""
pass
@abstractmethod
def get_required_columns(self) -> List[str]:
"""返回需要的列名列表
Returns:
列名列表
"""
pass
def validate_input(self, file_path: Path) -> bool:
"""验证输入文件有效性
Args:
file_path: 文件路径
Returns:
文件是否有效
"""
try:
if not file_path.exists():
self.logger.warning(f"文件不存在: {file_path}")
return False
if not file_path.is_file():
self.logger.warning(f"不是文件: {file_path}")
return False
supported_extensions = self.get_supported_extensions()
if supported_extensions and file_path.suffix.lower() not in supported_extensions:
self.logger.warning(f"不支持的文件类型: {file_path.suffix}, 支持的类型: {supported_extensions}")
return False
return True
except Exception as e:
self.logger.error(f"验证文件时出错: {e}")
return False
def get_supported_extensions(self) -> List[str]:
"""获取支持的文件扩展名
Returns:
支持的扩展名列表,空列表表示支持所有类型
"""
return []
def get_output_filename(self, input_file: Path, suffix: str = "_processed") -> str:
"""生成输出文件名
Args:
input_file: 输入文件路径
suffix: 文件名后缀
Returns:
输出文件名
"""
return f"{input_file.stem}{suffix}{input_file.suffix}"
def log_processing_start(self, input_file: Path):
"""记录处理开始日志"""
self.logger.info(f"开始处理文件: {input_file}")
self.logger.info(f"处理器: {self.name} - {self.description}")
def log_processing_end(self, input_file: Path, output_file: Optional[Path] = None, success: bool = True):
"""记录处理结束日志"""
if success:
self.logger.info(f"处理完成: {input_file}")
if output_file:
self.logger.info(f"输出文件: {output_file}")
else:
self.logger.error(f"处理失败: {input_file}")
def __str__(self) -> str:
"""字符串表示"""
return f"{self.name}({self.description})"
def __repr__(self) -> str:
"""详细字符串表示"""
return f"{self.__class__.__module__}.{self.__class__.__name__}(name='{self.name}', description='{self.description}')"
+192
View File
@@ -0,0 +1,192 @@
"""
OCR处理器
处理图片文件的OCR识别完整流程:图片识别 → Excel处理 → 标准采购单生成
"""
import os
from pathlib import Path
from typing import Optional, Dict, Any, List
from .base import BaseProcessor
from ...services.ocr_service import OCRService
from ...services.order_service import OrderService
from ...core.utils.log_utils import get_logger
logger = get_logger(__name__)
class OCRProcessor(BaseProcessor):
    """OCR processor.

    Runs the complete OCR flow for an image file:
    1. OCR the table information in the image
    2. Process the recognition result into an Excel file
    3. Convert it to the standard purchase-order format
    """
def __init__(self, config: Dict[str, Any]):
    """Initialise the OCR processor.

    Args:
        config: Configuration, forwarded to the base class and to both
            services.
    """
    super().__init__(config)
    self.description = "OCR识别完整流程(图片→识别→Excel→采购单)"

    # Services used by the pipeline steps (_perform_ocr / _process_excel).
    self.ocr_service = OCRService(config)
    self.order_service = OrderService(config)
def can_process(self, file_path: Path) -> bool:
    """Decide whether ``file_path`` is a supported image file.

    The extension list comes from ``get_supported_extensions()`` so it is
    defined in one place only.

    Args:
        file_path: Path of the candidate file.

    Returns:
        True when the file passes input validation and has a supported
        image extension.
    """
    if not self.validate_input(file_path):
        return False

    if file_path.suffix.lower() in self.get_supported_extensions():
        self.logger.info(f"识别为图片文件: {file_path.name}")
        return True

    return False
def process(self, input_file: Path, output_dir: Path) -> Optional[Path]:
    """Run the three-step OCR flow for one image.

    Steps: OCR the image, process the resulting Excel file, then produce
    the purchase order. The first failing step ends the run.

    Args:
        input_file: Path of the input image.
        output_dir: Output directory, passed to every step.

    Returns:
        Path of the final file, or None when any step fails or raises.
    """
    self.log_processing_start(input_file)

    def fail(message):
        # Shared failure path: log, record the failed run, yield None.
        self.logger.error(message)
        self.log_processing_end(input_file, success=False)
        return None

    try:
        self.logger.info("开始OCR识别流程...")

        self.logger.info("步骤1/3: OCR识别图片...")
        ocr_result = self._perform_ocr(input_file, output_dir)
        if not ocr_result:
            return fail("OCR识别失败")

        self.logger.info("步骤2/3: 处理Excel文件...")
        excel_result = self._process_excel(ocr_result, output_dir)
        if not excel_result:
            return fail("Excel处理失败")

        self.logger.info("步骤3/3: 生成标准采购单...")
        final_result = self._generate_purchase_order(excel_result, output_dir)
        if not final_result:
            return fail("生成采购单失败")

        self.logger.info(f"OCR处理流程完成,输出文件: {final_result}")
        self.log_processing_end(input_file, final_result, success=True)
        return final_result

    except Exception as e:
        self.logger.error(f"OCR处理流程出错: {e}", exc_info=True)
        self.log_processing_end(input_file, success=False)
        return None
def get_required_columns(self) -> List[str]:
    """Return the required column names (none for OCR).

    OCR does not depend on column names directly; later processing steps
    decide which columns they need.
    """
    return list()
def get_supported_extensions(self) -> List[str]:
    """Return the image extensions this processor accepts."""
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
    return list(image_extensions)
def _perform_ocr(self, input_file: Path, output_dir: Path) -> Optional[Path]:
"""执行OCR识别
Args:
input_file: 输入图片文件
output_dir: 输出目录
Returns:
OCR生成的Excel文件路径,失败返回None
"""
try:
self.logger.info(f"开始OCR识别: {input_file}")
# 使用OCR服务处理图片
result_path = self.ocr_service.process_image(str(input_file))
if result_path:
# 确保结果文件在输出目录中
result_path = Path(result_path)
if result_path.exists():
self.logger.info(f"OCR识别成功,输出文件: {result_path}")
return result_path
else:
self.logger.error(f"OCR结果文件不存在: {result_path}")
return None
else:
self.logger.error("OCR服务返回None")
return None
except Exception as e:
self.logger.error(f"OCR识别失败: {e}", exc_info=True)
return None
def _process_excel(self, excel_file: Path, output_dir: Path) -> Optional[Path]:
"""处理Excel文件
Args:
excel_file: Excel文件路径
output_dir: 输出目录
Returns:
处理后的Excel文件路径,失败返回None
"""
try:
self.logger.info(f"开始处理Excel文件: {excel_file}")
# 使用订单服务处理Excel文件(生成采购单)
result_path = self.order_service.process_excel(str(excel_file))
if result_path:
result_path = Path(result_path)
if result_path.exists():
self.logger.info(f"Excel处理成功,输出文件: {result_path}")
return result_path
else:
self.logger.error(f"Excel处理结果文件不存在: {result_path}")
return None
else:
self.logger.error("Excel处理服务返回None")
return None
except Exception as e:
self.logger.error(f"Excel处理失败: {e}", exc_info=True)
return None
def _generate_purchase_order(self, processed_file: Path, output_dir: Path) -> Optional[Path]:
"""采购单生成由OrderService完成,此处直接返回处理结果"""
try:
if processed_file and processed_file.exists():
return processed_file
return None
except Exception:
return None
@@ -0,0 +1,7 @@
"""
供应商处理器模块初始化文件
"""
from .generic_supplier_processor import GenericSupplierProcessor
__all__ = ['GenericSupplierProcessor']
@@ -0,0 +1,430 @@
"""
通用供应商处理器
可配置化的供应商处理器,支持通过配置文件定义处理规则
"""
import fnmatch
import pandas as pd
from typing import Optional, Dict, Any, List
from pathlib import Path
from ..base import BaseProcessor
from ...utils.log_utils import get_logger
logger = get_logger(__name__)
class GenericSupplierProcessor(BaseProcessor):
    """Generic, configuration-driven supplier processor.

    Processes Excel files from different suppliers based on a supplier
    configuration, supporting:
    - filename pattern matching
    - content indicator detection
    - column mapping
    - data-cleaning rules
    - calculation rules
    """
def __init__(self, config: Dict[str, Any], supplier_config: Dict[str, Any]):
    """Initialise the processor from system and supplier configuration.

    Args:
        config: System configuration, forwarded to the base class.
        supplier_config: Supplier-specific settings; every key is optional
            and falls back to the default shown below.
    """
    super().__init__(config)
    self.supplier_config = supplier_config
    setting = supplier_config.get

    # Identity
    self.name = setting('name', 'GenericSupplier')
    self.description = setting('description', '通用供应商处理器')

    # Recognition and processing rules
    self.filename_patterns = setting('filename_patterns', [])
    self.content_indicators = setting('content_indicators', [])
    self.column_mapping = setting('column_mapping', {})
    self.cleaning_rules = setting('cleaning_rules', [])
    self.calculations = setting('calculations', [])

    # Output settings
    self.output_template = setting('output_template', 'templates/银豹-采购单模板.xls')
    self.output_suffix = setting('output_suffix', '_银豹采购单')
def can_process(self, file_path: Path) -> bool:
    """Decide whether this supplier configuration matches ``file_path``.

    A file matches when it passes input validation and either a filename
    pattern or a content indicator matches. With neither kind of rule
    configured, a warning is logged and the file is rejected.

    Args:
        file_path: Path of the candidate file.

    Returns:
        True when the file can be processed.
    """
    if not self.validate_input(file_path):
        return False

    has_patterns = bool(self.filename_patterns)
    has_indicators = bool(self.content_indicators)

    if has_patterns and self._check_filename_patterns(file_path):
        return True

    if has_indicators and self._check_content_indicators(file_path):
        return True

    if not (has_patterns or has_indicators):
        self.logger.warning(f"处理器 {self.name} 没有配置识别规则")

    return False
def process(self, input_file: Path, output_dir: Path) -> Optional[Path]:
    """Run the read, map, clean, calculate and write flow for one file.

    The first failing step ends the run.

    Args:
        input_file: Path of the input file.
        output_dir: Directory for the output file.

    Returns:
        Path of the output file, or None when a step fails or raises.
    """
    self.log_processing_start(input_file)

    def fail(message):
        # Shared failure path: log, record the failed run, yield None.
        self.logger.error(message)
        self.log_processing_end(input_file, success=False)
        return None

    try:
        self.logger.info("步骤1/4: 读取数据...")
        df = self._read_supplier_data(input_file)
        if df is None or df.empty:
            return fail("读取数据失败或数据为空")

        self.logger.info("步骤2/4: 应用列映射...")
        mapped_df = self._apply_column_mapping(df)
        if mapped_df is None:
            return fail("列映射失败")

        self.logger.info("步骤3/4: 数据清洗...")
        cleaned_df = self._apply_data_cleaning(mapped_df)
        if cleaned_df is None:
            return fail("数据清洗失败")

        self.logger.info("步骤4/4: 计算处理...")
        calculated_df = self._apply_calculations(cleaned_df)
        if calculated_df is None:
            return fail("计算处理失败")

        output_file = self._generate_output(calculated_df, input_file, output_dir)
        if not (output_file and output_file.exists()):
            return fail("输出文件生成失败")

        self.logger.info(f"处理完成,输出文件: {output_file}")
        self.log_processing_end(input_file, output_file, success=True)
        return output_file

    except Exception as e:
        self.logger.error(f"处理文件时出错: {e}", exc_info=True)
        self.log_processing_end(input_file, success=False)
        return None
def get_required_columns(self) -> List[str]:
    """Return the target column names of the column mapping.

    Returns:
        The mapping's values as a list, or an empty list when no mapping
        is configured.
    """
    if not self.column_mapping:
        return []
    return [target for target in self.column_mapping.values()]
def _check_filename_patterns(self, file_path: Path) -> bool:
"""检查文件名模式
Args:
file_path: 文件路径
Returns:
是否匹配
"""
try:
filename = file_path.name
for pattern in self.filename_patterns:
if fnmatch.fnmatch(filename.lower(), pattern.lower()):
self.logger.info(f"文件名匹配成功: {filename} -> {pattern}")
return True
return False
except Exception as e:
self.logger.error(f"检查文件名模式时出错: {e}")
return False
def _check_content_indicators(self, file_path: Path) -> bool:
"""检查文件内容特征
Args:
file_path: 文件路径
Returns:
是否匹配
"""
try:
df = self._read_excel_safely(file_path, nrows=5)
# 检查列名中是否包含指定关键词
columns_str = str(list(df.columns)).lower()
for indicator in self.content_indicators:
if indicator.lower() in columns_str:
self.logger.info(f"内容特征匹配成功: {indicator}")
return True
return False
except Exception as e:
self.logger.error(f"检查内容特征时出错: {e}")
return False
def _read_supplier_data(self, file_path: Path) -> Optional[pd.DataFrame]:
"""读取供应商数据
Args:
file_path: 文件路径
Returns:
数据DataFrame或None
"""
try:
df = self._read_excel_safely(file_path)
if df.empty:
self.logger.warning("数据文件为空")
return None
self.logger.info(f"成功读取数据,形状: {df.shape}")
return df
except Exception as e:
self.logger.error(f"读取数据失败: {e}")
return None
def _read_excel_safely(self, file_path: Path, **kwargs) -> pd.DataFrame:
"""根据扩展名选择合适的读取引擎并带有回退"""
suffix = file_path.suffix.lower()
try:
if suffix == '.xlsx':
return pd.read_excel(file_path, engine='openpyxl', **kwargs)
elif suffix == '.xls':
try:
return pd.read_excel(file_path, engine='xlrd', **kwargs)
except Exception as e:
self.logger.warning(f"读取xls失败,可能缺少xlrd: {e}")
raise
else:
return pd.read_excel(file_path, **kwargs)
except Exception as e:
self.logger.error(f"读取Excel失败: {file_path} - {e}")
raise
def _apply_column_mapping(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""应用列映射
Args:
df: 原始数据
Returns:
映射后的数据或None
"""
if not self.column_mapping:
self.logger.info("没有列映射配置")
return df
try:
# 应用列重命名
df_renamed = df.rename(columns=self.column_mapping)
# 检查必需的列是否存在
required_columns = self.get_required_columns()
missing_columns = [col for col in required_columns if col not in df_renamed.columns]
if missing_columns:
self.logger.warning(f"缺少必需的列: {missing_columns}")
# 创建缺失的列并填充默认值
for col in missing_columns:
df_renamed[col] = 0 if '' in col or '' in col else ''
self.logger.info(f"创建缺失列: {col},默认值: {df_renamed[col].iloc[0] if len(df_renamed) > 0 else 'N/A'}")
self.logger.info(f"列映射完成,列名: {list(df_renamed.columns)}")
return df_renamed
except Exception as e:
self.logger.error(f"列映射失败: {e}")
return None
def _apply_data_cleaning(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""应用数据清洗规则
Args:
df: 映射后的数据
Returns:
清洗后的数据或None
"""
if not self.cleaning_rules:
self.logger.info("没有数据清洗规则")
return df
try:
df_cleaned = df.copy()
for rule in self.cleaning_rules:
rule_type = rule.get('type')
if rule_type == 'remove_rows':
# 删除行
condition = rule.get('condition')
if condition:
before_count = len(df_cleaned)
df_cleaned = df_cleaned.query(condition)
after_count = len(df_cleaned)
self.logger.info(f"删除行规则: {condition}, 删除数量: {before_count - after_count}")
elif rule_type == 'fill_na':
# 填充空值,兼容单列和多列
columns = rule.get('columns') or [rule.get('column')] if rule.get('column') else []
value = rule.get('value', 0)
for col in columns:
if col and col in df_cleaned.columns:
na_count = df_cleaned[col].isna().sum()
df_cleaned[col] = df_cleaned[col].fillna(value)
self.logger.info(f"填充空值: {col} -> {value}, 填充数量: {na_count}")
elif rule_type == 'convert_type':
# 类型转换,兼容单列和多列
target_type = rule.get('target_type', 'float')
columns = rule.get('columns') or [rule.get('column')] if rule.get('column') else []
for col in columns:
if col and col in df_cleaned.columns:
try:
if target_type == 'float':
df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce')
elif target_type == 'int':
df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce').astype('Int64')
self.logger.info(f"类型转换: {col} -> {target_type}")
except Exception as e:
self.logger.warning(f"类型转换失败: {col} -> {target_type}: {e}")
else:
self.logger.warning(f"未知的清洗规则类型: {rule_type}")
self.logger.info(f"数据清洗完成,数据形状: {df_cleaned.shape}")
return df_cleaned
except Exception as e:
self.logger.error(f"数据清洗失败: {e}")
return None
def _apply_calculations(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""应用计算处理
Args:
df: 清洗后的数据
Returns:
计算后的数据或None
"""
if not self.calculations:
self.logger.info("没有计算规则")
return df
try:
df_calculated = df.copy()
for calculation in self.calculations:
calc_type = calculation.get('type')
if calc_type == 'multiply':
# 乘法计算
source_column = calculation.get('source_column')
target_column = calculation.get('target_column')
factor = calculation.get('factor', 1)
if source_column and target_column:
if source_column in df_calculated.columns:
df_calculated[target_column] = df_calculated[source_column] * factor
self.logger.info(f"乘法计算: {source_column} * {factor} -> {target_column}")
else:
self.logger.warning(f"源列不存在: {source_column}")
elif calc_type == 'divide':
# 除法计算
source_column = calculation.get('source_column')
target_column = calculation.get('target_column')
divisor = calculation.get('divisor', 1)
if source_column and target_column and divisor != 0:
if source_column in df_calculated.columns:
df_calculated[target_column] = df_calculated[source_column] / divisor
self.logger.info(f"除法计算: {source_column} / {divisor} -> {target_column}")
else:
self.logger.warning(f"源列不存在: {source_column}")
elif calc_type == 'formula':
# 公式计算
formula = calculation.get('formula')
target_column = calculation.get('target_column')
if formula and target_column:
try:
df_calculated[target_column] = df_calculated.eval(formula)
self.logger.info(f"公式计算: {formula} -> {target_column}")
except Exception as e:
self.logger.error(f"公式计算失败: {formula}: {e}")
else:
self.logger.warning(f"未知的计算类型: {calc_type}")
self.logger.info(f"计算处理完成,数据形状: {df_calculated.shape}")
return df_calculated
except Exception as e:
self.logger.error(f"计算处理失败: {e}")
return None
def _generate_output(self, df: pd.DataFrame, input_file: Path, output_dir: Path) -> Optional[Path]:
"""生成输出文件
Args:
df: 最终数据
input_file: 输入文件路径
output_dir: 输出目录
Returns:
输出文件路径或None
"""
try:
# 生成输出文件名
timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
output_filename = f"{input_file.stem}{self.output_suffix}_{timestamp}.xls"
output_file = output_dir / output_filename
# 这里应该使用实际的模板生成逻辑
# 暂时直接保存为Excel文件
df.to_excel(output_file, index=False)
self.logger.info(f"输出文件生成成功: {output_file}")
return output_file
except Exception as e:
self.logger.error(f"生成输出文件失败: {e}")
return None
+362
View File
@@ -0,0 +1,362 @@
"""
烟草订单处理器
处理烟草公司特定格式的订单明细文件,生成银豹采购单
"""
import os
import datetime
import pandas as pd
import xlrd
import xlwt
from xlutils.copy import copy
from openpyxl import load_workbook
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path
from .base import BaseProcessor
from ...core.utils.log_utils import get_logger
from ...core.utils.dialog_utils import show_custom_dialog
logger = get_logger(__name__)
class TobaccoProcessor(BaseProcessor):
    """Tobacco order processor.

    Processes tobacco-company order detail files, extracts the product
    information and generates a purchase order in the standard 银豹 format.
    """
def __init__(self, config: Dict[str, Any]):
    """Initialise the tobacco order processor.

    Args:
        config: Configuration object. It is queried with
            ``config.get('Paths', 'template_file', fallback=...)``, i.e.
            with a section/option/fallback style ``get`` (NOTE(review): a
            plain ``dict`` does not accept that call; confirm the real type
            passed by callers).
    """
    super().__init__(config)
    self.description = "处理烟草公司订单明细文件"
    self.template_file = config.get('Paths', 'template_file', fallback='templates/银豹-采购单模板.xls')

    # Result directory; parents=True so a missing "data" directory does not
    # make construction fail.
    self.result_dir = Path("data/result")
    self.result_dir.mkdir(parents=True, exist_ok=True)

    # Default output file name
    self.default_output_name = "银豹采购单_烟草公司.xls"
def can_process(self, file_path: Path) -> bool:
    """Decide whether ``file_path`` is a tobacco order file.

    A file qualifies when it passes input validation and either its name
    contains a tobacco keyword (matched case-insensitively) or its first
    rows expose all of the expected columns.

    The keyword list no longer contains an empty string: ``'' in filename``
    is always true, which made every valid Excel file match. The file
    content is read only when the name does not already match.

    Args:
        file_path: Path of the candidate file.

    Returns:
        True when the file is recognised as a tobacco order.
    """
    if not self.validate_input(file_path):
        return False

    filename = file_path.name
    tobacco_keywords = ('烟草', '卷烟', '订单明细', 'tobacco')
    lowered = filename.lower()

    if any(keyword in lowered for keyword in tobacco_keywords):
        self.logger.info(f"识别为烟草订单文件: {filename}")
        return True

    try:
        df = self._read_excel_safely(file_path, nrows=5)
        required_columns = ['商品', '盒码', '订单量']
        if all(col in df.columns for col in required_columns):
            self.logger.info(f"识别为烟草订单文件: {filename}")
            return True
        return False
    except Exception as e:
        # The name did not match and the content is unreadable: not ours.
        self.logger.warning(f"检查文件内容时出错: {e}")
        return False
def process(self, input_file: Path, output_dir: Path) -> Optional[Path]:
    """Turn a tobacco order file into a purchase-order file.

    Reads the order header and rows, writes a timestamped ``.xls`` file
    into ``output_dir`` (created when missing) and then shows the result
    summary.

    Args:
        input_file: Path of the order file.
        output_dir: Directory for the output file.

    Returns:
        Path of the output file, or None when a step fails or raises.
    """
    self.log_processing_start(input_file)

    def abort(message):
        # Shared failure path: log, record the failed run, yield None.
        self.logger.error(message)
        self.log_processing_end(input_file, success=False)
        return None

    try:
        order_info = self._read_order_info(input_file)
        if not order_info:
            return abort(f"读取订单信息失败: {input_file}")

        order_time, total_amount = order_info
        self.logger.info(f"订单信息 - 时间: {order_time}, 总金额: {total_amount}")

        order_data = self._read_order_data(input_file)
        if order_data is None or order_data.empty:
            return abort(f"读取订单数据失败或数据为空: {input_file}")

        self.logger.info(f"成功读取订单数据,共{len(order_data)}条记录")

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = output_dir / f"银豹采购单_烟草公司_{timestamp}.xls"
        output_file.parent.mkdir(parents=True, exist_ok=True)

        if not self._generate_pospal_order(order_data, order_time, output_file):
            return abort("生成银豹采购单失败")

        self.logger.info(f"采购单生成成功: {output_file}")
        self.log_processing_end(input_file, output_file, success=True)
        self._show_processing_result(output_file, order_time, len(order_data), total_amount)
        return output_file

    except Exception as e:
        self.logger.error(f"处理烟草订单时发生错误: {e}", exc_info=True)
        self.log_processing_end(input_file, success=False)
        return None
def get_required_columns(self) -> List[str]:
    """Return the column names of the tobacco order sheet."""
    order_columns = ('商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额')
    return list(order_columns)
def get_supported_extensions(self) -> List[str]:
    """Return the Excel extensions this processor accepts."""
    excel_extensions = ('.xlsx', '.xls')
    return list(excel_extensions)
def _read_order_info(self, file_path: Path) -> Optional[Tuple[str, float]]:
    """Read the order time (cell H1) and total amount (cell H3).

    ``.xls`` files are read with ``xlrd`` because ``openpyxl`` cannot open
    that format, although ``.xls`` is a supported extension. Other files go
    through ``openpyxl``, and the workbook is closed afterwards.

    NOTE(review): for ``.xls`` the first sheet is read, whereas the
    ``openpyxl`` path reads the active sheet; confirm they coincide for
    real order files. Date cells in ``.xls`` may come back as serial
    numbers.

    Args:
        file_path: Path of the order file.

    Returns:
        ``(order_time, total_amount)`` with ``"(空)"`` / ``0.0`` standing
        in for empty cells, or None when the file cannot be read.
    """
    try:
        if file_path.suffix.lower() == '.xls':
            sheet = xlrd.open_workbook(str(file_path)).sheet_by_index(0)

            def cell(row, col):
                # Guard against sheets smaller than the expected header area.
                if row < sheet.nrows and col < sheet.ncols:
                    return sheet.cell_value(row, col)
                return None

            # H1 is row 0 / column 7, H3 is row 2 / column 7.
            order_time = cell(0, 7) or "(空)"
            total_amount = cell(2, 7) or 0.0
        else:
            wb_info = load_workbook(file_path, data_only=True)
            try:
                ws_info = wb_info.active
                order_time = ws_info["H1"].value or "(空)"
                total_amount = ws_info["H3"].value or 0.0
            finally:
                wb_info.close()

        self.logger.info(f"成功读取订单信息: 时间={order_time}, 总金额={total_amount}")
        return (order_time, total_amount)

    except Exception as e:
        self.logger.error(f"读取订单信息出错: {e}")
        return None
def _read_order_data(self, file_path: Path) -> Optional[pd.DataFrame]:
"""读取订单数据
Args:
file_path: 文件路径
Returns:
订单数据DataFrame或None
"""
columns = ['商品', '盒码', '条码', '建议零售价', '批发价', '需求量', '订单量', '金额']
try:
df_old = self._read_excel_safely(file_path, header=None, skiprows=3, names=columns)
# 过滤订单量不为0的数据,并计算采购量和单价
df_filtered = df_old[df_old['订单量'] != 0].copy()
if df_filtered.empty:
self.logger.warning("没有订单量不为0的记录")
return None
# 计算采购量和单价
df_filtered['采购量'] = df_filtered['订单量'] * 10 # 烟草订单通常需要乘以10
df_filtered['采购单价'] = df_filtered['金额'] / df_filtered['采购量']
df_filtered = df_filtered.reset_index(drop=True)
self.logger.info(f"成功处理订单数据,有效记录数: {len(df_filtered)}")
return df_filtered
except Exception as e:
self.logger.error(f"读取订单数据失败: {e}")
return None
def _read_excel_safely(self, file_path: Path, **kwargs) -> pd.DataFrame:
suffix = file_path.suffix.lower()
if suffix == '.xlsx':
return pd.read_excel(file_path, engine='openpyxl', **kwargs)
elif suffix == '.xls':
try:
return pd.read_excel(file_path, engine='xlrd', **kwargs)
except Exception as e:
self.logger.error(f"读取xls失败,可能缺少xlrd: {e}")
raise
else:
return pd.read_excel(file_path, **kwargs)
def _generate_pospal_order(self, order_data: pd.DataFrame, order_time: str, output_file: Path) -> bool:
    """Fill the purchase-order template with the order rows and save it.

    The template is opened with formatting preserved, the target columns
    are located by their header text in the first row, and one sheet row is
    written per order row below the header.

    Sheet rows are numbered by position (``enumerate``) rather than by the
    DataFrame index label, so a frame without a 0-based consecutive index
    cannot leave gaps or overwrite the header.

    Args:
        order_data: Order rows; the columns ``盒码``, ``采购量`` and
            ``采购单价`` are read.
        order_time: Order time (not used when writing the sheet).
        output_file: Path of the file to write.

    Returns:
        True on success; False when the template is missing, a header
        cannot be found or writing fails.
    """
    try:
        template_path = Path(self.template_file)
        if not template_path.exists():
            self.logger.error(f"采购单模板文件不存在: {template_path}")
            return False

        self.logger.info(f"使用模板文件: {template_path}")

        # Open the template read-only, then copy it into a writable workbook.
        template_rd = xlrd.open_workbook(str(template_path), formatting_info=True)
        template_wb = copy(template_rd)
        template_ws = template_wb.get_sheet(0)

        header_row = template_rd.sheet_by_index(0).row_values(0)

        try:
            barcode_col = header_row.index("条码(必填)")
            amount_col = header_row.index("采购量(必填)")
            gift_col = header_row.index("赠送量")
            price_col = header_row.index("采购单价(必填)")
        except ValueError as e:
            self.logger.error(f"模板列查找失败: {e}")
            return False

        self.logger.info(f"模板列索引 - 条码:{barcode_col}, 采购量:{amount_col}, 赠送量:{gift_col}, 单价:{price_col}")

        # Sheet row 0 holds the header, so data starts at row 1.
        for sheet_row, (_, row) in enumerate(order_data.iterrows(), start=1):
            template_ws.write(sheet_row, barcode_col, row['盒码'])
            template_ws.write(sheet_row, amount_col, int(row['采购量']))
            template_ws.write(sheet_row, gift_col, "")
            template_ws.write(sheet_row, price_col, round(row['采购单价'], 2))

        output_file.parent.mkdir(parents=True, exist_ok=True)
        template_wb.save(str(output_file))
        self.logger.info(f"采购单生成成功: {output_file}")
        return True

    except Exception as e:
        self.logger.error(f"生成银豹采购单失败: {e}", exc_info=True)
        return False
def _show_processing_result(self, output_file: Path, order_time: str, total_count: int, total_amount: float):
    """Show a dialog summarising the processed tobacco order.

    Any exception raised while building or showing the dialog is logged and
    swallowed, so this method never propagates an error to the caller.

    Args:
        output_file: Path of the generated file.
        order_time: Order time shown in the dialog.
        total_count: Number of processed items.
        total_amount: Total amount; a string with thousands separators is
            also accepted and converted to a float.
    """
    try:
        amount = total_amount
        # Render the amount with two decimals; if it cannot be converted or
        # formatted as a number, show it verbatim instead.
        try:
            if isinstance(amount, str):
                amount = float(amount.replace(',', ''))
            amount_display = f"¥{amount:.2f}"
        except (ValueError, TypeError):
            amount_display = f"¥{amount}"

        processed_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Show the custom result dialog with the extra source/time details.
        show_custom_dialog(
            title="烟草订单处理结果",
            message="烟草订单处理完成",
            result_file=str(output_file),
            time_info=order_time,
            count_info=f"{total_count}个商品",
            amount_info=amount_display,
            additional_info={
                "订单来源": "烟草公司",
                "处理时间": processed_at,
            },
        )
        self.logger.info(f"显示处理结果 - 文件:{output_file}, 时间:{order_time}, 数量:{total_count}, 金额:{amount}")
    except Exception as e:
        self.logger.error(f"显示处理结果时出错: {e}")
def get_latest_tobacco_order(self) -> Optional[Path]:
    """Return the newest tobacco order-detail file (kept for the old interface).

    Looks in ``data/output`` for files matching ``订单明细*.xlsx``. Files whose
    ``st_ctime`` is at or after today's midnight are preferred; when there are
    none, a warning is logged and the newest matching file overall is used.

    Returns:
        The path of the selected file, or None when the directory does not
        exist, no file matches, or an error occurs (the error is logged).
    """
    pattern = "订单明细*.xlsx"
    try:
        # Timestamp of the start of the current day.
        midnight = datetime.datetime.combine(
            datetime.date.today(), datetime.time.min
        ).timestamp()

        result_dir = Path("data/output")
        if not result_dir.exists():
            return None

        # Files whose ctime falls within today.
        todays_files = [
            entry for entry in result_dir.glob(pattern)
            if entry.stat().st_ctime >= midnight
        ]

        if todays_files:
            latest_file = max(todays_files, key=lambda x: x.stat().st_ctime)
            self.logger.info(f"找到最新烟草订单明细文件: {latest_file}")
            return latest_file

        # Nothing from today: fall back to the newest matching file, if any.
        self.logger.warning("未找到今天创建的烟草订单明细文件")
        every_match = list(result_dir.glob(pattern))
        if not every_match:
            return None
        return max(every_match, key=lambda x: x.stat().st_ctime)
    except Exception as e:
        self.logger.error(f"获取最新烟草订单文件时出错: {e}")
        return None
+4 -2
View File
@@ -7,6 +7,7 @@
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict
@@ -58,7 +59,8 @@ def setup_logger(name: str,
# 创建文件处理器
try:
file_handler = logging.FileHandler(log_file, encoding='utf-8')
# 使用滚动日志,限制单个日志大小与备份数量
file_handler = RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(level)
logger.addHandler(file_handler)
@@ -175,4 +177,4 @@ def cleanup_active_marker(name: str) -> None:
if os.path.exists(active_marker):
os.remove(active_marker)
except Exception as e:
print(f"无法清理日志活跃标记: {e}")
print(f"无法清理日志活跃标记: {e}")