"""
Unit conversion module
----------------------
Provides unit conversion, including specification inference from product
names and automatic unit extraction from quantity strings.
"""

import re
import logging
import os
import json
from typing import Dict, Tuple, Optional, Any, List, Union

from ..utils.log_utils import get_logger
from .handlers.barcode_mapper import BarcodeMapper
from .handlers.unit_converter_handlers import (
    JianUnitHandler,
    BoxUnitHandler,
    TiHeUnitHandler,
    GiftUnitHandler,
    UnitHandler
)
from .validators import ProductValidator

logger = get_logger(__name__)

# Path of the barcode mapping configuration file (relative to the working directory)
BARCODE_MAPPING_CONFIG = "config/barcode_mappings.json"


class UnitConverter:
    """
    Unit converter: converts between packaging units and can infer a
    specification (e.g. "1*12") from a product name.
    """

    # Labels / multiplication marks removed from a quantity string before
    # parsing. Both the ASCII and the full-width colon are accepted.
    _QUANTITY_NOISE = ('数量:', '数量:', '×', 'x', 'X', '*')

    # "<number><unit>" and nothing else, e.g. "2箱"
    _BASIC_QUANTITY_RE = re.compile(r'^([\d\.]+)\s*([^\d\s\.]+)$')

    # "<number><known unit>" embedded in other text. This must be an
    # alternation group: a character class would match single characters
    # only (turning "kg" into "k") and would accept a literal "|".
    _EMBEDDED_QUANTITY_RE = re.compile(
        r'([\d\.]+)\s*(毫升|kg|ml|箱|件|瓶|提|盒|袋|桶|包|g|升|L|个)',
        re.IGNORECASE,
    )

    # Ordered rules used by infer_specification_from_name:
    # (log label, pattern, str.format template fed with the match groups).
    _NAME_SPEC_RULES = (
        # weight/volume followed by a count, e.g. "450g*15" -> "1*15"
        ('重量/容量*数量', re.compile(r'\d+(?:g|ml|毫升|克)[*xX×](\d+)'), '1*{0}'),
        # "550纯净水24入白膜" -> "1*24"
        ('入白膜', re.compile(r'(\d+)入白膜'), '1*{0}'),
        # "445水溶C血橙15入纸箱" -> "1*15"
        ('入纸箱', re.compile(r'(\d+)入纸箱'), '1*{0}'),
        # three-level spec written directly; must precede the two-level
        # rule, otherwise "1*5*12" is truncated to "1*5"
        ('直接格式', re.compile(r'(\d+)[*xX×](\d+)[*xX×](\d+)'), '{0}*{1}*{2}'),
        # "500-东方树叶-乌龙茶1*15-纸箱装" -> "1*15"
        ('直接格式', re.compile(r'(\d+)[*xX×](\d+)'), '{0}*{1}'),
        # "500茶π蜜桃乌龙15纸箱" -> "1*15"
        ('纸箱', re.compile(r'(\d+)纸箱'), '1*{0}'),
        # "550水24白膜" -> "1*24"
        ('白膜', re.compile(r'(\d+)白膜'), '1*{0}'),
        # "1.8L*8瓶" -> "1.8L*8"
        ('容量*数量', re.compile(r'([\d\.]+)[Ll升][*×xX](\d+)'), '{0}L*{1}'),
        # "12.9L桶装水" -> "12.9L*1"
        ('简单容量', re.compile(r'([\d\.]+)[Ll升]'), '{0}L*1'),
    )

    # "1件=12桶"-style equation specs; accepts ASCII and full-width "=".
    _EQUATION_SPEC_RE = re.compile(
        r'(\d+(?:\.\d+)?)\s*(?:件|箱|提|盒)\s*[==]\s*(\d+)\s*(?:瓶|桶|盒|支|个|袋|罐|包|卷)'
    )

    # "<size><unit>*<count>" specs where only the count matters:
    # (log label, pattern). The count is always group 2.
    _COUNT_ONLY_SPEC_RULES = (
        ('解析重量规格', re.compile(r'([\d\.]+)(?:kg|g|克|千克|公斤)[*](\d+)', re.IGNORECASE)),
        ('解析容量(ml)规格', re.compile(r'(\d+)(?:ml|毫升)[*](\d+)', re.IGNORECASE)),
        ('解析容量(L)规格', re.compile(r'(\d+(?:\.\d+)?)[Ll升][*](\d+)')),
    )

    def __init__(self):
        """
        Initialise the converter: load barcode mappings, set up the
        specification patterns, the unit handlers and the validator.
        """
        # Special-barcode configuration
        self.special_barcodes = self.load_barcode_mappings()

        # (pattern, replacement template) pairs used by extract_specification.
        # Order matters: the first pattern that matches wins, so the
        # three-level form comes before the two-level form, and every
        # "with count" form comes before its "no count" fallback.
        self.spec_patterns = [
            # 1*5*12, 1x5x12, ...
            (r'(\d+)[*xX×](\d+)[*xX×](\d+)', r'\1*\2*\3'),
            # 1*6, 1x12, 1X20, ...
            (r'(\d+)[*xX×](\d+)', r'\1*\2'),
            # "12入", "24入"
            (r'(\d+)入', r'1*\1'),
            # "1.5L*8" / "1.5升*8", then bare "1.5L" -> count defaults to 1
            (r'([\d\.]+)[L升][*xX×](\d+)', r'\1L*\2'),
            (r'([\d\.]+)[L升]', r'\1L*1'),
            # "5kg*6" / "5公斤*6", then bare "5kg"
            (r'([\d\.]+)(?:kg|公斤)[*xX×](\d+)', r'\1kg*\2'),
            (r'([\d\.]+)(?:kg|公斤)', r'\1kg*1'),
            # "500g*12" / "500克*12", then bare "500g"
            (r'([\d\.]+)(?:g|克)[*xX×](\d+)', r'\1g*\2'),
            (r'([\d\.]+)(?:g|克)', r'\1g*1'),
            # "500mL*15" / "500毫升*15", then bare "500mL"
            (r'([\d\.]+)(?:mL|毫升)[*xX×](\d+)', r'\1mL*\2'),
            (r'([\d\.]+)(?:mL|毫升)', r'\1mL*1'),
        ]

        # Handlers
        self._init_handlers()

        # Validator
        self.validator = ProductValidator()

    def _init_handlers(self):
        """
        Create the barcode mapper and the ordered list of unit handlers.
        """
        # Barcode mapper built from the currently loaded mappings
        self.barcode_mapper = BarcodeMapper(self.special_barcodes)

        # Unit handlers, highest priority first
        self.unit_handlers: List[UnitHandler] = [
            GiftUnitHandler(),  # gifts are handled first
            JianUnitHandler(),  # "件" unit
            BoxUnitHandler(),   # "箱" unit
            TiHeUnitHandler()   # "提" and "盒" units
        ]

    def extract_unit_from_quantity(self, quantity_str: str) -> Tuple[Optional[float], Optional[str]]:
        """
        Extract a numeric quantity and its unit from a quantity string.

        Supported formats:
        1. "2箱" -> (2, "箱")
        2. "3件" -> (3, "件")
        3. "1.5提" -> (1.5, "提")
        4. "数量: 5盒" -> (5, "盒")
        5. "× 2瓶" -> (2, "瓶")

        Args:
            quantity_str: quantity string such as "2箱" or "5件"

        Returns:
            (quantity, unit) tuple, or (None, None) if nothing can be extracted
        """
        if not quantity_str or not isinstance(quantity_str, str):
            return None, None

        # Drop surrounding whitespace and the known labels / multiplication
        # marks (these are removed wherever they occur in the string).
        cleaned_str = quantity_str.strip()
        for noise in self._QUANTITY_NOISE:
            cleaned_str = cleaned_str.replace(noise, '').strip()

        # Try the strict "<number><unit>" form first, then a known unit
        # embedded in surrounding text.
        candidates = (
            (self._BASIC_QUANTITY_RE.match(cleaned_str), '基本格式'),
            (self._EMBEDDED_QUANTITY_RE.search(cleaned_str), '复杂格式'),
        )
        for match, label in candidates:
            if not match:
                continue
            try:
                num = float(match.group(1))
            except ValueError:
                # e.g. "1.2.3" is matched by [\d\.]+ but is not a number
                continue
            unit = match.group(2)
            logger.info(f"从数量提取单位({label}): {quantity_str} -> 数量={num}, 单位={unit}")
            return num, unit

        return None, None

    def extract_specification(self, text: str) -> Optional[str]:
        """
        Extract a specification string from free text.

        Only the matched fragment is converted, e.g. "水12入" -> "1*12",
        "1x5x12" -> "1*5*12", "500g*12" -> "500g*12", "1.5L" -> "1.5L*1".

        Args:
            text: text to search

        Returns:
            The extracted specification, or None if no pattern matches
        """
        if not text or not isinstance(text, str):
            return None

        # "NN入白膜", e.g. "550纯净水24入白膜"
        match = re.search(r'(\d+)入白膜', text)
        if match:
            result = f"1*{match.group(1)}"
            logger.info(f"提取规格(入白膜): {text} -> {result}")
            return result

        # First matching pattern wins (see the ordering note in __init__)
        for pattern, replacement in self.spec_patterns:
            match = re.search(pattern, text)
            if not match:
                continue
            # Expand the template against the match only, so the result is
            # the specification itself rather than the whole input text.
            result = match.expand(replacement)
            if replacement.count('*') == 2:
                logger.info(f"提取三级规格: {text} -> {result}")
            else:
                logger.info(f"提取规格: {text} -> {result}")
            return result

        # Nothing matched
        return None

    def infer_specification_from_name(self, name: str) -> Optional[str]:
        """
        Infer a specification from a product name.

        Rules, in priority order:
        1. "xx(g|ml|毫升|克)*N" -> 1*N (e.g. "450g*15" -> 1*15)
        2. "xx入白膜" / "xx入纸箱" -> 1*xx (e.g. "15入纸箱" -> 1*15)
        3. A spec written directly: "1*5*12" -> 1*5*12, "1*15" -> 1*15
        4. "xx纸箱" / "xx白膜" -> 1*xx (e.g. "12白膜" -> 1*12)
        5. Volume with count "1.8L*8" -> 1.8L*8; bare volume "12.9L" -> 12.9L*1
        6. Otherwise fall back to extract_specification

        Args:
            name: product name

        Returns:
            The inferred specification, or None if it cannot be inferred
        """
        if not name or not isinstance(name, str):
            return None

        # Kept under a separate name for the log messages
        original_name = name

        for label, regex, template in self._NAME_SPEC_RULES:
            match = regex.search(name)
            if match:
                inferred_spec = template.format(*match.groups())
                logger.info(f"从名称推断规格({label}): {original_name} -> {inferred_spec}")
                return inferred_spec

        # Generic pattern matching as the last resort
        spec = self.extract_specification(name)
        if spec:
            logger.info(f"从名称推断规格(通用模式): {original_name} -> {spec}")
            return spec

        return None

    def parse_specification(self, spec: str) -> Tuple[int, int, Optional[int]]:
        """
        Parse a specification string such as "1*12" or "1*5*12".

        Args:
            spec: specification string

        Returns:
            (level1, level2, level3) packaging counts; level3 is None for
            two-level specifications. Returns (1, 1, None) when the input is
            empty, not a string, or cannot be parsed.
        """
        if not spec or not isinstance(spec, str):
            return 1, 1, None

        try:
            # Normalise: drop all whitespace, unify the separator to "*"
            spec = re.sub(r'\s+', '', spec)
            spec = re.sub(r'[xX×]', '*', spec)

            logger.debug(f"解析规格: {spec}")

            # Equation form such as "1件=12桶" -> 1*12
            eq_match = self._EQUATION_SPEC_RE.match(spec)
            if eq_match:
                level2 = int(eq_match.group(2))
                logger.info(f"解析等式规格: {spec} -> 1*{level2}")
                return 1, level2, None

            # Three-level packaging such as 1*5*12
            three_level_match = re.match(r'(\d+)[*](\d+)[*](\d+)', spec)
            if three_level_match:
                level1, level2, level3 = (int(part) for part in three_level_match.groups())
                logger.info(f"解析三级规格: {spec} -> {level1}*{level2}*{level3}")
                return level1, level2, level3

            # Weight / volume specs such as 5kg*6, 500ml*15, 1L*12: the size
            # is ignored and the trailing count becomes level 2.
            for label, regex in self._COUNT_ONLY_SPEC_RULES:
                unit_match = regex.match(spec)
                if unit_match:
                    level2 = int(unit_match.group(2))
                    logger.info(f"{label}: {spec} -> 1*{level2}")
                    return 1, level2, None

            # Two-level packaging such as 1*12
            two_level_match = re.match(r'(\d+)[*](\d+)', spec)
            if two_level_match:
                level1 = int(two_level_match.group(1))
                level2 = int(two_level_match.group(2))
                logger.info(f"解析二级规格: {spec} -> {level1}*{level2}")
                return level1, level2, None

            # Looser L/升 form (e.g. ".5L*3") not covered by the rule above
            volume_match = re.match(r'([\d\.]+)[L升][*xX×](\d+)', spec)
            if volume_match:
                try:
                    volume = float(volume_match.group(1))
                except ValueError:
                    # e.g. "1.2.3L*4": fall through to the lenient rule below
                    pass
                else:
                    quantity = int(volume_match.group(2))
                    logger.info(f"解析容量规格: {spec} -> {volume}L*{quantity}")
                    return 1, quantity, None

            # Irregular input such as "IL*12" or "6oo*12": take the digits
            # after the first "*" that has no digit before it as the count.
            irregular_match = re.search(r'[^0-9]*\*(\d+)', spec)
            if irregular_match:
                level2 = int(irregular_match.group(1))
                logger.info(f"解析不规范规格: {spec} -> 1*{level2}")
                return 1, level2, None

            # Default
            logger.warning(f"无法解析规格: {spec},使用默认值1*1")
            return 1, 1, None

        except Exception as e:
            # Best-effort boundary: a bad spec must not abort processing
            logger.error(f"解析规格时出错: {e}")
            return 1, 1, None

    def process_unit_conversion(self, product: Dict) -> Dict:
        """
        Apply unit conversion to a product.

        Steps:
        1. Validate the product and copy it.
        2. Return early if it has no barcode.
        3. Apply the barcode mapping.
        4. If there is still no specification, infer one from the name;
           return unchanged if that fails.
        5. Parse the specification and hand the product to the first unit
           handler that accepts it (gift, "件", "箱", "提"/"盒").
        6. Otherwise return the product unchanged.

        Args:
            product: product information dictionary

        Returns:
            The processed product dictionary
        """
        # Validate first
        product = self.validator.validate_product(product)

        # Work on a copy so the caller's dictionary is not modified here
        result = product.copy()

        # Skip records without a barcode
        if not result.get('barcode', ''):
            return result

        # Barcode mapping comes first
        result = self.barcode_mapper.map_barcode(result)

        # Read the specification only AFTER the mapping step, so a value set
        # by the mapping is not ignored or overwritten by name inference.
        # NOTE(review): assumes map_barcode may write 'specification' (the
        # default mappings carry that key) - confirm against BarcodeMapper.
        if not result.get('specification', ''):
            inferred_spec = self.infer_specification_from_name(result.get('name', ''))
            if not inferred_spec:
                # No specification available: conversion is not possible
                return result
            result['specification'] = inferred_spec
            logger.info(f"从商品名称推断规格: {result.get('name', '')} -> {inferred_spec}")

        # Parse the specification
        level1, level2, level3 = self.parse_specification(result.get('specification', ''))

        # First handler that accepts the product performs the conversion
        for handler in self.unit_handlers:
            if handler.can_handle(result):
                return handler.handle(result, level1, level2, level3)

        # No handler applies: leave as is
        logger.info(f"其他单位处理: 保持原样 数量: {result.get('quantity', 0)}, 单价: {result.get('price', 0)}, 单位: {result.get('unit', '')}")
        return result

    def load_barcode_mappings(self) -> Dict[str, Dict[str, Any]]:
        """
        Load the barcode mappings from the configuration file.

        If the file does not exist, the default mappings are written to it
        and returned. If the file cannot be read, is not valid JSON, or does
        not contain a JSON object, the error is logged and the default
        mappings are returned.

        Returns:
            Barcode mapping dictionary
        """
        # Built-in defaults
        default_mappings = {
            '6925019900087': {
                'multiplier': 10,
                'target_unit': '瓶',
                'description': '特殊处理:数量*10,单位转换为瓶'
            },
            '6921168593804': {
                'multiplier': 30,
                'target_unit': '瓶',
                'description': 'NFC产品特殊处理:每箱30瓶'
            },
            '6901826888138': {
                'multiplier': 30,
                'target_unit': '瓶',
                'fixed_price': 112/30,
                'specification': '1*30',
                'description': '特殊处理: 规格1*30,数量*30,单价=112/30'
            },
            # Plain barcode-to-barcode mappings
            '6920584471055': {
                'map_to': '6920584471017',
                'description': '条码映射:6920584471055 -> 6920584471017'
            },
            '6925861571159': {
                'map_to': '69021824',
                'description': '条码映射:6925861571159 -> 69021824'
            },
            '6923644268923': {
                'map_to': '6923644268480',
                'description': '条码映射:6923644268923 -> 6923644268480'
            },
            # Needs both a fixed specification and a mapping
            '6958620703716': {
                'specification': '1*14',
                'map_to': '6958620703907',
                'description': '特殊处理: 规格1*14,同时映射到6958620703907'
            }
        }

        try:
            if os.path.exists(BARCODE_MAPPING_CONFIG):
                with open(BARCODE_MAPPING_CONFIG, 'r', encoding='utf-8') as file:
                    mappings = json.load(file)
                # Valid JSON that is not an object (e.g. a list) cannot be
                # used as a mapping; treat it like any other load failure.
                if not isinstance(mappings, dict):
                    raise ValueError("barcode mapping config must be a JSON object")
                logger.info(f"成功加载条码映射配置,共{len(mappings)}项")
                return mappings

            # No configuration yet: create it from the defaults
            self.save_barcode_mappings(default_mappings)
            logger.info(f"创建默认条码映射配置,共{len(default_mappings)}项")
            return default_mappings
        except Exception as e:
            logger.error(f"加载条码映射配置失败: {e}")
            return default_mappings

    def save_barcode_mappings(self, mappings: Dict[str, Dict[str, Any]]) -> bool:
        """
        Save the barcode mappings to the configuration file.

        Args:
            mappings: barcode mapping dictionary

        Returns:
            True if the file was written, False if an error occurred
        """
        try:
            # Make sure the configuration directory exists. A path without a
            # directory part has nothing to create (os.makedirs('') fails).
            config_dir = os.path.dirname(BARCODE_MAPPING_CONFIG)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)

            # Write the configuration file
            with open(BARCODE_MAPPING_CONFIG, 'w', encoding='utf-8') as file:
                json.dump(mappings, file, ensure_ascii=False, indent=2)

            logger.info(f"条码映射配置保存成功,共{len(mappings)}项")
            return True
        except Exception as e:
            logger.error(f"保存条码映射配置失败: {e}")
            return False

    def update_barcode_mappings(self, new_mappings: Dict[str, Dict[str, Any]]) -> bool:
        """
        Replace the barcode mappings and persist them.

        The in-memory mapper is rebuilt so the new mappings take effect for
        subsequent conversions, not only after the next start-up.

        Args:
            new_mappings: new barcode mapping dictionary

        Returns:
            True if the mappings were saved to the configuration file
        """
        self.special_barcodes = new_mappings
        # The mapper was constructed from the previous dictionary; rebuild it
        # the same way _init_handlers does.
        self.barcode_mapper = BarcodeMapper(self.special_barcodes)
        return self.save_barcode_mappings(new_mappings)