fix: sync/barcode/memory overhaul + detailed logs + preview + result tracking

- Sync: fix GiteaSync constructor + add push()/pull() methods
- Barcode: two-tab layout matching GUI (mapping + special rules)
- Memory: spec→specification unification, manual add, confidence/price tracking
- Processing: TaskLogHandler captures detailed logs (barcode mapping, unit conversion)
- Preview: fullscreen dialog for file preview (image/Excel) in Orders/Tables/Images
- Detail: per-file log filtering in file pages
- Tasks: result files now per-task, add copy path button
- Config: reactive edited state + save_config fix
- Dashboard: sync task isolation, log limit 10

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-05 19:37:10 +08:00
parent c18039f790
commit 81bafaf557
20 changed files with 1610 additions and 502 deletions
+13 -12
View File
@@ -105,23 +105,24 @@ class ConfigManager:
def save_config(self) -> None:
"""保存配置到文件(API 密钥不写入文件)"""
try:
# 保存前临时清空 API 密钥,避免写入文件
saved_keys = {}
for option in ('api_key', 'secret_key'):
# 保存前临时清空 API 密钥,避免写入文件
saved_keys = {}
for option in ('api_key', 'secret_key'):
try:
saved_keys[option] = self.config.get('API', option, fallback='')
self.config.set('API', option, '')
except Exception:
saved_keys[option] = ''
self.config.set('API', option, '')
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
self.config.write(f)
# 恢复内存中的值
for option, val in saved_keys.items():
self.config.set('API', option, val)
logger.info(f"配置已保存到: {self.config_file}")
except Exception as e:
logger.error(f"保存配置文件时出错: {e}")
finally:
# 恢复内存中的值(即使写入失败也恢复)
for option, val in saved_keys.items():
if val:
self.config.set('API', option, val)
def get(self, section: str, option: str, fallback: Any = None) -> Any:
"""获取配置值"""
+308 -229
View File
@@ -1,21 +1,18 @@
"""
商品资料 SQLite 数据库 + 商品记忆库
将商品资料 (条码/名称/进货价/单位/规格) 存储在 SQLite 中,
支持从 Excel 自动导入、按条码快速查询、以及从 OCR 处理结果中学习。
记忆库功能:
- 处理完每单后自动学习商品数据
- 下次处理时用记忆库补全 OCR 缺失/错误的字段
- 通过置信度系统控制数据质量
- 支持云端同步
- 处理每步后自动学习商品数据(置信度+一致性加速)
- OCR 字段缺失时用记忆库补全 (conf > 50 直接采用)
- 价格异常检测:偏差 > 2倍触发补全,偏差 > 50% 记录预警
- 批量预加载 → 内存操作 → 批量写回,保障性能
"""
import os
import json
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple, Callable
import pandas as pd
@@ -40,26 +37,27 @@ class ProductDatabase:
source TEXT DEFAULT 'template',
confidence INTEGER DEFAULT 0,
usage_count INTEGER DEFAULT 0,
last_seen TEXT
last_seen TEXT,
avg_price REAL DEFAULT 0.0,
min_price REAL DEFAULT 0.0,
max_price REAL DEFAULT 0.0,
price_count INTEGER DEFAULT 0
);
"""
# Columns added after the original schema, used by schema migration.
# Maps column name -> the SQL type/default clause appended to
# "ALTER TABLE products ADD COLUMN <name> ...".
_NEW_COLUMNS = {
'specification': "TEXT DEFAULT ''",
'source': "TEXT DEFAULT 'template'",
'confidence': 'INTEGER DEFAULT 0',
'usage_count': 'INTEGER DEFAULT 0',
'last_seen': 'TEXT',
'avg_price': 'REAL DEFAULT 0.0',
'min_price': 'REAL DEFAULT 0.0',
'max_price': 'REAL DEFAULT 0.0',
'price_count': 'INTEGER DEFAULT 0',
}
def __init__(self, db_path: str, excel_source: str):
"""Initialise the database; if the SQLite file does not exist it is imported from Excel.
Args:
db_path: Path of the SQLite database file.
excel_source: Path of the product-data Excel file.
"""
self.db_path = db_path
self.excel_source = excel_source
self._ensure_db()
@@ -68,16 +66,13 @@ class ProductDatabase:
return sqlite3.connect(self.db_path)
def _ensure_db(self):
"""确保数据库存在,不存在则从 Excel 导入"""
if os.path.exists(self.db_path):
self._migrate_schema()
return
if not os.path.exists(self.excel_source):
logger.warning(f"商品资料 Excel 不存在,跳过导入: {self.excel_source}")
logger.warning(f"商品资料 Excel 不存在: {self.excel_source}")
self._create_empty_db()
return
logger.info(f"首次运行,从 Excel 导入商品资料: {self.excel_source}")
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self._create_empty_db()
@@ -85,7 +80,6 @@ class ProductDatabase:
logger.info(f"商品资料导入完成: {count} 条记录")
def _create_empty_db(self):
"""创建空数据库"""
conn = self._connect()
try:
conn.executescript(self.SCHEMA)
@@ -94,52 +88,35 @@ class ProductDatabase:
conn.close()
def _migrate_schema(self):
    """Idempotent migration: add every column listed in ``_NEW_COLUMNS``
    that the existing ``products`` table does not have yet.

    Each added column is logged; the connection is always closed.
    """
    conn = self._connect()
    try:
        # PRAGMA table_info yields one row per column; index 1 is the column name.
        table_info = conn.execute("PRAGMA table_info(products)").fetchall()
        present = {info[1] for info in table_info}
        missing = [(column, ddl) for column, ddl in self._NEW_COLUMNS.items()
                   if column not in present]
        for column, ddl in missing:
            conn.execute(f"ALTER TABLE products ADD COLUMN {column} {ddl}")
            logger.info(f"数据库迁移: 添加列 {column}")
        conn.commit()
    finally:
        conn.close()
# ══════════════════════════════════════════════════════════════
# 导入
# ══════════════════════════════════════════════════════════════
def import_from_excel(self, excel_path: str) -> int:
"""从 Excel 导入商品资料(source=template, confidence=100
Args:
excel_path: Excel 文件路径
Returns:
导入的记录数
"""
df = smart_read_excel(excel_path)
if df is None or df.empty:
logger.warning(f"Excel 文件为空或读取失败: {excel_path}")
return 0
# 查找条码列
barcode_col = ColumnMapper.find_column(list(df.columns), 'barcode')
if not barcode_col:
logger.error(f"Excel 中未找到条码列: {list(df.columns)}")
return 0
# 查找进货价列
price_col = ColumnMapper.find_column(list(df.columns), 'unit_price')
# 进货价可能没有标准别名,补充查找
if not price_col:
for col in df.columns:
col_str = str(col).strip()
if '进货价' in col_str:
if '进货价' in str(col).strip():
price_col = col
break
# 查找名称列、单位列、规格列 (可选)
name_col = ColumnMapper.find_column(list(df.columns), 'name')
unit_col = ColumnMapper.find_column(list(df.columns), 'unit')
spec_col = ColumnMapper.find_column(list(df.columns), 'specification')
@@ -150,7 +127,6 @@ class ProductDatabase:
barcode = str(row.get(barcode_col, '')).strip()
if not barcode or barcode == 'nan':
continue
price = 0.0
if price_col:
try:
@@ -159,43 +135,32 @@ class ProductDatabase:
price = float(p)
except (ValueError, TypeError):
pass
name = str(row.get(name_col, '')).strip() if name_col else ''
if name == 'nan':
name = ''
if name == 'nan': name = ''
unit = str(row.get(unit_col, '')).strip() if unit_col else ''
if unit == 'nan':
unit = ''
if unit == 'nan': unit = ''
spec = str(row.get(spec_col, '')).strip() if spec_col else ''
if spec == 'nan':
spec = ''
rows.append((barcode, name, price, unit, now, spec, 'template', 100, 0, now))
if spec == 'nan': spec = ''
# template 源置信度 50
rows.append((barcode, name, price, unit, now, spec, 'template', 50, 0, now,
price, price, price, 1 if price > 0 else 0))
if not rows:
logger.warning(f"Excel 中未解析出有效记录: {excel_path}")
return 0
conn = self._connect()
try:
conn.executemany(
"INSERT OR REPLACE INTO products "
"(barcode, name, price, unit, updated_at, specification, source, confidence, usage_count, last_seen) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows
)
"(barcode, name, price, unit, updated_at, specification, source, confidence, "
"usage_count, last_seen, avg_price, min_price, max_price, price_count) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows)
conn.commit()
finally:
conn.close()
return len(rows)
def reimport(self) -> int:
"""重新从 Excel 导入(清空现有数据后重新导入)
Returns:
导入的记录数
"""
conn = self._connect()
try:
conn.execute("DELETE FROM products")
@@ -204,203 +169,343 @@ class ProductDatabase:
conn.close()
return self.import_from_excel(self.excel_source)
# ── 基础查询(保持兼容) ──────────────────────────────────
# ══════════════════════════════════════════════════════════════
# 查询
# ══════════════════════════════════════════════════════════════
def get_price(self, barcode: str) -> Optional[float]:
"""按条码查询进货价"""
conn = self._connect()
try:
cursor = conn.execute(
"SELECT price FROM products WHERE barcode = ?",
(str(barcode).strip(),)
)
row = cursor.fetchone()
return row[0] if row else None
row = conn.execute("SELECT avg_price FROM products WHERE barcode=?",
(str(barcode).strip(),)).fetchone()
return row[0] if row and row[0] else None
finally:
conn.close()
def get_prices(self, barcodes: List[str]) -> Dict[str, float]:
"""批量查询进货价"""
if not barcodes:
return {}
conn = self._connect()
try:
placeholders = ','.join('?' * len(barcodes))
cursor = conn.execute(
f"SELECT barcode, price FROM products WHERE barcode IN ({placeholders})",
[str(b).strip() for b in barcodes]
)
return {row[0]: row[1] for row in cursor.fetchall()}
rows = conn.execute(
f"SELECT barcode, avg_price FROM products WHERE barcode IN ({placeholders})",
[str(b).strip() for b in barcodes]).fetchall()
return {r[0]: r[1] for r in rows if r[1]}
finally:
conn.close()
def count(self) -> int:
"""返回商品总数"""
conn = self._connect()
try:
cursor = conn.execute("SELECT COUNT(*) FROM products")
return cursor.fetchone()[0]
return conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
finally:
conn.close()
# ── 记忆库查询 ────────────────────────────────────────────
def get_memory(self, barcode: str) -> Optional[Dict]:
"""查询单条商品记忆"""
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute(
"SELECT * FROM products WHERE barcode = ?",
(str(barcode).strip(),)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
row = conn.execute("SELECT * FROM products WHERE barcode=?",
(str(barcode).strip(),)).fetchone()
return dict(row) if row else None
finally:
conn.close()
def get_memories(self, barcodes: List[str]) -> Dict[str, Dict]:
"""批量查询商品记忆"""
if not barcodes:
return {}
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
placeholders = ','.join('?' * len(barcodes))
cursor = conn.execute(
rows = conn.execute(
f"SELECT * FROM products WHERE barcode IN ({placeholders})",
[str(b).strip() for b in barcodes]
)
return {row['barcode']: dict(row) for row in cursor.fetchall()}
[str(b).strip() for b in barcodes]).fetchall()
return {r['barcode']: dict(r) for r in rows}
finally:
conn.close()
def get_all_memories(self) -> List[Dict]:
"""返回全部记录(UI 用)"""
conn = self._connect()
conn.row_factory = sqlite3.Row
try:
cursor = conn.execute(
"SELECT * FROM products ORDER BY usage_count DESC, barcode"
)
return [dict(row) for row in cursor.fetchall()]
return [dict(row) for row in
conn.execute("SELECT * FROM products ORDER BY usage_count DESC, barcode").fetchall()]
finally:
conn.close()
# ── 学习逻辑 ──────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════
# 批量预加载 — 性能核心
# ══════════════════════════════════════════════════════════════
def learn_from_product(self, product: Dict, source: str = 'ocr') -> None:
"""从处理结果中学习单条商品数据
def load_batch(self, barcodes: List[str]) -> Dict[str, Dict]:
    """Preload the memory records for many barcodes into a plain dict.

    Barcodes are stringified, stripped and de-duplicated, then fetched in
    chunks so that a very large batch never exceeds SQLite's limit on the
    number of bound parameters in a single statement.

    Args:
        barcodes: Barcodes to look up; may be empty or ``None``.

    Returns:
        ``{barcode: row_as_dict}`` for every barcode found in ``products``.
        Barcodes without a record are simply absent from the result.
    """
    if not barcodes:
        return {}
    # dict.fromkeys de-duplicates while keeping first-seen order.
    keys = [k for k in dict.fromkeys(str(b).strip() for b in barcodes) if k]
    if not keys:
        return {}

    # Stay well below the historical SQLITE_MAX_VARIABLE_NUMBER default (999).
    chunk_size = 500
    loaded: Dict[str, Dict] = {}
    conn = self._connect()
    conn.row_factory = sqlite3.Row
    try:
        for start in range(0, len(keys), chunk_size):
            chunk = keys[start:start + chunk_size]
            placeholders = ','.join('?' * len(chunk))
            rows = conn.execute(
                f"SELECT * FROM products WHERE barcode IN ({placeholders})",
                chunk).fetchall()
            loaded.update({r['barcode']: dict(r) for r in rows})
        return loaded
    finally:
        conn.close()
Args:
product: 商品字典 (barcode, name, specification, unit, price, ...)
source: 数据来源 ('template', 'ocr', 'user_confirmed')
# ══════════════════════════════════════════════════════════════
# 学习逻辑 — 一致性加速 + 价格区间
# ══════════════════════════════════════════════════════════════
def learn_from_product(self, product: Dict, source: str = 'ocr',
memory: Dict[str, Dict] = None,
add_log: Callable = None) -> Optional[str]:
"""
从处理结果中学习,返回日志字符串。
memory: 可选的预加载批量内存,传入则零 DB 查询。
"""
barcode = str(product.get('barcode', '')).strip()
if not barcode:
return
return None
now = datetime.now().isoformat()
name = str(product.get('name', ''))
spec = str(product.get('specification', ''))
unit = str(product.get('unit', ''))
price = float(product.get('price', 0))
now = datetime.now().isoformat()
# 查现有记录(优先从内存查)
if memory is not None and barcode in memory:
row = memory[barcode]
old_name = row.get('name', '')
old_spec = row.get('specification', '')
old_unit = row.get('unit', '')
old_conf = row.get('confidence', 0)
old_count = row.get('usage_count', 0)
old_avg = row.get('avg_price', 0) or 0
old_min = row.get('min_price') or price
old_max = row.get('max_price') or price
pc = row.get('price_count', 0) or 0
exists = True
else:
conn = self._connect()
try:
cursor = conn.execute(
"SELECT name, specification, unit, confidence, usage_count, "
"avg_price, min_price, max_price, price_count FROM products WHERE barcode=?",
(barcode,)).fetchone()
finally:
conn.close()
if cursor is None:
exists = False
else:
old_name, old_spec, old_unit, old_conf, old_count, old_avg, old_min, old_max, pc = cursor
old_avg = old_avg or 0
pc = pc or 0
old_min = old_min if old_min is not None else price
old_max = old_max if old_max is not None else price
exists = True
new_count = old_count + 1 if exists else 1
# ── 置信度 ──
if source == 'user_confirmed':
new_conf = 90
elif source == 'template':
new_conf = 50
elif exists and old_conf < 50:
# 一致性加速
spec_match = bool(spec and old_spec and spec == old_spec)
unit_match = bool(unit and old_unit and unit == old_unit)
if spec_match and unit_match:
boost = 10
elif unit_match:
boost = 5
else:
boost = 3
new_conf = min(50, old_conf + boost)
elif exists:
new_conf = old_conf # > 50 稳定不变
else:
new_conf = 10 # 新 OCR 记录
# ── 价格区间 ──
if price > 0:
new_pc = (pc if exists else 0) + 1
new_avg = ((old_avg * (new_pc - 1)) + price) / new_pc if exists else price
new_min = min(old_min, price) if exists else price
new_max = max(old_max, price) if exists else price
else:
new_avg = old_avg if exists else 0
new_min = old_min if exists else 0
new_max = old_max if exists else 0
new_pc = pc if exists else 0
# ── 写入 ──
conn = self._connect()
try:
cursor = conn.execute(
"SELECT confidence, usage_count FROM products WHERE barcode = ?",
(barcode,)
)
row = cursor.fetchone()
if row is None:
# 新记录
conf = {'template': 100, 'user_confirmed': 90}.get(source, 50)
if not exists:
conn.execute(
"INSERT INTO products "
"(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)",
(barcode, name, spec, unit, price, source, conf, now, now)
)
"INSERT INTO products (barcode, name, specification, unit, price, "
"source, confidence, usage_count, last_seen, updated_at, "
"avg_price, min_price, max_price, price_count) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(barcode, name, spec, unit, price, source, new_conf, 1, now, now,
new_avg, new_min, new_max, new_pc))
log = f"记忆库新增: {barcode} {name} 源={source} 可信度={new_conf}"
else:
old_conf, old_count = row
new_count = old_count + 1
if source == 'template':
new_conf = 100
elif source == 'user_confirmed':
new_conf = 90
else: # ocr
new_conf = min(80, old_conf + 10) if old_conf < 80 else old_conf
if source in ('template', 'user_confirmed'):
# 高权威来源:全字段覆盖
# 高可信度源全字段覆盖;低可信度仅填空
if source in ('template', 'user_confirmed') or new_conf > 50:
conn.execute(
"UPDATE products SET name=?, specification=?, unit=?, price=?, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, price, source, new_conf, new_count, now, now, barcode)
)
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, "
"avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?",
(name or old_name, spec or old_spec, unit or old_unit, price,
source, new_conf, new_count, now, now,
new_avg, new_min, new_max, new_pc, barcode))
else:
# OCR:仅填充空字段,不更新 price
conn.execute(
"UPDATE products SET "
"name = CASE WHEN name='' THEN ? ELSE name END, "
"specification = CASE WHEN specification='' THEN ? ELSE specification END, "
"unit = CASE WHEN unit='' THEN ? ELSE unit END, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, source, new_conf, new_count, now, now, barcode)
)
"name=CASE WHEN name='' THEN ? ELSE name END, "
"specification=CASE WHEN specification='' THEN ? ELSE specification END, "
"unit=CASE WHEN unit='' THEN ? ELSE unit END, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, "
"avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?",
(name, spec, unit, source, new_conf, new_count, now, now,
new_avg, new_min, new_max, new_pc, barcode))
log = f"记忆库更新: {barcode} 可信度{old_conf if exists else 0}{new_conf}"
if price > 0:
log += f" 均价{new_avg:.4f}({new_pc}次)"
conn.commit()
# 更新内存 dict(如果传入了)
if memory is not None and barcode in memory:
memory[barcode].update({
'confidence': new_conf, 'usage_count': new_count,
'avg_price': new_avg, 'min_price': new_min,
'max_price': new_max, 'price_count': new_pc,
'name': name or old_name,
'specification': spec or old_spec,
'unit': unit or old_unit,
})
if add_log:
add_log(log)
return log
finally:
conn.close()
def learn_from_products(self, products: List[Dict], source: str = 'ocr') -> int:
"""批量学习,返回更新条数"""
def learn_from_products(self, products: List[Dict], source: str = 'ocr',
add_log: Callable = None) -> int:
"""批量学习 — 先批量预加载,再逐条处理,返回更新条数"""
barcodes = [str(p.get('barcode', '')) for p in products if p.get('barcode')]
memory = self.load_batch(barcodes)
count = 0
for p in products:
try:
self.learn_from_product(p, source)
count += 1
result = self.learn_from_product(p, source, memory=memory, add_log=add_log)
if result:
count += 1
except Exception as e:
logger.warning(f"学习商品记忆失败: {e}")
return count
# ══════════════════════════════════════════════════════════════
# 记忆辅助 — OCR 补全
# ══════════════════════════════════════════════════════════════
def _price_anomaly(self, product: Dict, mem: Dict) -> bool:
    """Tell whether a product's price is off by more than a factor of two.

    Args:
        product: Product dict; only ``price`` is read.
        mem: Memory record; only ``avg_price`` is read.

    Returns:
        True when ``price > 2 * avg_price`` or ``price < avg_price / 2``.
        False when either value is missing, zero or not numeric, because
        no meaningful comparison can be made in that case.
    """
    try:
        # `or 0` maps None / '' to 0 so an absent OCR price does not raise.
        price = float(product.get('price') or 0)
        avg = float(mem.get('avg_price') or 0)
    except (TypeError, ValueError):
        return False
    if not price or not avg:
        return False
    return price > avg * 2 or price < avg * 0.5
def fill_from_memory(self, barcode: str, ocr_result: Dict,
                     memory: Dict[str, Dict] = None) -> Tuple[Dict, str]:
    """Fill missing ``specification`` / ``unit`` of an OCR result from memory.

    A missing field is taken from the memory record when the record's
    confidence is above 50, or, for lower confidence, when the OCR price
    looks anomalous according to ``_price_anomaly``. Records with
    confidence below 10 are ignored entirely.

    Args:
        barcode: Product barcode; stringified and stripped before lookup.
        ocr_result: OCR product dict. It is never mutated.
        memory: Optional preloaded ``{barcode: record}`` dict. When given
            (even if empty) it is the only source consulted and no database
            query is made; when ``None`` the record is read via
            ``get_memory``.

    Returns:
        ``(result, log)``: ``result`` is a filled copy of ``ocr_result``
        (or ``ocr_result`` itself when no usable record exists) and ``log``
        is a ``"; "``-joined description of the fills, empty if none.
    """
    barcode = str(barcode).strip()
    # `is not None`: an empty preloaded dict still means "already looked up".
    if memory is not None:
        mem = memory.get(barcode)
    else:
        mem = self.get_memory(barcode)

    conf = (mem.get('confidence') or 0) if mem else 0
    if not mem or conf < 10:
        return ocr_result, ""

    result = dict(ocr_result)
    try:
        price = float(result.get('price') or 0)
    except (TypeError, ValueError):
        price = 0.0  # unreadable OCR price counts as "no price"
    # Hand the anomaly check a copy carrying the sanitised price.
    probe = dict(result, price=price)

    logs = []
    mem_spec = mem.get('specification')
    if not result.get('specification') and mem_spec:
        if conf > 50:
            result['specification'] = mem_spec
            logs.append(f"规格补全(可信{conf}): {barcode}{mem_spec}")
        elif self._price_anomaly(probe, mem):
            result['specification'] = mem_spec
            logs.append(f"价格异常→规格补全: {barcode} 本次{price:.2f} vs 均价{mem['avg_price']:.2f}{mem_spec}")

    mem_unit = mem.get('unit')
    if not result.get('unit') and mem_unit:
        if conf > 50:
            result['unit'] = mem_unit
            logs.append(f"单位补全(可信{conf}): {barcode}{mem_unit}")
        elif self._price_anomaly(probe, mem):
            result['unit'] = mem_unit
            logs.append(f"价格异常→单位补全: {barcode}{mem_unit}")

    return result, "; ".join(logs)
def price_warning(self, barcode: str, price: float,
                  memory: Dict[str, Dict] = None) -> Optional[str]:
    """Return a warning when ``price`` deviates more than 50% from the average.

    Args:
        barcode: Product barcode; stringified and stripped before lookup.
        price: Unit price of the current order line.
        memory: Optional preloaded ``{barcode: record}`` dict. When given
            (even if empty) no database query is made; when ``None`` the
            record is read via ``get_memory``.

    Returns:
        A warning string when ``price > 1.5 * avg_price`` or
        ``price < 0.5 * avg_price``; ``None`` otherwise, or when there is
        no record or no recorded average price.
    """
    barcode = str(barcode).strip()
    # `is not None`: an empty preloaded dict still means "already looked up".
    if memory is not None:
        mem = memory.get(barcode)
    else:
        mem = self.get_memory(barcode)
    if not mem or not mem.get('avg_price'):
        return None

    avg = mem['avg_price']
    if not (price > avg * 1.5 or price < avg * 0.5):
        return None

    # NULL min/max/count would break the numeric formatting below.
    min_p = mem.get('min_price')
    max_p = mem.get('max_price')
    if min_p is None:
        min_p = avg
    if max_p is None:
        max_p = avg
    pc = mem.get('price_count') or 0
    return (f"单价预警: {barcode} 本次{price:.4f}元 vs "
            f"历史均价{avg:.4f} (范围{min_p:.4f}~{max_p:.4f}, {pc}次)")
# ══════════════════════════════════════════════════════════════
# 手动编辑
# ══════════════════════════════════════════════════════════════
def update_memory(self, barcode: str, fields: Dict) -> bool:
"""手动编辑记录(UI 用,source→user_confirmed, confidence→90"""
barcode = str(barcode).strip()
if not barcode:
return False
allowed = {'name', 'specification', 'unit', 'price'}
allowed = {'name', 'specification', 'unit', 'price', 'confidence'}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return False
now = datetime.now().isoformat()
set_clause = ', '.join(f"{k}=?" for k in updates)
values = list(updates.values())
extra_sql = ", source='user_confirmed'"
if 'confidence' not in updates:
extra_sql += ", confidence=90"
conn = self._connect()
try:
conn.execute(
f"UPDATE products SET {set_clause}, source='user_confirmed', confidence=90, "
"updated_at=? WHERE barcode=?",
values + [now, barcode]
)
f"UPDATE products SET {set_clause}{extra_sql}, updated_at=? WHERE barcode=?",
values + [now, barcode])
conn.commit()
return conn.total_changes > 0
finally:
conn.close()
def delete_memory(self, barcode: str) -> bool:
"""删除记录"""
conn = self._connect()
try:
conn.execute("DELETE FROM products WHERE barcode=?", (str(barcode).strip(),))
@@ -409,51 +514,39 @@ class ProductDatabase:
finally:
conn.close()
# ── 云端同步 ──────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════
# 云端同步
# ══════════════════════════════════════════════════════════════
def export_for_sync(self) -> Dict:
"""导出全部记录为 JSON-serializable dict(按条码索引)"""
conn = self._connect()
try:
cursor = conn.execute(
"SELECT barcode, name, specification, unit, price, source, "
"confidence, usage_count, last_seen FROM products"
)
"confidence, usage_count, last_seen, avg_price, min_price, max_price, price_count "
"FROM products")
result = {}
for row in cursor.fetchall():
result[row[0]] = {
'name': row[1],
'specification': row[2],
'unit': row[3],
'price': row[4],
'source': row[5],
'confidence': row[6],
'usage_count': row[7],
'last_seen': row[8],
'name': row[1], 'specification': row[2], 'unit': row[3],
'price': row[4], 'source': row[5], 'confidence': row[6],
'usage_count': row[7], 'last_seen': row[8],
'avg_price': row[9], 'min_price': row[10],
'max_price': row[11], 'price_count': row[12],
}
return result
finally:
conn.close()
def import_from_sync(self, data: Dict) -> int:
"""从云端 JSON 导入,高置信度优先合并
Args:
data: {barcode: {name, specification, unit, price, source, confidence, ...}}
Returns:
导入/更新的记录数
"""
now = datetime.now().isoformat()
count = 0
conn = self._connect()
try:
for barcode, info in data.items():
barcode = str(barcode).strip()
if not barcode:
continue
name = str(info.get('name', ''))
spec = str(info.get('specification', ''))
unit = str(info.get('unit', ''))
@@ -462,69 +555,55 @@ class ProductDatabase:
remote_conf = int(info.get('confidence', 50))
remote_count = int(info.get('usage_count', 1))
remote_seen = str(info.get('last_seen', now))
remote_avg = float(info.get('avg_price', price))
remote_min = float(info.get('min_price', price))
remote_max = float(info.get('max_price', price))
remote_pc = int(info.get('price_count', 1))
cursor = conn.execute(
"SELECT confidence FROM products WHERE barcode = ?",
(barcode,)
)
row = cursor.fetchone()
row = conn.execute("SELECT confidence FROM products WHERE barcode=?",
(barcode,)).fetchone()
if row is None:
# 新记录,直接插入
conn.execute(
"INSERT INTO products "
"(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(barcode, name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now)
)
"INSERT INTO products (barcode, name, specification, unit, price, "
"source, confidence, usage_count, last_seen, updated_at, "
"avg_price, min_price, max_price, price_count) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(barcode, name, spec, unit, price, remote_source, remote_conf,
remote_count, remote_seen, now,
remote_avg, remote_min, remote_max, remote_pc))
count += 1
else:
local_conf = row[0]
if remote_conf > local_conf:
# 云端置信度更高,覆盖
conn.execute(
"UPDATE products SET name=?, specification=?, unit=?, price=?, "
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? "
"WHERE barcode=?",
(name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now, barcode)
)
"source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, "
"avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?",
(name, spec, unit, price, remote_source, remote_conf,
remote_count, remote_seen, now,
remote_avg, remote_min, remote_max, remote_pc, barcode))
count += 1
elif remote_conf == local_conf:
# 置信度相同,填充空字段
conn.execute(
"UPDATE products SET "
"name = CASE WHEN name='' THEN ? ELSE name END, "
"specification = CASE WHEN specification='' THEN ? ELSE specification END, "
"unit = CASE WHEN unit='' THEN ? ELSE unit END, "
"usage_count = MAX(usage_count, ?), "
"updated_at=? WHERE barcode=?",
(name, spec, unit, remote_count, now, barcode)
)
"name=CASE WHEN name='' THEN ? ELSE name END, "
"specification=CASE WHEN specification='' THEN ? ELSE specification END, "
"unit=CASE WHEN unit='' THEN ? ELSE unit END, "
"usage_count=MAX(usage_count, ?), updated_at=? WHERE barcode=?",
(name, spec, unit, remote_count, now, barcode))
count += 1
conn.commit()
finally:
conn.close()
return count
def _export_memory_json(self, json_path: str = None) -> str:
"""导出记忆库为本地 JSON 文件
Args:
json_path: 输出路径,默认 data/product_memory.json
Returns:
写入的文件路径
"""
def _export_memory_json(self, json_path=None):
"""导出记忆库为 JSON(兼容旧代码调用)"""
import os as _os
if json_path is None:
json_path = os.path.join(os.path.dirname(self.db_path), 'product_memory.json')
json_path = _os.path.join(_os.path.dirname(self.db_path), 'product_memory.json')
data = self.export_for_sync()
os.makedirs(os.path.dirname(json_path), exist_ok=True)
_os.makedirs(_os.path.dirname(json_path), exist_ok=True)
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug(f"商品记忆库已导出: {json_path} ({len(data)} 条)")
return json_path
+63
View File
@@ -165,6 +165,69 @@ class GiteaSync:
existing_sha = self.file_exists(remote_path)
return self.push_file(remote_path, content, message, sha=existing_sha)
def push(self) -> str:
    """Upload local data to the remote repository.

    Two files are pushed through ``push_json``: ``product_cache.json``
    (the export of the local product database) and
    ``barcode_mappings.json`` (only when ``config/barcode_mappings.json``
    exists locally).

    Returns:
        A ``"; "``-joined summary with one ``ok``/``skip`` entry per file,
        depending on whether ``push_json`` returned a truthy value.
    """
    from pathlib import Path
    from app.core.db.product_db import ProductDatabase

    # Project root is four directory levels above this module.
    root = Path(__file__).resolve().parents[3]
    summary = []

    # 1. Product cache
    database = ProductDatabase(
        str(root / "data" / "product_cache.db"),
        str(root / "templates" / "商品资料.xlsx"),
    )
    pushed = self.push_json("product_cache.json", database.export_for_sync(),
                            "sync: update product cache")
    status = 'ok' if pushed else 'skip'
    summary.append(f"product_cache: {status}")

    # 2. Barcode mappings
    mappings_file = root / "config" / "barcode_mappings.json"
    if mappings_file.exists():
        with open(mappings_file, "r", encoding="utf-8") as fh:
            mappings = json.load(fh)
        pushed = self.push_json("barcode_mappings.json", mappings,
                                "sync: update barcode mappings")
        status = 'ok' if pushed else 'skip'
        summary.append(f"barcode_mappings: {status}")

    if not summary:
        return "无数据需要同步"
    return "; ".join(summary)
def pull(self) -> str:
    """Download remote data and write it into the local files.

    ``product_cache.json`` is merged into the local product database via
    ``import_from_sync``; ``barcode_mappings.json`` overwrites
    ``config/barcode_mappings.json``. A file for which ``pull_json``
    returns ``None`` is reported as having no remote data.

    Returns:
        A ``"; "``-joined summary with one entry per file.
    """
    from pathlib import Path

    # Project root is four directory levels above this module.
    root = Path(__file__).resolve().parents[3]
    summary = []

    # 1. Product cache
    fetched = self.pull_json("product_cache.json")
    if fetched is None:
        summary.append("product_cache: 云端无数据")
    else:
        payload, _sha = fetched
        # Imported lazily: only needed when remote data exists.
        from app.core.db.product_db import ProductDatabase
        data_dir = root / "data"
        data_dir.mkdir(parents=True, exist_ok=True)
        database = ProductDatabase(
            str(data_dir / "product_cache.db"),
            str(root / "templates" / "商品资料.xlsx"),
        )
        imported = database.import_from_sync(payload)
        summary.append(f"product_cache: 导入 {imported} 条")

    # 2. Barcode mappings
    fetched = self.pull_json("barcode_mappings.json")
    if fetched is None:
        summary.append("barcode_mappings: 云端无数据")
    else:
        mappings, _sha = fetched
        target = root / "config" / "barcode_mappings.json"
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(mappings, fh, ensure_ascii=False, indent=2)
        summary.append("barcode_mappings: 已更新")

    if not summary:
        return "无数据需要同步"
    return "; ".join(summary)
@classmethod
def from_config(cls, config) -> Optional["GiteaSync"]:
"""从 ConfigManager 创建实例