feat(供应商管理): 新增规则引擎与词典配置支持

refactor(处理器): 重构通用供应商处理器以支持规则引擎
docs: 更新README与文档说明供应商管理功能
build: 更新打包脚本注入版本信息
test: 添加规则引擎单元测试
This commit is contained in:
2025-12-12 13:46:00 +08:00
parent 73d17836d7
commit fb12e63c4c
33 changed files with 8678 additions and 62 deletions
+3 -4
View File
@@ -605,17 +605,16 @@ class ExcelProcessor:
Returns:
表头行索引,如果未找到则返回None
"""
# 定义可能的表头关键词
header_keywords = [
'条码', '条形码', '商品条码', '商品名称', '名称', '数量', '单位', '单价',
'规格', '商品编码', '采购数量', '采购单位', '商品', '品名'
'规格', '商品编码', '采购数量', '采购单位', '商品', '品名',
'金额', '小计', '总计', '合计', '合计金额'
]
# 存储每行的匹配分数
row_scores = []
# 遍历前10行(通常表头不会太靠后)
max_rows_to_check = min(10, len(df))
max_rows_to_check = min(30, len(df))
for row in range(max_rows_to_check):
row_data = df.iloc[row]
score = 0
+1 -1
View File
@@ -298,4 +298,4 @@ class ProductValidator:
logger.warning(f"数量验证失败: {error_msg}")
validated_product['quantity'] = 0.0
return validated_product
return validated_product
+150
View File
@@ -0,0 +1,150 @@
import re
import pandas as pd
from typing import List, Dict, Any, Optional
def _split_quantity_unit(df: pd.DataFrame, source: str, dictionary: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
if source in df.columns:
vals = df[source].astype(str).fillna("")
nums = []
units = []
default_unit = (dictionary or {}).get("default_unit", "")
unit_synonyms = (dictionary or {}).get("unit_synonyms", {})
for v in vals:
m = re.search(r"(\d+(?:\.\d+)?)(箱|件|提|盒|瓶)", v)
if m:
nums.append(float(m.group(1)))
u = unit_synonyms.get(m.group(2), m.group(2))
units.append(u)
else:
try:
nums.append(float(v))
units.append(unit_synonyms.get(default_unit, default_unit))
except:
nums.append(0.0)
units.append(unit_synonyms.get(default_unit, default_unit))
df["quantity"] = nums
df["unit"] = units
return df
def _extract_spec_from_name(df: pd.DataFrame, source: str, dictionary: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
if source in df.columns:
names = df[source].astype(str).fillna("")
specs = []
packs = []
ignore_words = (dictionary or {}).get("ignore_words", [])
name_patterns = (dictionary or {}).get("name_patterns", [])
for s in names:
if ignore_words:
for w in ignore_words:
s = s.replace(w, "")
matched = False
for pat in name_patterns:
try:
m = re.search(pat, s)
if m and len(m.groups()) >= 2:
try:
qty = int(m.group(len(m.groups())))
except:
qty = None
specs.append(s)
packs.append(qty)
matched = True
break
except Exception:
pass
if matched:
continue
m = re.search(r"(\d+(?:\.\d+)?)(ml|l|升|毫升)[*×xX](\d+)", s, re.IGNORECASE)
if m:
specs.append(f"{m.group(1)}{m.group(2)}*{m.group(3)}")
packs.append(int(m.group(3)))
continue
m2 = re.search(r"(\d+)[*×xX](\d+)", s)
if m2:
specs.append(f"1*{m2.group(2)}")
packs.append(int(m2.group(2)))
continue
m3 = re.search(r"(\d{2,3})\D*(\d{1,3})\D*", s)
if m3:
specs.append(f"1*{m3.group(2)}")
packs.append(int(m3.group(2)))
continue
specs.append("")
packs.append(None)
df["specification"] = df.get("specification", pd.Series(specs))
df["package_quantity"] = packs
return df
def _normalize_unit(df: pd.DataFrame, target: str, unit_map: Dict[str, str], dictionary: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
if target in df.columns:
df[target] = df[target].astype(str)
df[target] = df[target].apply(lambda u: unit_map.get(u, u))
pack_multipliers = (dictionary or {}).get("pack_multipliers", {})
default_pq = (dictionary or {}).get("default_package_quantity", 1)
try:
if "quantity" in df.columns:
def convert_qty(row):
u = row.get(target)
q = row.get("quantity")
pq = row.get("package_quantity")
if u in ("", "", "", ""):
mult = pq or pack_multipliers.get(u, default_pq)
if pd.notna(q) and pd.notna(mult) and float(mult) > 0:
return float(q) * float(mult)
return q
df["quantity"] = df.apply(convert_qty, axis=1)
df[target] = df[target].apply(lambda u: "" if u in ("","","","") else u)
except Exception:
pass
return df
def _compute_quantity_from_total(df: pd.DataFrame) -> pd.DataFrame:
if "quantity" in df.columns and "unit_price" in df.columns:
qty = df["quantity"].fillna(0)
up = pd.to_numeric(df.get("unit_price", 0), errors="coerce").fillna(0)
tp = pd.to_numeric(df.get("total_price", 0), errors="coerce").fillna(0)
need = (qty <= 0) & (up > 0) & (tp > 0)
df.loc[need, "quantity"] = (tp[need] / up[need]).round(6)
return df
def _fill_missing(df: pd.DataFrame, fills: Dict[str, Any]) -> pd.DataFrame:
for k, v in fills.items():
if k in df.columns:
df[k] = df[k].fillna(v)
else:
df[k] = v
return df
def _mark_gift(df: pd.DataFrame) -> pd.DataFrame:
df["is_gift"] = False
tp = df.get("total_price")
up = df.get("unit_price")
flags = pd.Series([False]*len(df))
if tp is not None:
tpn = pd.to_numeric(tp, errors="coerce").fillna(0)
flags = flags | (tpn == 0)
if up is not None:
upn = pd.to_numeric(up, errors="coerce").fillna(0)
flags = flags | (upn == 0)
if "name" in df.columns:
flags = flags | df["name"].astype(str).str.contains(r"赠品|^o$|^O$", regex=True)
df.loc[flags, "is_gift"] = True
return df
def apply_rules(df: pd.DataFrame, rules: List[Dict[str, Any]], dictionary: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
out = df.copy()
for r in rules or []:
t = r.get("type")
if t == "split_quantity_unit":
out = _split_quantity_unit(out, r.get("source", "quantity"), dictionary)
elif t == "extract_spec_from_name":
out = _extract_spec_from_name(out, r.get("source", "name"), dictionary)
elif t == "normalize_unit":
out = _normalize_unit(out, r.get("target", "unit"), r.get("map", {}), dictionary)
elif t == "compute_quantity_from_total":
out = _compute_quantity_from_total(out)
elif t == "fill_missing":
out = _fill_missing(out, r.get("fills", {}))
elif t == "mark_gift":
out = _mark_gift(out)
return out
@@ -11,6 +11,7 @@ from pathlib import Path
from ..base import BaseProcessor
from ...utils.log_utils import get_logger
from ...handlers.rule_engine import apply_rules
logger = get_logger(__name__)
@@ -118,10 +119,17 @@ class GenericSupplierProcessor(BaseProcessor):
self.logger.error("数据清洗失败")
self.log_processing_end(input_file, success=False)
return None
try:
rules = self.supplier_config.get('rules', [])
dictionary = self.supplier_config.get('dictionary')
standardized_df = apply_rules(cleaned_df, rules, dictionary)
except Exception as e:
self.logger.warning(f"规则执行失败: {e}")
standardized_df = cleaned_df
# 步骤4: 计算处理
self.logger.info("步骤4/4: 计算处理...")
calculated_df = self._apply_calculations(cleaned_df)
calculated_df = self._apply_calculations(standardized_df)
if calculated_df is None:
self.logger.error("计算处理失败")
self.log_processing_end(input_file, success=False)
@@ -205,15 +213,26 @@ class GenericSupplierProcessor(BaseProcessor):
数据DataFrame或None
"""
try:
df = self._read_excel_safely(file_path)
if df.empty:
specified = self.supplier_config.get('header_row')
if specified is not None:
try:
df = self._read_excel_safely(file_path, header=int(specified))
except Exception:
df = self._read_excel_safely(file_path)
else:
df0 = self._read_excel_safely(file_path, header=None)
if df0 is None:
return None
header_row = self._find_header_row(df0)
if header_row is not None:
df = self._read_excel_safely(file_path, header=header_row)
else:
df = self._read_excel_safely(file_path)
if df is None or df.empty:
self.logger.warning("数据文件为空")
return None
self.logger.info(f"成功读取数据,形状: {df.shape}")
return df
except Exception as e:
self.logger.error(f"读取数据失败: {e}")
return None
@@ -235,6 +254,40 @@ class GenericSupplierProcessor(BaseProcessor):
except Exception as e:
self.logger.error(f"读取Excel失败: {file_path} - {e}")
raise
def _find_header_row(self, df: pd.DataFrame) -> Optional[int]:
try:
header_keywords = [
'条码','条形码','商品编码','商品名称','名称','数量','单位','单价','规格',
'金额','小计','总计','合计','合计金额'
]
scores = []
rows_to_check = min(30, len(df))
for r in range(rows_to_check):
row = df.iloc[r]
score = 0
for cell in row:
if isinstance(cell, str):
s = cell.strip().lower()
for kw in header_keywords:
if kw.lower() in s:
score += 5
non_empty = row.count()
if non_empty / max(1, len(row)) > 0.5:
score += 2
str_count = sum(1 for c in row if isinstance(c, str))
if str_count / max(1, len(row)) > 0.5:
score += 3
scores.append((r, score))
scores.sort(key=lambda x: x[1], reverse=True)
if scores and scores[0][1] >= 5:
return scores[0][0]
for r in range(len(df)):
if df.iloc[r].notna().sum() > 3:
return r
return None
except Exception:
return None
def _apply_column_mapping(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""应用列映射