feat: 商品记忆库 — 从OCR结果学习,逐步替代OCR识别

- 扩展 product_db.py: schema迁移(specification/source/confidence/usage_count/last_seen)
  + 学习逻辑(learn_from_product)、置信度系统、批量查询、导入导出、云端同步
- 注入处理管线: processor.py 在提取产品后调用 _apply_memory() 用记忆补全OCR
  + _is_spec_suspicious() 检测OCR规格质量,处理完后自动学习
- order_service.py 创建共享 ProductDatabase 实例
- dialog_utils.py 新增商品记忆库云端同步条目
- 新建 memory_editor.py: Treeview查看/编辑/搜索/删除/重新导入
- main_window.py 系统设置区新增"商品记忆库"按钮
- build_exe.py 添加 memory_editor 到 hidden_imports
@
This commit is contained in:
2026-05-05 02:40:48 +08:00
parent 5cf9a98d9a
commit d267a1d1fa
8 changed files with 656 additions and 44 deletions
+342 -26
View File
@@ -1,11 +1,18 @@
""" """
商品资料 SQLite 数据库 商品资料 SQLite 数据库 + 商品记忆库
将商品资料 (条码/名称/进货价/单位) 存储在 SQLite 中, 将商品资料 (条码/名称/进货价/单位/规格) 存储在 SQLite 中,
支持从 Excel 自动导入按条码快速查询。 支持从 Excel 自动导入按条码快速查询、以及从 OCR 处理结果中学习
记忆库功能:
- 处理完每单后自动学习商品数据
- 下次处理时用记忆库补全 OCR 缺失/错误的字段
- 通过置信度系统控制数据质量
- 支持云端同步
""" """
import os import os
import json
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional from typing import Dict, List, Optional
@@ -20,7 +27,7 @@ logger = get_logger(__name__)
class ProductDatabase: class ProductDatabase:
"""商品资料 SQLite 数据库""" """商品资料 SQLite 数据库 + 商品记忆库"""
SCHEMA = """ SCHEMA = """
CREATE TABLE IF NOT EXISTS products ( CREATE TABLE IF NOT EXISTS products (
@@ -28,10 +35,24 @@ class ProductDatabase:
name TEXT DEFAULT '', name TEXT DEFAULT '',
price REAL DEFAULT 0.0, price REAL DEFAULT 0.0,
unit TEXT DEFAULT '', unit TEXT DEFAULT '',
updated_at TEXT updated_at TEXT,
specification TEXT DEFAULT '',
source TEXT DEFAULT 'template',
confidence INTEGER DEFAULT 0,
usage_count INTEGER DEFAULT 0,
last_seen TEXT
); );
""" """
# 新增列定义(用于迁移)
_NEW_COLUMNS = {
'specification': "TEXT DEFAULT ''",
'source': "TEXT DEFAULT 'template'",
'confidence': 'INTEGER DEFAULT 0',
'usage_count': 'INTEGER DEFAULT 0',
'last_seen': 'TEXT',
}
def __init__(self, db_path: str, excel_source: str): def __init__(self, db_path: str, excel_source: str):
"""初始化数据库,如果 SQLite 不存在则自动从 Excel 导入 """初始化数据库,如果 SQLite 不存在则自动从 Excel 导入
@@ -49,6 +70,7 @@ class ProductDatabase:
def _ensure_db(self): def _ensure_db(self):
"""确保数据库存在,不存在则从 Excel 导入""" """确保数据库存在,不存在则从 Excel 导入"""
if os.path.exists(self.db_path): if os.path.exists(self.db_path):
self._migrate_schema()
return return
if not os.path.exists(self.excel_source): if not os.path.exists(self.excel_source):
@@ -71,8 +93,24 @@ class ProductDatabase:
finally: finally:
conn.close() conn.close()
def _migrate_schema(self):
"""幂等迁移:为已有数据库添加新列"""
conn = self._connect()
try:
cursor = conn.execute("PRAGMA table_info(products)")
existing_cols = {row[1] for row in cursor.fetchall()}
for col_name, col_type in self._NEW_COLUMNS.items():
if col_name not in existing_cols:
conn.execute(f"ALTER TABLE products ADD COLUMN {col_name} {col_type}")
logger.info(f"数据库迁移: 添加列 {col_name}")
conn.commit()
finally:
conn.close()
def import_from_excel(self, excel_path: str) -> int: def import_from_excel(self, excel_path: str) -> int:
"""从 Excel 导入商品资料 """从 Excel 导入商品资料source=template, confidence=100
Args: Args:
excel_path: Excel 文件路径 excel_path: Excel 文件路径
@@ -101,9 +139,10 @@ class ProductDatabase:
price_col = col price_col = col
break break
# 查找名称列单位列 (可选) # 查找名称列单位列、规格列 (可选)
name_col = ColumnMapper.find_column(list(df.columns), 'name') name_col = ColumnMapper.find_column(list(df.columns), 'name')
unit_col = ColumnMapper.find_column(list(df.columns), 'unit') unit_col = ColumnMapper.find_column(list(df.columns), 'unit')
spec_col = ColumnMapper.find_column(list(df.columns), 'specification')
now = datetime.now().isoformat() now = datetime.now().isoformat()
rows = [] rows = []
@@ -127,8 +166,11 @@ class ProductDatabase:
unit = str(row.get(unit_col, '')).strip() if unit_col else '' unit = str(row.get(unit_col, '')).strip() if unit_col else ''
if unit == 'nan': if unit == 'nan':
unit = '' unit = ''
spec = str(row.get(spec_col, '')).strip() if spec_col else ''
if spec == 'nan':
spec = ''
rows.append((barcode, name, price, unit, now)) rows.append((barcode, name, price, unit, now, spec, 'template', 100, 0, now))
if not rows: if not rows:
logger.warning(f"Excel 中未解析出有效记录: {excel_path}") logger.warning(f"Excel 中未解析出有效记录: {excel_path}")
@@ -137,8 +179,9 @@ class ProductDatabase:
conn = self._connect() conn = self._connect()
try: try:
conn.executemany( conn.executemany(
"INSERT OR REPLACE INTO products (barcode, name, price, unit, updated_at) " "INSERT OR REPLACE INTO products "
"VALUES (?, ?, ?, ?, ?)", "(barcode, name, price, unit, updated_at, specification, source, confidence, usage_count, last_seen) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows rows
) )
conn.commit() conn.commit()
@@ -161,15 +204,10 @@ class ProductDatabase:
conn.close() conn.close()
return self.import_from_excel(self.excel_source) return self.import_from_excel(self.excel_source)
# ── 基础查询(保持兼容) ──────────────────────────────────
def get_price(self, barcode: str) -> Optional[float]: def get_price(self, barcode: str) -> Optional[float]:
"""按条码查询进货价 """按条码查询进货价"""
Args:
barcode: 商品条码
Returns:
进货价,未找到返回 None
"""
conn = self._connect() conn = self._connect()
try: try:
cursor = conn.execute( cursor = conn.execute(
@@ -182,14 +220,7 @@ class ProductDatabase:
conn.close() conn.close()
def get_prices(self, barcodes: List[str]) -> Dict[str, float]: def get_prices(self, barcodes: List[str]) -> Dict[str, float]:
"""批量查询进货价 """批量查询进货价"""
Args:
barcodes: 条码列表
Returns:
{条码: 进货价} 字典,未找到的不包含
"""
if not barcodes: if not barcodes:
return {} return {}
@@ -212,3 +243,288 @@ class ProductDatabase:
return cursor.fetchone()[0] return cursor.fetchone()[0]
finally: finally:
conn.close() conn.close()
# ── 记忆库查询 ────────────────────────────────────────────
def get_memory(self, barcode: str) -> Optional[Dict]:
"""查询单条商品记忆"""
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute(
"SELECT * FROM products WHERE barcode = ?",
(str(barcode).strip(),)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
finally:
conn.close()
def get_memories(self, barcodes: List[str]) -> Dict[str, Dict]:
"""批量查询商品记忆"""
if not barcodes:
return {}
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
placeholders = ','.join('?' * len(barcodes))
cursor = conn.execute(
f"SELECT * FROM products WHERE barcode IN ({placeholders})",
[str(b).strip() for b in barcodes]
)
return {row['barcode']: dict(row) for row in cursor.fetchall()}
finally:
conn.close()
def get_all_memories(self) -> List[Dict]:
"""返回全部记录(UI 用)"""
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute(
"SELECT * FROM products ORDER BY usage_count DESC, barcode"
)
return [dict(row) for row in cursor.fetchall()]
finally:
conn.close()
# ── 学习逻辑 ──────────────────────────────────────────────
def learn_from_product(self, product: Dict, source: str = 'ocr') -> None:
"""从处理结果中学习单条商品数据
Args:
product: 商品字典 (barcode, name, specification, unit, price, ...)
source: 数据来源 ('template', 'ocr', 'user_confirmed')
"""
barcode = str(product.get('barcode', '')).strip()
if not barcode:
return
now = datetime.now().isoformat()
name = str(product.get('name', ''))
spec = str(product.get('specification', ''))
unit = str(product.get('unit', ''))
price = float(product.get('price', 0))
conn = self._connect()
try:
cursor = conn.execute(
"SELECT confidence, usage_count FROM products WHERE barcode = ?",
(barcode,)
)
row = cursor.fetchone()
if row is None:
# 新记录
conf = {'template': 100, 'user_confirmed': 90}.get(source, 50)
conn.execute(
"INSERT INTO products "
"(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)",
(barcode, name, spec, unit, price, source, conf, now, now)
)
else:
old_conf, old_count = row
new_count = old_count + 1
if source == 'template':
new_conf = 100
elif source == 'user_confirmed':
new_conf = 90
else: # ocr
new_conf = min(80, old_conf + 10) if old_conf < 80 else old_conf
if source in ('template', 'user_confirmed'):
# 高权威来源:全字段覆盖
conn.execute(
"UPDATE products SET name=?, specification=?, unit=?, price=?, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, price, source, new_conf, new_count, now, now, barcode)
)
else:
# OCR:仅填充空字段,不更新 price
conn.execute(
"UPDATE products SET "
"name = CASE WHEN name='' THEN ? ELSE name END, "
"specification = CASE WHEN specification='' THEN ? ELSE specification END, "
"unit = CASE WHEN unit='' THEN ? ELSE unit END, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, source, new_conf, new_count, now, now, barcode)
)
conn.commit()
finally:
conn.close()
def learn_from_products(self, products: List[Dict], source: str = 'ocr') -> int:
"""批量学习,返回更新条数"""
count = 0
for p in products:
try:
self.learn_from_product(p, source)
count += 1
except Exception as e:
logger.warning(f"学习商品记忆失败: {e}")
return count
def update_memory(self, barcode: str, fields: Dict) -> bool:
"""手动编辑记录(UI 用,source→user_confirmed, confidence→90"""
barcode = str(barcode).strip()
if not barcode:
return False
allowed = {'name', 'specification', 'unit', 'price'}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return False
now = datetime.now().isoformat()
set_clause = ', '.join(f"{k}=?" for k in updates)
values = list(updates.values())
conn = self._connect()
try:
conn.execute(
f"UPDATE products SET {set_clause}, source='user_confirmed', confidence=90, "
"updated_at=? WHERE barcode=?",
values + [now, barcode]
)
conn.commit()
return conn.total_changes > 0
finally:
conn.close()
def delete_memory(self, barcode: str) -> bool:
"""删除记录"""
conn = self._connect()
try:
conn.execute("DELETE FROM products WHERE barcode=?", (str(barcode).strip(),))
conn.commit()
return conn.total_changes > 0
finally:
conn.close()
# ── 云端同步 ──────────────────────────────────────────────
def export_for_sync(self) -> Dict:
"""导出全部记录为 JSON-serializable dict(按条码索引)"""
conn = self._connect()
try:
cursor = conn.execute(
"SELECT barcode, name, specification, unit, price, source, "
"confidence, usage_count, last_seen FROM products"
)
result = {}
for row in cursor.fetchall():
result[row[0]] = {
'name': row[1],
'specification': row[2],
'unit': row[3],
'price': row[4],
'source': row[5],
'confidence': row[6],
'usage_count': row[7],
'last_seen': row[8],
}
return result
finally:
conn.close()
def import_from_sync(self, data: Dict) -> int:
"""从云端 JSON 导入,高置信度优先合并
Args:
data: {barcode: {name, specification, unit, price, source, confidence, ...}}
Returns:
导入/更新的记录数
"""
now = datetime.now().isoformat()
count = 0
conn = self._connect()
try:
for barcode, info in data.items():
barcode = str(barcode).strip()
if not barcode:
continue
name = str(info.get('name', ''))
spec = str(info.get('specification', ''))
unit = str(info.get('unit', ''))
price = float(info.get('price', 0))
remote_source = str(info.get('source', 'ocr'))
remote_conf = int(info.get('confidence', 50))
remote_count = int(info.get('usage_count', 1))
remote_seen = str(info.get('last_seen', now))
cursor = conn.execute(
"SELECT confidence FROM products WHERE barcode = ?",
(barcode,)
)
row = cursor.fetchone()
if row is None:
# 新记录,直接插入
conn.execute(
"INSERT INTO products "
"(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(barcode, name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now)
)
count += 1
else:
local_conf = row[0]
if remote_conf > local_conf:
# 云端置信度更高,覆盖
conn.execute(
"UPDATE products SET name=?, specification=?, unit=?, price=?, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now, barcode)
)
count += 1
elif remote_conf == local_conf:
# 置信度相同,填充空字段
conn.execute(
"UPDATE products SET "
"name = CASE WHEN name='' THEN ? ELSE name END, "
"specification = CASE WHEN specification='' THEN ? ELSE specification END, "
"unit = CASE WHEN unit='' THEN ? ELSE unit END, "
"usage_count = MAX(usage_count, ?), "
"updated_at=? WHERE barcode=?",
(name, spec, unit, remote_count, now, barcode)
)
count += 1
conn.commit()
finally:
conn.close()
return count
def _export_memory_json(self, json_path: str = None) -> str:
"""导出记忆库为本地 JSON 文件
Args:
json_path: 输出路径,默认 data/product_memory.json
Returns:
写入的文件路径
"""
if json_path is None:
json_path = os.path.join(os.path.dirname(self.db_path), 'product_memory.json')
data = self.export_for_sync()
os.makedirs(os.path.dirname(json_path), exist_ok=True)
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug(f"商品记忆库已导出: {json_path} ({len(data)} 条)")
return json_path
+78 -1
View File
@@ -40,12 +40,13 @@ class ExcelProcessor:
提取条码、单价和数量,并按照采购单模板的格式填充 提取条码、单价和数量,并按照采购单模板的格式填充
""" """
def __init__(self, config): def __init__(self, config, product_db=None):
""" """
初始化Excel处理器 初始化Excel处理器
Args: Args:
config: 配置信息 config: 配置信息
product_db: 商品数据库实例(可选,由外部传入以共享)
""" """
self.config = config self.config = config
@@ -74,6 +75,18 @@ class ExcelProcessor:
# 加载单位转换器和配置 # 加载单位转换器和配置
self.unit_converter = UnitConverter() self.unit_converter = UnitConverter()
# 商品记忆库
if product_db is not None:
self.product_db = product_db
else:
from ..db.product_db import ProductDatabase
db_path = config.get_path('Paths', 'product_db', fallback='data/product_cache.db') if hasattr(config, 'get_path') else 'data/product_cache.db'
tpl_folder = config.get('Paths', 'template_folder', fallback='templates')
item_data = config.get('Templates', 'item_data', fallback='商品资料.xlsx')
tpl_path = os.path.join(tpl_folder, item_data)
self.product_db = ProductDatabase(db_path, tpl_path)
logger.info(f"初始化ExcelProcessor完成,模板文件: {self.template_path}") logger.info(f"初始化ExcelProcessor完成,模板文件: {self.template_path}")
except Exception as e: except Exception as e:
logger.error(f"初始化ExcelProcessor失败: {e}") logger.error(f"初始化ExcelProcessor失败: {e}")
@@ -371,6 +384,9 @@ class ExcelProcessor:
except Exception as e: except Exception as e:
logger.warning(f"通过金额和单价计算数量失败: {e}") logger.warning(f"通过金额和单价计算数量失败: {e}")
# 应用记忆库补全
product = self._apply_memory(product)
products.append(product) products.append(product)
except Exception as e: except Exception as e:
logger.error(f"提取第{idx+1}行商品信息时出错: {e}", exc_info=True) logger.error(f"提取第{idx+1}行商品信息时出错: {e}", exc_info=True)
@@ -379,6 +395,59 @@ class ExcelProcessor:
logger.info(f"提取到 {len(products)} 个商品信息") logger.info(f"提取到 {len(products)} 个商品信息")
return products return products
def _apply_memory(self, product: Dict) -> Dict:
"""查记忆库,补全 OCR 缺失/错误的字段"""
barcode = product.get('barcode', '')
if not barcode:
return product
try:
memory = self.product_db.get_memory(barcode)
except Exception:
return product
if memory is None or memory.get('confidence', 0) < 80:
return product
# 补全规格
ocr_spec = product.get('specification', '')
mem_spec = memory.get('specification', '') or ''
if mem_spec and (not ocr_spec or self._is_spec_suspicious(ocr_spec)):
product['specification'] = mem_spec
logger.info(f"记忆修正规格: {barcode} '{ocr_spec}' -> '{mem_spec}'")
# 补全名称
ocr_name = product.get('name', '')
mem_name = memory.get('name', '') or ''
if mem_name and not ocr_name:
product['name'] = mem_name
logger.info(f"记忆修正名称: {barcode} -> '{mem_name}'")
# 补全单位
ocr_unit = product.get('unit', '')
mem_unit = memory.get('unit', '') or ''
if mem_unit and not ocr_unit:
product['unit'] = mem_unit
logger.info(f"记忆修正单位: {barcode} -> '{mem_unit}'")
# 不改数量和单价(每单不同)
return product
def _is_spec_suspicious(self, spec: str) -> bool:
"""检测规格是否像 OCR 垃圾"""
if not spec:
return True
# IL*12I 和 1 混淆)
if re.search(r'^[Ii][Ll*]', spec):
return True
# 4.51*4L 被识别为 1
if re.search(r'\d+\.\d+1\*\d+', spec):
return True
# 包含非常规字符(排除常见规格字符)
if re.search(r'[^\d.*xX\-LlKkGgMm升毫瓶桶盒箱件提\s]', spec):
return True
return False
def fill_template(self, products: List[Dict], output_file_path: str) -> bool: def fill_template(self, products: List[Dict], output_file_path: str) -> bool:
""" """
填充采购单模板 填充采购单模板
@@ -599,6 +668,14 @@ class ExcelProcessor:
# 填充模板并保存 # 填充模板并保存
if self.fill_template(products, output_file): if self.fill_template(products, output_file):
# 从处理结果中学习商品记忆
try:
self.product_db.learn_from_products(products, source='ocr')
self.product_db._export_memory_json()
logger.info(f"已从处理结果学习 {len(products)} 条商品记忆")
except Exception as e:
logger.warning(f"学习商品记忆失败: {e}")
# 记录已处理文件 # 记录已处理文件
self.processed_files[file_path] = output_file self.processed_files[file_path] = output_file
self._save_processed_files() self._save_processed_files()
+19
View File
@@ -830,6 +830,12 @@ SYNC_FILES = [
"local": "templates/银豹-采购单模板.xls", "local": "templates/银豹-采购单模板.xls",
"type": "binary", "type": "binary",
}, },
{
"name": "商品记忆库",
"remote": "product_memory.json",
"local": "data/product_memory.json",
"type": "json",
},
] ]
@@ -1068,6 +1074,19 @@ def show_cloud_sync_dialog(parent=None):
ProcessorService(ConfigManager()).reload_processors() ProcessorService(ConfigManager()).reload_processors()
except Exception: except Exception:
pass pass
elif entry["remote"] == "product_memory.json":
try:
from app.core.db.product_db import ProductDatabase
cfg = ConfigManager()
db_path = cfg.get_path('Paths', 'product_db', fallback='data/product_cache.db') if hasattr(cfg, 'get_path') else 'data/product_cache.db'
tpl_folder = cfg.get('Paths', 'template_folder', fallback='templates')
item_data = cfg.get('Templates', 'item_data', fallback='商品资料.xlsx')
tpl_path = os.path.join(tpl_folder, item_data)
db = ProductDatabase(db_path, tpl_path)
count = db.import_from_sync(data)
logger.info(f"从云端导入商品记忆: {count}")
except Exception:
pass
def push_all(): def push_all():
ok, fail = 0, 0 ok, fail = 0, 0
+10 -10
View File
@@ -30,8 +30,15 @@ class OrderService:
logger.info("初始化OrderService") logger.info("初始化OrderService")
self.config = config or ConfigManager() self.config = config or ConfigManager()
# 创建共享的商品数据库实例
db_path = self.config.get_path('Paths', 'product_db', fallback='data/product_cache.db') if hasattr(self.config, 'get_path') else 'data/product_cache.db'
tpl_folder = self.config.get('Paths', 'template_folder', fallback='templates')
item_data = self.config.get('Templates', 'item_data', fallback='商品资料.xlsx')
tpl_path = os.path.join(tpl_folder, item_data)
self.product_db = ProductDatabase(db_path, tpl_path)
# 创建Excel处理器和采购单合并器 # 创建Excel处理器和采购单合并器
self.excel_processor = ExcelProcessor(self.config) self.excel_processor = ExcelProcessor(self.config, product_db=self.product_db)
self.order_merger = PurchaseOrderMerger(self.config) self.order_merger = PurchaseOrderMerger(self.config)
logger.info("OrderService初始化完成") logger.info("OrderService初始化完成")
@@ -195,18 +202,11 @@ class OrderService:
""" """
try: try:
import pandas as pd import pandas as pd
import os
from app.core.utils.file_utils import smart_read_excel from app.core.utils.file_utils import smart_read_excel
from app.core.handlers.column_mapper import ColumnMapper as CM from app.core.handlers.column_mapper import ColumnMapper as CM
config = ConfigManager() # 使用共享的商品数据库实例
template_folder = config.get('Paths', 'template_folder', fallback='templates') product_db = self.product_db
item_data = config.get('Templates', 'item_data', fallback='商品资料.xlsx')
item_path = os.path.join(template_folder, item_data)
product_db_path = config.get('Paths', 'product_db', fallback='data/product_cache.db')
# 使用 SQLite 查询商品进货价
product_db = ProductDatabase(product_db_path, item_path)
# 读取待校验的采购单 # 读取待校验的采购单
df_res = smart_read_excel(result_path) df_res = smart_read_excel(result_path)
+2
View File
@@ -28,6 +28,7 @@ from .action_handlers import (
merge_orders_with_status, process_excel_file_with_status, merge_orders_with_status, process_excel_file_with_status,
process_dropped_file, process_dropped_file,
) )
from .memory_editor import show_memory_editor
from .config_dialog import show_config_dialog from .config_dialog import show_config_dialog
from .barcode_editor import edit_barcode_mappings from .barcode_editor import edit_barcode_mappings
from .shortcuts import bind_keyboard_shortcuts from .shortcuts import bind_keyboard_shortcuts
@@ -256,6 +257,7 @@ def _create_right_panel(content_frame, theme, log_text, root):
create_modern_button(settings_buttons_frame, "系统设置", lambda: show_config_dialog(root, ConfigManager()), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) create_modern_button(settings_buttons_frame, "系统设置", lambda: show_config_dialog(root, ConfigManager()), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3)
create_modern_button(settings_buttons_frame, "条码映射", lambda: edit_barcode_mappings(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) create_modern_button(settings_buttons_frame, "条码映射", lambda: edit_barcode_mappings(log_text), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3)
create_modern_button(settings_buttons_frame, "云端同步", lambda: show_cloud_sync_dialog(root), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3) create_modern_button(settings_buttons_frame, "云端同步", lambda: show_cloud_sync_dialog(root), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3)
create_modern_button(settings_buttons_frame, "商品记忆库", lambda: show_memory_editor(root), "primary", px_width=132, px_height=32).pack(anchor='w', pady=3)
def _setup_drag_area(mid_container, theme, dnd_supported, log_text, status_bar): def _setup_drag_area(mid_container, theme, dnd_supported, log_text, status_bar):
+198
View File
@@ -0,0 +1,198 @@
"""商品记忆库查看/编辑对话框"""
import os
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog
from app.config.settings import ConfigManager
from app.core.db.product_db import ProductDatabase
from .ui_widgets import center_window
def _get_product_db():
cfg = ConfigManager()
db_path = cfg.get_path('Paths', 'product_db', fallback='data/product_cache.db') if hasattr(cfg, 'get_path') else 'data/product_cache.db'
tpl_folder = cfg.get('Paths', 'template_folder', fallback='templates')
item_data = cfg.get('Templates', 'item_data', fallback='商品资料.xlsx')
tpl_path = os.path.join(tpl_folder, item_data)
return ProductDatabase(db_path, tpl_path)
def show_memory_editor(root):
"""显示商品记忆库编辑器"""
db = _get_product_db()
dlg = tk.Toplevel(root)
dlg.title("商品记忆库")
dlg.geometry("950x520")
center_window(dlg)
# ── 顶部搜索栏 ──
top = ttk.Frame(dlg)
top.pack(fill=tk.X, padx=8, pady=(8, 4))
ttk.Label(top, text="搜索:").pack(side=tk.LEFT)
search_var = tk.StringVar()
search_entry = ttk.Entry(top, textvariable=search_var, width=30)
search_entry.pack(side=tk.LEFT, padx=4)
# ── 统计标签 ──
stats_label = ttk.Label(top, text="")
stats_label.pack(side=tk.RIGHT)
# ── Treeview ──
columns = ("barcode", "name", "specification", "unit", "price", "source", "confidence", "usage_count", "last_seen")
tree = ttk.Treeview(dlg, columns=columns, show="headings", height=18)
headers = {
"barcode": ("条码", 120),
"name": ("名称", 180),
"specification": ("规格", 80),
"unit": ("单位", 50),
"price": ("单价", 70),
"source": ("来源", 80),
"confidence": ("置信度", 60),
"usage_count": ("使用次数", 70),
"last_seen": ("最后使用", 140),
}
for col, (text, width) in headers.items():
tree.heading(col, text=text)
tree.column(col, width=width, anchor="center")
# 置信度颜色标签
tree.tag_configure("high", foreground="#28a745") # >= 80 绿
tree.tag_configure("medium", foreground="#ffc107") # 50-79 黄
tree.tag_configure("low", foreground="#dc3545") # < 50 红
scrollbar = ttk.Scrollbar(dlg, orient=tk.VERTICAL, command=tree.yview)
tree.configure(yscrollcommand=scrollbar.set)
tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(8, 0), pady=4)
scrollbar.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 8), pady=4)
# ── 数据加载 ──
all_records = []
def load_data(filter_text=""):
nonlocal all_records
all_records = db.get_all_memories()
tree.delete(*tree.get_children())
filtered = all_records
if filter_text:
ft = filter_text.lower()
filtered = [r for r in all_records
if ft in str(r.get('barcode', '')).lower()
or ft in str(r.get('name', '')).lower()]
for r in filtered:
conf = r.get('confidence', 0) or 0
tag = "high" if conf >= 80 else ("medium" if conf >= 50 else "low")
last_seen = r.get('last_seen', '') or ''
if last_seen and len(last_seen) > 16:
last_seen = last_seen[:16]
source_display = {
'template': '模板',
'ocr': 'OCR',
'user_confirmed': '手动',
}.get(r.get('source', ''), r.get('source', ''))
tree.insert("", tk.END, values=(
r.get('barcode', ''),
r.get('name', ''),
r.get('specification', ''),
r.get('unit', ''),
f"{r.get('price', 0):.2f}" if r.get('price') else '',
source_display,
conf,
r.get('usage_count', 0) or 0,
last_seen,
), tags=(tag,))
stats_label.config(text=f"{len(filtered)} / {len(all_records)}")
def on_search(*_):
load_data(search_var.get())
search_var.trace_add("write", on_search)
# ── 按钮区 ──
btn_frame = ttk.Frame(dlg)
btn_frame.pack(fill=tk.X, padx=8, pady=(0, 8))
def edit_selected():
sel = tree.selection()
if not sel:
messagebox.showwarning("提示", "请先选择一条记录")
return
item = tree.item(sel[0])
vals = item['values']
barcode = vals[0]
# 弹出编辑对话框
edit_dlg = tk.Toplevel(dlg)
edit_dlg.title(f"编辑: {barcode}")
edit_dlg.geometry("380x260")
center_window(edit_dlg)
fields = [
("名称", "name", vals[1]),
("规格", "specification", vals[2]),
("单位", "unit", vals[3]),
("单价", "price", vals[4]),
]
entries = {}
for i, (label, key, val) in enumerate(fields):
ttk.Label(edit_dlg, text=label).grid(row=i, column=0, sticky='w', padx=8, pady=4)
var = tk.StringVar(value=str(val) if val else '')
ttk.Entry(edit_dlg, textvariable=var, width=30).grid(row=i, column=1, padx=8, pady=4)
entries[key] = var
def save_edit():
updates = {}
for key, var in entries.items():
v = var.get().strip()
if key == 'price':
try:
updates[key] = float(v) if v else 0
except ValueError:
updates[key] = 0
else:
updates[key] = v
db.update_memory(barcode, updates)
edit_dlg.destroy()
load_data(search_var.get())
ttk.Button(edit_dlg, text="保存", command=save_edit).grid(row=len(fields), column=0, columnspan=2, pady=12)
def delete_selected():
sel = tree.selection()
if not sel:
messagebox.showwarning("提示", "请先选择一条记录")
return
item = tree.item(sel[0])
barcode = item['values'][0]
if messagebox.askyesno("确认删除", f"确定要删除条码 {barcode} 的记忆记录吗?"):
db.delete_memory(barcode)
load_data(search_var.get())
def reimport_template():
if messagebox.askyesno("确认", "重新从商品资料导入将重置所有模板商品的置信度为100,确定继续吗?"):
count = db.reimport()
messagebox.showinfo("完成", f"已重新导入 {count} 条记录")
load_data(search_var.get())
ttk.Button(btn_frame, text="编辑", command=edit_selected).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_frame, text="删除", command=delete_selected).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_frame, text="重新导入模板", command=reimport_template).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_frame, text="刷新", command=lambda: load_data(search_var.get())).pack(side=tk.LEFT, padx=4)
ttk.Button(btn_frame, text="关闭", command=dlg.destroy).pack(side=tk.RIGHT, padx=4)
# 双击编辑
tree.bind("<Double-1>", lambda e: edit_selected())
# 初始加载
load_data()
+1
View File
@@ -91,6 +91,7 @@ hidden_imports = [
'app.ui.config_dialog', 'app.ui.config_dialog',
'app.ui.shortcuts', 'app.ui.shortcuts',
'app.ui.main_window', 'app.ui.main_window',
'app.ui.memory_editor',
] ]
a = Analysis( a = Analysis(
+1 -2
View File
@@ -34,11 +34,10 @@ purchase_order = 银豹-采购单模板.xls
item_data = 商品资料.xlsx item_data = 商品资料.xlsx
[App] [App]
version = 2026.05.04.2128 version = 2026.05.05.0239
[Gitea] [Gitea]
base_url = https://gitea.94kan.cn base_url = https://gitea.94kan.cn
owner = houhuan owner = houhuan
repo = yixuan-sync-data repo = yixuan-sync-data
token = 50b61e43a141d606ae2529cd1755bc666d800e08 token = 50b61e43a141d606ae2529cd1755bc666d800e08