feat: update barcode mappings and improve build script robustness

This commit is contained in:
侯欢 2026-03-25 20:49:24 +08:00
parent 76f7adddd5
commit 83405a9b8e
7 changed files with 261 additions and 51 deletions

View File

@ -7,6 +7,10 @@
## 核心功能 ## 核心功能
- 图片/Excel处理拖拽或选择文件生成银豹采购单`templates/银豹-采购单模板.xls` - 图片/Excel处理拖拽或选择文件生成银豹采购单`templates/银豹-采购单模板.xls`
- **无界面自动化接口** (headless_api.py):支持与 OpenClaw 等自动化平台对接。
- `python headless_api.py [图片路径]`:处理指定图片。
- `python headless_api.py`:自动处理 `data/input` 目录下最新的图片。
- 输出:成功时仅在标准输出打印最终 Excel 的绝对路径(退出码 0);运行日志、单价校验警告以及失败时的错误信息均输出到标准错误(失败时退出码 1)。
- 供应商管理(系统设置 → 供应商管理): - 供应商管理(系统设置 → 供应商管理):
- 基本信息、文件名匹配、表头行号 - 基本信息、文件名匹配、表头行号
- 列映射与表头预览前30行、表头选行、加载列、智能映射、导入/导出 - 列映射与表头预览前30行、表头选行、加载列、智能映射、导入/导出

View File

@ -108,3 +108,77 @@ class OrderService:
logger.info("OrderService开始合并所有采购单") logger.info("OrderService开始合并所有采购单")
return self.order_merger.process(file_paths, progress_cb) return self.order_merger.process(file_paths, progress_cb)
def validate_unit_price(self, result_path: str, threshold: float = 1.0) -> List[str]:
    """Compare purchase-order unit prices with the item master's purchase prices.

    Rows are matched by barcode against ``templates/商品资料.xlsx``, which is
    resolved relative to the current working directory.

    Args:
        result_path: Path of the purchase order workbook to check.
        threshold: Largest absolute price difference that is still accepted;
            rows whose difference is strictly greater are reported.

    Returns:
        One message per mismatching row. The list is empty when nothing
        exceeds the threshold, and also when the item master file or a
        required column is missing, or when any error occurs (the reason is
        written to the log in those cases).
    """
    try:
        import os
        import pandas as pd

        def _read_df(path):
            # .xlsx is read with openpyxl; any other extension goes to xlrd.
            full_path = os.path.abspath(path)
            engine = 'openpyxl' if full_path.lower().endswith('.xlsx') else 'xlrd'
            return pd.read_excel(full_path, engine=engine)

        def _find_col(df, candidates, contains=None):
            # Exact header match first, then the first header containing `contains`.
            cols = list(df.columns)
            for name in candidates:
                if name in cols:
                    return name
            if contains:
                for col in cols:
                    if contains in str(col):
                        return col
            return None

        def _barcode_keys(series):
            # Barcodes may load as int, float or text. A float column
            # stringifies as "6901234567890.0", which would never match the
            # same barcode loaded as int/text, so drop that trailing ".0".
            return series.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)

        item_path = os.path.join('templates', '商品资料.xlsx')
        if not os.path.exists(item_path):
            logger.warning(f"未找到商品资料文件: {item_path}")
            return []

        df_item = _read_df(item_path)
        df_res = _read_df(result_path)

        item_barcode_col = _find_col(df_item, ['商品条码', '商品条码(小条码)', '条码', 'barcode'], contains='条码')
        item_price_col = _find_col(df_item, ['进货价', '进货价(必填)'], contains='进货价')
        res_barcode_col = _find_col(df_res, ['条码', 'barcode'], contains='条码')
        res_price_col = _find_col(df_res, ['采购单价', 'unit_price', '单价'], contains='单价')
        if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]):
            logger.warning("未能在文件和商品资料中找到完整的校验列(条码、单价)")
            return []

        # Barcode -> purchase price lookup. Rows without a barcode or without
        # a numeric price are left out; for duplicate barcodes the last row wins.
        item_prices = pd.to_numeric(df_item[item_price_col], errors='coerce')
        usable = df_item[item_barcode_col].notna() & item_prices.notna()
        price_by_barcode = dict(
            zip(_barcode_keys(df_item[item_barcode_col])[usable], item_prices[usable])
        )

        # Work on standalone Series instead of adding scratch columns to
        # df_res; this also avoids assigning into a dropna() result.
        barcodes = _barcode_keys(df_res[res_barcode_col])
        res_prices = pd.to_numeric(df_res[res_price_col], errors='coerce')
        ref_prices = pd.to_numeric(barcodes.map(price_by_barcode), errors='coerce')
        diffs = (res_prices - ref_prices).abs()

        # NaN > threshold is False, so rows with an unknown barcode or a
        # non-numeric price are skipped rather than reported.
        bad = diffs > threshold

        return [
            f"条码 {bc}: 采购单价={res_price} vs 进货价={ref_price} 差异={diff:.2f}"
            for bc, res_price, ref_price, diff in zip(
                barcodes[bad], res_prices[bad], ref_prices[bad], diffs[bad]
            )
        ]
    except Exception as e:
        # Best-effort check: never break the caller's flow, but keep the
        # traceback in the log so failures can be diagnosed.
        logger.error(f"单价校验过程中发生错误: {e}", exc_info=True)
        return []

View File

@ -201,8 +201,18 @@ def create_portable_package():
# 创建发布目录 # 创建发布目录
release_dir = Path('release') release_dir = Path('release')
if release_dir.exists(): if release_dir.exists():
try:
shutil.rmtree(release_dir) shutil.rmtree(release_dir)
release_dir.mkdir() except Exception as e:
print(f"警告: 无法完全清理发布目录 (可能文件被占用): {e}")
# 如果目录还在,尝试清理能清理的部分
for item in release_dir.iterdir():
try:
if item.is_dir(): shutil.rmtree(item)
else: item.unlink()
except Exception: pass
release_dir.mkdir(exist_ok=True)
# 复制exe文件 # 复制exe文件
exe_file = Path('dist/OCR订单处理系统.exe') exe_file = Path('dist/OCR订单处理系统.exe')

View File

@ -27,4 +27,4 @@ max_file_size_mb = 4
purchase_order = 银豹-采购单模板.xls purchase_order = 银豹-采购单模板.xls
[App] [App]
version = 2026.03.25.1945 version = 2026.03.25.2048

View File

@ -187,6 +187,50 @@
"map_to": "6907992513195", "map_to": "6907992513195",
"description": "条码映射6907992513157 -> 6907992513195" "description": "条码映射6907992513157 -> 6907992513195"
}, },
"6902083893842": {
"map_to": "6902083907150",
"description": "条码映射6902083893842 -> 6902083907150"
},
"6902083904685": {
"map_to": "6902083905217",
"description": "条码映射6902083904685 -> 6902083905217"
},
"6917878036849": {
"map_to": "6917878036847",
"description": "条码映射6917878036849 -> 6917878036847"
},
"6903979000078": {
"map_to": "6903979000061",
"description": "条码映射6903979000078 -> 6903979000061"
},
"6937003706353": {
"map_to": "6937003706360",
"description": "条码映射6937003706353 -> 6937003706360"
},
"6923644242961": {
"map_to": "6907992100043",
"description": "条码映射6923644242961 -> 6907992100043"
},
"6923644258382": {
"map_to": "6923644252823",
"description": "条码映射6923644258382 -> 6923644252823"
},
"6923450657430": {
"map_to": "69029110",
"description": "条码映射6923450657430 -> 69029110"
},
"6923450660232": {
"map_to": "6923450690123",
"description": "条码映射6923450660232 -> 6923450690123"
},
"6923450657614": {
"map_to": "6923450657607",
"description": "条码映射6923450657614 -> 6923450657607"
},
"6972556000022": {
"map_to": "6977826050028",
"description": "条码映射6972556000022 -> 6977826050028"
},
"6925019900087": { "6925019900087": {
"multiplier": 10, "multiplier": 10,
"target_unit": "瓶", "target_unit": "瓶",

112
headless_api.py Normal file
View File

@ -0,0 +1,112 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
OCR订单处理系统 - 无界面自动化接口
-----------------------------
专为与 openclaw 等自动化平台对接设计
处理流程输入图片 -> OCR识别 -> 数据清洗 -> 价格校验 -> 输出结果路径
"""
import os
import sys
import logging
import time
from pathlib import Path
from typing import Optional
# 添加当前目录到路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from app.config.settings import ConfigManager
from app.services.ocr_service import OCRService
from app.services.order_service import OrderService
from app.core.utils.log_utils import set_log_level
# Route every log record to stderr so that stdout carries nothing except the
# final result path.
logging.basicConfig(
    stream=sys.stderr,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("HeadlessAPI")
def get_latest_input_image(input_dir: str = "data/input") -> Optional[str]:
    """Return the most recently modified image file in *input_dir*.

    A file counts as an image when its name ends with .jpg, .jpeg, .png or
    .bmp, compared case-insensitively so that e.g. ``scan.Png`` is found on
    case-sensitive filesystems too. Directories are ignored.

    Args:
        input_dir: Directory to scan (not recursive). Defaults to
            ``data/input`` relative to the current working directory.

    Returns:
        The path of the newest matching file as a string, or ``None`` when
        the directory does not exist or contains no image file.
    """
    directory = Path(input_dir)
    if not directory.is_dir():
        return None

    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')
    candidates = [
        entry for entry in directory.iterdir()
        if entry.is_file() and entry.name.lower().endswith(image_suffixes)
    ]
    if not candidates:
        return None

    # "Latest" means the largest modification time.
    return str(max(candidates, key=lambda p: p.stat().st_mtime))
def run_pipeline(image_path: Optional[str] = None):
    """Run the headless flow: OCR, Excel processing, then unit-price check.

    Args:
        image_path: Image to process. When falsy, the image returned by
            ``get_latest_input_image()`` is used instead.

    Returns:
        The absolute path of the final Excel file (also printed to stdout),
        or ``None`` when a stage fails; failures are reported on stderr.
    """
    try:
        # 1. Resolve the input file.
        image_path = image_path or get_latest_input_image()
        if not image_path:
            print("ERROR: No input image found.", file=sys.stderr)
            return None

        logger.info(f"开始处理图片: {image_path}")

        # 2. Build the services.
        cfg = ConfigManager()
        ocr = OCRService(cfg)
        orders = OrderService(cfg)

        # 3. OCR. The timer starts here, after service construction.
        started = time.perf_counter()
        excel_intermediate = ocr.process_image(image_path)
        if not excel_intermediate:
            print(f"ERROR: OCR failed for {image_path}", file=sys.stderr)
            return None

        # 4. Excel processing and clean-up.
        final_excel = orders.process_excel(excel_intermediate)
        if not final_excel:
            print(f"ERROR: Excel processing failed for {excel_intermediate}", file=sys.stderr)
            return None

        # 5. Unit-price check; findings are warnings only and go to stderr.
        discrepancies = orders.validate_unit_price(final_excel)
        if not discrepancies:
            logger.info("单价校验通过")
        else:
            report = [f"WARNING: Price validation found {len(discrepancies)} issues:"]
            report.extend(f"  - {d}" for d in discrepancies)
            print("\n".join(report), file=sys.stderr)

        duration = time.perf_counter() - started
        logger.info(f"处理完成,耗时: {duration:.2f}s")

        # 6. The absolute result path is the only thing written to stdout.
        result_path = os.path.abspath(final_excel)
        print(result_path)
        return result_path

    except Exception as e:
        import traceback
        print(f"CRITICAL ERROR: {str(e)}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        return None
if __name__ == "__main__":
    # The first command-line argument, when present, is the image to process.
    cli_args = sys.argv[1:]
    outcome = run_pipeline(cli_args[0] if cli_args else None)
    # Exit status 0 when a result path was produced, 1 otherwise.
    sys.exit(1 if not outcome else 0)

View File

@ -2503,59 +2503,25 @@ def select_excel_file(log_widget):
def validate_unit_price_against_item_data(result_path: str, log_widget=None): def validate_unit_price_against_item_data(result_path: str, log_widget=None):
try: try:
import pandas as pd from app.services.order_service import OrderService
import os service = OrderService()
def _read_df(path): bad_results = service.validate_unit_price(result_path)
ap = os.path.abspath(path)
if ap.lower().endswith('.xlsx'): if bad_results:
return pd.read_excel(ap, engine='openpyxl')
else:
return pd.read_excel(ap, engine='xlrd')
item_path = os.path.join('templates', '商品资料.xlsx')
if not os.path.exists(item_path):
return
df_item = _read_df(item_path)
df_res = _read_df(result_path)
def _find_col(df, candidates, contains=None):
cols = list(df.columns)
for c in candidates:
if c in cols:
return c
if contains:
for c in cols:
if contains in str(c):
return c
return None
item_barcode_col = _find_col(df_item, ['商品条码','商品条码(小条码)','条码','barcode'], contains='条码')
item_price_col = _find_col(df_item, ['进货价','进货价(必填)'], contains='进货价')
res_barcode_col = _find_col(df_res, ['条码','barcode'], contains='条码')
res_price_col = _find_col(df_res, ['采购单价','unit_price','单价'], contains='单价')
if not all([item_barcode_col, item_price_col, res_barcode_col, res_price_col]):
return
item_map = df_item[[item_barcode_col, item_price_col]].dropna()
item_map[item_price_col] = pd.to_numeric(item_map[item_price_col], errors='coerce')
item_map = item_map.dropna()
imap = dict(zip(item_map[item_barcode_col].astype(str).str.strip(), item_map[item_price_col]))
df_res['_bc_'] = df_res[res_barcode_col].astype(str).str.strip()
df_res['_res_price_'] = pd.to_numeric(df_res[res_price_col], errors='coerce')
df_res['_item_price_'] = df_res['_bc_'].map(imap)
df_check = df_res.dropna(subset=['_res_price_','_item_price_'])
df_check['_diff_'] = (df_check['_res_price_'] - df_check['_item_price_']).abs()
bad = df_check[df_check['_diff_'] > 1.0]
if not bad.empty:
lines = []
for i in range(min(len(bad), 10)):
r = bad.iloc[i]
lines.append(f"条码 {r['_bc_']}: 采购单价={r['_res_price_']} vs 进货价={r['_item_price_']} 差异={r['_diff_']:.2f}")
import tkinter.messagebox as mb import tkinter.messagebox as mb
mb.showwarning("单价校验提示", f"存在{len(bad)}条单价与商品资料进货价差异超过1元:\n" + "\n".join(lines)) display_count = min(len(bad_results), 10)
msg = f"存在{len(bad_results)}条单价与商品资料进货价差异超过1元:\n" + "\n".join(bad_results[:display_count])
if len(bad_results) > 10:
msg += f"\n...(其余 {len(bad_results)-10} 条已省略)"
mb.showwarning("单价校验提示", msg)
if log_widget is not None: if log_widget is not None:
add_to_log(log_widget, f"单价校验发现{len(bad)}条差异>1元\n", "warning") add_to_log(log_widget, f"单价校验发现{len(bad_results)}条差异>1元\n", "warning")
else: else:
if log_widget is not None: if log_widget is not None:
add_to_log(log_widget, "单价校验通过(差异<=1元\n", "success") add_to_log(log_widget, "单价校验通过(差异<=1元\n", "success")
except Exception: except Exception as e:
pass if log_widget is not None:
add_to_log(log_widget, f"单价校验出错: {str(e)}\n", "error")
def clean_cache(log_widget): def clean_cache(log_widget):
"""清除处理缓存""" """清除处理缓存"""