最新提交,提交前看看有没有优化的地方

This commit is contained in:
2025-05-02 22:46:04 +08:00
parent 693c17283b
commit 0b40caaf91
31 changed files with 3103 additions and 578 deletions
Binary file not shown.
Binary file not shown.
+69 -36
View File
@@ -116,6 +116,13 @@ class UnitConverter:
if not text or not isinstance(text, str):
return None
# 处理XX入白膜格式,如"550纯净水24入白膜"
match = re.search(r'.*?(\d+)入白膜', text)
if match:
result = f"1*{match.group(1)}"
logger.info(f"提取规格(入白膜): {text} -> {result}")
return result
# 尝试所有模式
for pattern, replacement in self.spec_patterns:
match = re.search(pattern, text)
@@ -149,6 +156,7 @@ class UnitConverter:
3. "xx纸箱" -> 1*xx (如"15纸箱" -> 1*15)
4. "xx白膜" -> 1*xx (如"12白膜" -> 1*12)
5. "xxL" 容量单位特殊处理
6. "xx(g|ml|毫升|克)*数字" -> 1*数字 (如"450g*15" -> 1*15)
Args:
name: 商品名称
@@ -162,6 +170,23 @@ class UnitConverter:
# 记录原始商品名称,用于日志
original_name = name
# 新增模式: 处理重量/容量*数字格式,如"450g*15", "450ml*15"
# 忽略重量/容量值,只提取后面的数量作为规格
weight_volume_pattern = r'.*?\d+(?:g|ml|毫升|克)[*xX×](\d+)'
match = re.search(weight_volume_pattern, name)
if match:
inferred_spec = f"1*{match.group(1)}"
logger.info(f"从名称推断规格(重量/容量*数量): {original_name} -> {inferred_spec}")
return inferred_spec
# 特殊模式1.1: "xx入白膜" 格式,如"550纯净水24入白膜" -> "1*24"
pattern1_1 = r'.*?(\d+)入白膜'
match = re.search(pattern1_1, name)
if match:
inferred_spec = f"1*{match.group(1)}"
logger.info(f"从名称推断规格(入白膜): {original_name} -> {inferred_spec}")
return inferred_spec
# 特殊模式1: "xx入纸箱" 格式,如"445水溶C血橙15入纸箱" -> "1*15"
pattern1 = r'.*?(\d+)入纸箱'
match = re.search(pattern1, name)
@@ -223,43 +248,51 @@ class UnitConverter:
if not spec or not isinstance(spec, str):
return 1, 1, None
# 处理三级包装,如1*5*12
three_level_match = re.match(r'(\d+)[*xX×](\d+)[*xX×](\d+)', spec)
if three_level_match:
try:
level1 = int(three_level_match.group(1))
level2 = int(three_level_match.group(2))
level3 = int(three_level_match.group(3))
logger.info(f"解析三级规格: {spec} -> {level1}*{level2}*{level3}")
return level1, level2, level3
except ValueError:
pass
try:
# 清理规格字符串,确保格式统一
spec = re.sub(r'\s+', '', spec) # 移除所有空白
spec = re.sub(r'[xX×]', '*', spec) # 统一分隔符为*
# 处理三级包装,如1*5*12
three_level_match = re.match(r'(\d+)[*](\d+)[*](\d+)', spec)
if three_level_match:
try:
level1 = int(three_level_match.group(1))
level2 = int(three_level_match.group(2))
level3 = int(three_level_match.group(3))
logger.info(f"解析三级规格: {spec} -> {level1}*{level2}*{level3}")
return level1, level2, level3
except ValueError:
pass
# 处理二级包装,如1*12
two_level_match = re.match(r'(\d+)[*](\d+)', spec)
if two_level_match:
try:
level1 = int(two_level_match.group(1))
level2 = int(two_level_match.group(2))
logger.info(f"解析二级规格: {spec} -> {level1}*{level2}")
return level1, level2, None
except ValueError:
pass
# 处理二级包装,如1*12
two_level_match = re.match(r'(\d+)[*xX×](\d+)', spec)
if two_level_match:
try:
level1 = int(two_level_match.group(1))
level2 = int(two_level_match.group(2))
logger.info(f"解析二级规格: {spec} -> {level1}*{level2}")
return level1, level2, None
except ValueError:
pass
# 特殊处理L/升为单位的规格,如12.5L*1
volume_match = re.match(r'([\d\.]+)[L升][*xX×](\d+)', spec)
if volume_match:
try:
volume = float(volume_match.group(1))
quantity = int(volume_match.group(2))
logger.info(f"解析容量规格: {spec} -> {volume}L*{quantity}")
return 1, quantity, None
except ValueError:
pass
# 默认值
logger.warning(f"无法解析规格: {spec},使用默认值1*1")
return 1, 1, None
# 特殊处理L/升为单位的规格,如12.5L*1
volume_match = re.match(r'([\d\.]+)[L升][*xX×](\d+)', spec)
if volume_match:
try:
volume = float(volume_match.group(1))
quantity = int(volume_match.group(2))
logger.info(f"解析容量规格: {spec} -> {volume}L*{quantity}")
return 1, quantity, None
except ValueError:
pass
# 默认值
logger.warning(f"无法解析规格: {spec},使用默认值1*1")
return 1, 1, None
except Exception as e:
logger.error(f"解析规格时出错: {e}")
return 1, 1, None
def process_unit_conversion(self, product: Dict) -> Dict:
"""
+304 -112
View File
@@ -210,121 +210,109 @@ class ExcelProcessor:
def extract_product_info(self, df: pd.DataFrame) -> List[Dict]:
"""
从数据中提取商品信息
处理后的数据中提取商品信息
支持处理不同格式的Excel文件
Args:
df: 数据
df: 数据
Returns:
商品信息列表
商品信息列表,每个商品为一个字典
"""
# 清理数据:移除全空行
df = df.dropna(how='all')
logger.info(f"移除空行后,有效数据行数: {len(df)}")
# 提取有用的列
barcode_cols = self.extract_barcode(df)
# 如果没有找到条码列,无法继续处理
if not barcode_cols:
logger.error("未找到条码列,无法处理")
return []
# 定义列名映射
column_mapping = {
'name': ['商品名称', '名称', '品名', '商品', '商品名', '商品或服务名称', '品项名', '产品名称', '品项'],
'specification': ['规格', '规格型号', '型号', '商品规格', '产品规格', '包装规格'],
'quantity': ['数量', '采购数量', '购买数量', '采购数量', '订单数量', '数量(必填)', '入库数', '入库数量'],
'unit': ['单位', '采购单位', '计量单位', '单位(必填)', '单位名称', '计价单位'],
'price': ['单价', '价格', '采购单价', '销售价', '进货价', '单价(必填)', '采购价', '参考价', '入库单价']
}
# 映射列名到标准名称
mapped_columns = {'barcode': barcode_cols[0]} # 使用第一个找到的条码列
# 记录列名映射详情
logger.info(f"使用条码列: {mapped_columns['barcode']}")
for target, possible_names in column_mapping.items():
for col in df.columns:
col_str = str(col).strip()
for name in possible_names:
if col_str == name:
mapped_columns[target] = col
logger.info(f"找到{target}列: {col}")
break
if target in mapped_columns:
break
# 如果没有找到精确匹配,尝试部分匹配
if target not in mapped_columns:
for col in df.columns:
col_str = str(col).strip().lower()
for name in possible_names:
if name.lower() in col_str:
mapped_columns[target] = col
logger.info(f"找到{target}列(部分匹配): {col}")
break
if target in mapped_columns:
break
logger.info(f"列名映射结果: {mapped_columns}")
# 检查是否有规格列
has_specification_column = 'specification' in mapped_columns
logger.info(f"是否存在规格列: {has_specification_column}")
# 提取商品信息
products = []
# 检测表头位置和数据格式
column_mapping = self._detect_column_mapping(df)
logger.info(f"列名映射结果: {column_mapping}")
# 检查是否有规格列
has_specification_column = '规格' in df.columns
logger.info(f"是否存在规格列: {has_specification_column}")
# 处理每一行数据
for idx, row in df.iterrows():
barcode = row.get(mapped_columns['barcode'])
# 跳过空行或无效条码
if pd.isna(barcode) or not self.validate_barcode(barcode):
logger.debug(f"跳过第{idx+1}行: 条码为空或无效 [{barcode}]")
try:
# 条码处理 - 确保条码总是字符串格式且不带小数点
barcode_raw = row[column_mapping['barcode']] if column_mapping.get('barcode') else ''
if pd.isna(barcode_raw) or barcode_raw == '' or str(barcode_raw).strip() in ['nan', 'None']:
continue
# 使用format_barcode函数处理条码,确保无小数点
barcode = format_barcode(barcode_raw)
# 处理数量字段,先提取数字部分再转换为浮点数
quantity_value = 0
quantity_str = ""
if column_mapping.get('quantity') and not pd.isna(row[column_mapping['quantity']]):
quantity_str = str(row[column_mapping['quantity']])
# 使用提取数字的函数
quantity_num = extract_number(quantity_str)
if quantity_num is not None:
quantity_value = quantity_num
# 基础信息
product = {
'barcode': barcode,
'name': str(row[column_mapping['name']]) if column_mapping.get('name') else '',
'quantity': quantity_value,
'price': float(row[column_mapping['price']]) if column_mapping.get('price') and not pd.isna(row[column_mapping['price']]) else 0,
'unit': str(row[column_mapping['unit']]) if column_mapping.get('unit') and not pd.isna(row[column_mapping['unit']]) else '',
'specification': '',
'package_quantity': None
}
# 清理单位
if product['unit'] == 'nan' or product['unit'] == 'None':
product['unit'] = ''
# 打印每行提取出的信息
logger.info(f"{idx+1}行: 提取商品信息 条码={product['barcode']}, 名称={product['name']}, 规格={product['specification']}, 数量={product['quantity']}, 单位={product['unit']}, 单价={product['price']}")
# 从数量字段中提取单位(如果单位字段为空)
if not product['unit'] and quantity_str:
num, unit = self.unit_converter.extract_unit_from_quantity(quantity_str)
if unit:
product['unit'] = unit
logger.info(f"从数量提取单位: {quantity_str} -> {unit}")
# 如果数量被提取出来,更新数量
if num is not None:
product['quantity'] = num
# 提取规格并解析包装数量
if '规格' in df.columns and not pd.isna(row['规格']):
product['specification'] = str(row['规格'])
package_quantity = self.parse_specification(product['specification'])
if package_quantity:
product['package_quantity'] = package_quantity
logger.info(f"解析规格: {product['specification']} -> 包装数量={package_quantity}")
else:
# 逻辑1: 如果规格为空,尝试从商品名称推断规格
if product['name']:
# 特殊处理:"营养快线原味450g*15"或"娃哈哈瓶装大AD水蜜桃450ml*15"等形式的名称
weight_volume_pattern = r'.*?\d+(?:g|ml|毫升|克)[*xX×](\d+)'
match = re.search(weight_volume_pattern, product['name'])
if match:
inferred_spec = f"1*{match.group(1)}"
inferred_qty = int(match.group(1))
product['specification'] = inferred_spec
product['package_quantity'] = inferred_qty
logger.info(f"从商品名称提取重量/容量规格: {product['name']} -> {inferred_spec}, 包装数量={inferred_qty}")
else:
# 一般情况的规格推断
inferred_spec, inferred_qty = self.infer_specification_from_name(product['name'])
if inferred_spec:
product['specification'] = inferred_spec
product['package_quantity'] = inferred_qty
logger.info(f"从商品名称推断规格: {product['name']} -> {inferred_spec}, 包装数量={inferred_qty}")
# 应用单位转换规则
product = self.unit_converter.process_unit_conversion(product)
products.append(product)
except Exception as e:
logger.error(f"提取第{idx+1}行商品信息时出错: {e}", exc_info=True)
continue
# 创建商品信息字典
product = {
'barcode': format_barcode(barcode),
'name': row.get(mapped_columns.get('name', ''), ''),
'specification': row.get(mapped_columns.get('specification', ''), ''),
'quantity': extract_number(str(row.get(mapped_columns.get('quantity', ''), 0))) or 0,
'unit': str(row.get(mapped_columns.get('unit', ''), '')),
'price': extract_number(str(row.get(mapped_columns.get('price', ''), 0))) or 0
}
logger.info(f"{idx+1}行: 提取商品信息 条码={product['barcode']}, 名称={product['name']}, 规格={product['specification']}, 数量={product['quantity']}, 单位={product['unit']}, 单价={product['price']}")
# 如果商品名称为空但商品条码不为空,则使用条码作为名称
if not product['name'] and product['barcode']:
product['name'] = f"商品 ({product['barcode']})"
logger.info(f"商品名称为空,使用条码作为名称: {product['name']}")
# 单位处理:如果单位为空但数量包含单位信息
quantity_str = str(row.get(mapped_columns.get('quantity', ''), ''))
if not product['unit'] and 'quantity' in mapped_columns:
num, unit = self.unit_converter.extract_unit_from_quantity(quantity_str)
if unit:
product['unit'] = unit
logger.info(f"从数量提取单位: {quantity_str} -> {unit}")
# 如果数量被提取出来,更新数量
if num is not None:
product['quantity'] = num
# 推断规格:如果规格为空或不存在规格列,尝试从商品名称推断
if (not product['specification'] or not has_specification_column) and product['name']:
inferred_spec = self.unit_converter.infer_specification_from_name(product['name'])
if inferred_spec:
product['specification'] = inferred_spec
logger.info(f"从商品名称推断规格: {product['name']} -> {inferred_spec}")
# 应用单位转换规则
product = self.unit_converter.process_unit_conversion(product)
products.append(product)
logger.info(f"提取到 {len(products)} 个商品信息")
return products
@@ -355,6 +343,9 @@ class ExcelProcessor:
logger.info(f"开始处理{len(products)} 个产品信息")
for product in products:
barcode = product.get('barcode', '')
# 确保条码是整数字符串
barcode = format_barcode(barcode)
if not barcode:
logger.warning(f"跳过无条码商品")
continue
@@ -573,12 +564,8 @@ class ExcelProcessor:
self.processed_files[file_path] = output_file
self._save_processed_files()
# 自动打开输出目录
try:
os.startfile(os.path.abspath(self.output_dir))
logger.info(f"已自动打开输出目录: {self.output_dir}")
except Exception as e:
logger.warning(f"无法自动打开输出目录: {e}")
# 不再自动打开输出目录
logger.info(f"采购单已保存到: {output_file}")
return output_file
@@ -602,4 +589,209 @@ class ExcelProcessor:
return None
# 处理文件
return self.process_specific_file(latest_file)
return self.process_specific_file(latest_file)
def _detect_column_mapping(self, df: pd.DataFrame) -> Dict[str, str]:
    """
    Map standard field names to the actual column labels of ``df``.

    The barcode column is taken from ``self.extract_barcode(df)`` (first
    entry). The remaining fields ('name', 'specification', 'quantity',
    'unit', 'price') are resolved in two passes:

    1. Exact pass: a column whose stripped header equals one of the field's
       aliases. Columns are scanned in ``df.columns`` order.
    2. Substring pass, only for fields still unresolved: a column whose
       lower-cased header contains one of the aliases. Columns already
       claimed (the barcode column or any column mapped so far) are skipped,
       so a loose match such as '商品' inside '商品条码' or '名称' inside
       '单位名称' cannot hijack a column that belongs to another field.

    Args:
        df: data frame whose column labels are inspected.

    Returns:
        Dict keyed by standard field name with the matching column label as
        value. Fields that could not be matched are absent. An empty dict is
        returned when no barcode column is found.
    """
    # presumably returns a list of candidate barcode column labels; only its
    # truthiness and first element are used here
    barcode_cols = self.extract_barcode(df)

    # Without a barcode column nothing downstream can work.
    if not barcode_cols:
        logger.error("未找到条码列,无法处理")
        return {}

    # Accepted header spellings for every standard field.
    column_aliases = {
        'name': ['商品名称', '名称', '品名', '商品', '商品名', '商品或服务名称', '品项名', '产品名称', '品项'],
        'specification': ['规格', '规格型号', '型号', '商品规格', '产品规格', '包装规格'],
        'quantity': ['数量', '采购数量', '购买数量', '订单数量', '数量(必填)', '入库数', '入库数量'],
        'unit': ['单位', '采购单位', '计量单位', '单位(必填)', '单位名称', '计价单位'],
        'price': ['单价', '价格', '采购单价', '销售价', '进货价', '单价(必填)', '采购价', '参考价', '入库单价']
    }

    # Use the first barcode column that was found.
    mapped_columns = {'barcode': barcode_cols[0]}
    logger.info(f"使用条码列: {mapped_columns['barcode']}")

    # Normalise every header once instead of once per alias comparison.
    headers = [(col, str(col).strip()) for col in df.columns]

    # Pass 1: exact header matches for every field. Running this for all
    # fields before any substring matching guarantees that an exact match
    # always beats another field's loose match on the same column.
    for target, aliases in column_aliases.items():
        for col, header in headers:
            if header in aliases:
                mapped_columns[target] = col
                logger.info(f"找到{target}列: {col}")
                break

    # Pass 2: substring matches for the fields that are still missing,
    # restricted to columns nobody has claimed yet.
    claimed = set(mapped_columns.values())
    for target, aliases in column_aliases.items():
        if target in mapped_columns:
            continue
        lowered_aliases = [alias.lower() for alias in aliases]
        for col, header in headers:
            if col in claimed:
                continue
            lowered_header = header.lower()
            if any(alias in lowered_header for alias in lowered_aliases):
                mapped_columns[target] = col
                claimed.add(col)
                logger.info(f"找到{target}列(部分匹配): {col}")
                break

    return mapped_columns
def infer_specification_from_name(self, product_name: str) -> Tuple[Optional[str], Optional[int]]:
    """
    Infer a packaging specification and package quantity from a product name.

    Strategies are tried in order; the first one that produces a result wins:

    1. A "<digits><g|ml|毫升|克><separator><count>" fragment in the name
       yields "1*<count>".
    2. ``self.unit_converter.infer_specification_from_name`` is consulted;
       its answer is accepted only when ``self.parse_specification`` returns
       a truthy quantity for it.
    3. A table of product-specific regex rules.
    4. A literal "<digits>*<digits>" fragment, accepted only when
       ``self.parse_specification`` returns a truthy quantity for it.
    5. The first number in the name that equals 12, 15, 24 or 30 is taken
       as the per-case count.

    Args:
        product_name: product name text.

    Returns:
        ``(specification, package_quantity)``; ``(None, None)`` when the name
        is empty, not a string, or no strategy matches.
    """
    if not (product_name and isinstance(product_name, str)):
        logger.warning(f"无效的商品名: {product_name}")
        return None, None

    product_name = product_name.strip()

    def single_pack(hit):
        # "1*N" where N is the captured count.
        count = hit.group(1)
        return f"1*{count}", int(count)

    def spec_as_written(hit):
        # The name already carries "a*b"; the quantity is the second factor.
        written = hit.group(1)
        return written, int(written.split('*')[1])

    def bucket(hit):
        # Bucket products: "<volume>L*1", always one unit per package.
        return f"{hit.group(1)}*1", 1

    # Strategy 1: weight/volume followed by a count.
    weight_hit = re.search(r'.*?\d+(?:g|ml|毫升|克)[*xX×](\d+)', product_name)
    if weight_hit:
        inferred_spec, inferred_qty = single_pack(weight_hit)
        logger.info(f"从商品名称提取重量/容量规格: {product_name} -> {inferred_spec}, 包装数量={inferred_qty}")
        return inferred_spec, inferred_qty

    # Strategy 2: delegate to the unit converter, then validate its answer.
    converter_spec = self.unit_converter.infer_specification_from_name(product_name)
    if converter_spec:
        converter_qty = self.parse_specification(converter_spec)
        if converter_qty:
            logger.info(f"从商品名称推断规格: {product_name} -> {converter_spec}, 包装数量={converter_qty}")
            return converter_spec, converter_qty

    # Strategy 3: product-specific rules, most specific first.
    rules = (
        # "<N>入白膜", e.g. "550纯净水24入白膜"
        (r'.*?(\d+)入白膜', single_pack),
        # "<N>白膜", e.g. "550水24白膜"
        (r'.*?(\d+)白膜', single_pack),
        # 445水溶C series
        (r'445水溶C.*?(\d+)[入个]纸箱', single_pack),
        # 东方树叶 series
        (r'东方树叶.*?(\d+\*\d+).*纸箱', spec_as_written),
        # bucket packaging
        (r'(\d+\.?\d*L)桶装', bucket),
        # 树叶 tea series
        (r'树叶.*?(\d+)[入个]纸箱', single_pack),
        # 茶π series
        (r'茶[πΠπ].*?(\d+)纸箱', single_pack),
        # generic "<N>入/个" followed by a packaging word
        (r'.*?(\d+)[入个](?:纸箱|箱装|白膜)', single_pack),
        # generic "<N>纸箱"
        (r'.*?(\d+)纸箱', single_pack),
    )
    for rule_regex, build in rules:
        hit = re.search(rule_regex, product_name)
        if hit is None:
            continue
        spec, qty = build(hit)
        logger.info(f"根据特定规则推断规格: {product_name} -> {spec}, 包装数量={qty}")
        return spec, qty

    # Strategy 4: a literal "a*b" fragment anywhere in the name.
    literal_hit = re.search(r'(\d+\*\d+)', product_name)
    if literal_hit:
        spec = literal_hit.group(1)
        package_quantity = self.parse_specification(spec)
        if package_quantity:
            logger.info(f"从名称中直接提取规格: {spec}, 包装数量={package_quantity}")
            return spec, package_quantity

    # Strategy 5: last resort, look for a typical per-case count.
    typical_counts = ('12', '15', '24', '30')
    for num in re.findall(r'\d+', product_name):
        if num in typical_counts:
            spec = f"1*{num}"
            logger.info(f"从名称中提取可能的件装数: {spec}, 包装数量={int(num)}")
            return spec, int(num)

    logger.warning(f"无法从商品名'{product_name}' 推断规格")
    return None, None
def parse_specification(self, spec_str: str) -> Optional[int]:
    """
    Parse a specification string and return the package quantity.

    Patterns are tried in this order and the first match wins:

    1. weight/volume times count, e.g. "450g*15", "450ML×15" -> 15
       (unit letters are matched case-insensitively)
    2. three-level spec, e.g. "1*5*10" -> 10 (last factor)
    3. two-level spec, e.g. "1*15", "1x15" -> 15 (second factor)
    4. count per case, e.g. "24瓶/件" -> 24
    5. litre spec, e.g. "4L" -> 1, "4L*6" or "4Lx6" -> 6

    Every pattern accepts the same separator set: ``*``, ``x``, ``X``, ``×``.

    Args:
        spec_str: specification text.

    Returns:
        The package quantity, or None when ``spec_str`` is empty or not a
        string, when no pattern matches, or when parsing raises (the error
        is logged as a warning).
    """
    if not spec_str or not isinstance(spec_str, str):
        return None

    try:
        # Normalise the text with the project's clean_string helper.
        spec_str = clean_string(spec_str)

        # Weight/volume followed by a count: only the count matters.
        # IGNORECASE so that "450G*15" / "450ML*15" are handled like their
        # lower-case spellings.
        match = re.search(r'\d+(?:g|ml|毫升|克)[*xX×](\d+)', spec_str, re.IGNORECASE)
        if match:
            return int(match.group(1))

        # Three-level spec such as 1*5*10: the last factor is the quantity.
        match = re.search(r'(\d+)[*xX×](\d+)[*xX×](\d+)', spec_str)
        if match:
            return int(match.group(3))

        # Two-level spec such as 1*15 or 1x15: the second factor is the quantity.
        match = re.search(r'(\d+)[*xX×](\d+)', spec_str)
        if match:
            return int(match.group(2))

        # "24瓶/件"-style spec: the leading number is the quantity.
        match = re.search(r'(\d+)[瓶个支袋][/](件|箱)', spec_str)
        if match:
            return int(match.group(1))

        # Litre spec such as "4L" or "4L*6". The separator class now matches
        # the other patterns, so "4Lx6" yields 6 instead of silently
        # falling back to 1.
        match = re.search(r'(\d+(?:\.\d+)?)\s*[Ll升][*xX×]?(\d+)?', spec_str)
        if match:
            # A trailing count wins; a bare volume means one unit per package.
            return int(match.group(2)) if match.group(2) else 1

    except Exception as e:
        logger.warning(f"解析规格'{spec_str}'时出错: {e}")

    return None