From 81bafaf55798f43440363544d01ce69c5d0a7450 Mon Sep 17 00:00:00 2001 From: houhuan Date: Tue, 5 May 2026 19:37:10 +0800 Subject: [PATCH] fix: sync/barcode/memory overhaul + detailed logs + preview + result tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Sync: fix GiteaSync constructor + add push()/pull() methods - Barcode: two-tab layout matching GUI (mapping + special rules) - Memory: spec→specification unification, manual add, confidence/price tracking - Processing: TaskLogHandler captures detailed logs (barcode mapping, unit conversion) - Preview: fullscreen dialog for file preview (image/Excel) in Orders/Tables/Images - Detail: per-file log filtering in file pages - Tasks: result files now per-task, add copy path button - Config: reactive edited state + save_config fix - Dashboard: sync task isolation, log limit 10 Co-Authored-By: Claude Opus 4.7 --- app/config/settings.py | 25 +- app/core/db/product_db.py | 537 ++++++++++++++---------- app/core/utils/cloud_sync.py | 63 +++ config.ini | 2 +- web/backend/routers/barcodes.py | 74 +++- web/backend/routers/files.py | 51 ++- web/backend/routers/memory.py | 42 +- web/backend/routers/processing.py | 212 +++++++++- web/backend/routers/sync.py | 67 ++- web/backend/services/db_schema.py | 12 +- web/frontend/src/stores/processing.ts | 6 +- web/frontend/src/views/Barcodes.vue | 400 +++++++++++++----- web/frontend/src/views/Config.vue | 2 +- web/frontend/src/views/Dashboard.vue | 15 +- web/frontend/src/views/Memory.vue | 129 ++++-- web/frontend/src/views/Sync.vue | 4 +- web/frontend/src/views/Tasks.vue | 33 +- web/frontend/src/views/files/Images.vue | 144 ++++++- web/frontend/src/views/files/Orders.vue | 154 ++++++- web/frontend/src/views/files/Tables.vue | 140 +++++- 20 files changed, 1610 insertions(+), 502 deletions(-) diff --git a/app/config/settings.py b/app/config/settings.py index 4836fa1..17fad3b 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -105,23 +105,24 @@ class ConfigManager: def save_config(self) -> None: """保存配置到文件(API 密钥不写入文件)""" - try: - # 保存前临时清空 API 密钥,避免写入文件 - saved_keys = {} - for option in ('api_key', 'secret_key'): + # 保存前临时清空 API 密钥,避免写入文件 + saved_keys = {} + for option in ('api_key', 'secret_key'): + try: saved_keys[option] = self.config.get('API', option, fallback='') - self.config.set('API', option, '') + except Exception: + saved_keys[option] = '' + self.config.set('API', option, '') + try: with open(self.config_file, 'w', encoding='utf-8') as f: self.config.write(f) - - # 恢复内存中的值 - for option, val in saved_keys.items(): - self.config.set('API', option, val) - logger.info(f"配置已保存到: {self.config_file}") - except Exception as e: - logger.error(f"保存配置文件时出错: {e}") + finally: + # 恢复内存中的值(即使写入失败也恢复) + for option, val in saved_keys.items(): + if val: + self.config.set('API', option, val) def get(self, section: str, option: str, fallback: Any = None) -> Any: """获取配置值""" diff --git a/app/core/db/product_db.py b/app/core/db/product_db.py index ee46ecd..8ae2fb8 100644 --- a/app/core/db/product_db.py +++ b/app/core/db/product_db.py @@ -1,21 +1,18 @@ """ 商品资料 SQLite 数据库 + 商品记忆库 -将商品资料 (条码/名称/进货价/单位/规格) 存储在 SQLite 中, -支持从 Excel 自动导入、按条码快速查询、以及从 OCR 处理结果中学习。 - 记忆库功能: -- 处理完每单后自动学习商品数据 -- 下次处理时用记忆库补全 OCR 缺失/错误的字段 -- 通过置信度系统控制数据质量 -- 支持云端同步 +- 处理每步后自动学习商品数据(置信度+一致性加速) +- OCR 字段缺失时用记忆库补全 (conf > 50 直接采用) +- 价格异常检测:偏差 > 2倍触发补全,偏差 > 50% 记录预警 +- 批量预加载 → 内存操作 → 批量写回,保障性能 """ import os import json import sqlite3 from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple, Callable import pandas as pd @@ -40,26 +37,27 @@ class ProductDatabase: source TEXT DEFAULT 'template', confidence INTEGER DEFAULT 0, usage_count INTEGER DEFAULT 0, - last_seen TEXT + last_seen TEXT, + avg_price REAL DEFAULT 0.0, + min_price REAL DEFAULT 0.0, + max_price REAL DEFAULT 0.0, + price_count INTEGER DEFAULT 0 ); """ - # 新增列定义(用于迁移) _NEW_COLUMNS = { 'specification': "TEXT DEFAULT ''", 'source': "TEXT DEFAULT 'template'", 'confidence': 'INTEGER DEFAULT 0', 'usage_count': 'INTEGER DEFAULT 0', 'last_seen': 'TEXT', + 'avg_price': 'REAL DEFAULT 0.0', + 'min_price': 'REAL DEFAULT 0.0', + 'max_price': 'REAL DEFAULT 0.0', + 'price_count': 'INTEGER DEFAULT 0', } def __init__(self, db_path: str, excel_source: str): - """初始化数据库,如果 SQLite 不存在则自动从 Excel 导入 - - Args: - db_path: SQLite 数据库文件路径 - excel_source: 商品资料 Excel 文件路径 - """ self.db_path = db_path self.excel_source = excel_source self._ensure_db() @@ -68,16 +66,13 @@ class ProductDatabase: return sqlite3.connect(self.db_path) def _ensure_db(self): - """确保数据库存在,不存在则从 Excel 导入""" if os.path.exists(self.db_path): self._migrate_schema() return - if not os.path.exists(self.excel_source): - logger.warning(f"商品资料 Excel 不存在,跳过导入: {self.excel_source}") + logger.warning(f"商品资料 Excel 不存在: {self.excel_source}") self._create_empty_db() return - logger.info(f"首次运行,从 Excel 导入商品资料: {self.excel_source}") os.makedirs(os.path.dirname(self.db_path), exist_ok=True) self._create_empty_db() @@ -85,7 +80,6 @@ class ProductDatabase: logger.info(f"商品资料导入完成: {count} 条记录") def _create_empty_db(self): - """创建空数据库""" conn = self._connect() try: conn.executescript(self.SCHEMA) @@ -94,52 +88,35 @@ class ProductDatabase: conn.close() def _migrate_schema(self): - """幂等迁移:为已有数据库添加新列""" conn = self._connect() try: cursor = conn.execute("PRAGMA table_info(products)") existing_cols = {row[1] for row in cursor.fetchall()} - for col_name, col_type in self._NEW_COLUMNS.items(): if col_name not in existing_cols: conn.execute(f"ALTER TABLE products ADD COLUMN {col_name} {col_type}") logger.info(f"数据库迁移: 添加列 {col_name}") - conn.commit() finally: conn.close() + # ══════════════════════════════════════════════════════════════ + # 导入 + # ══════════════════════════════════════════════════════════════ + def import_from_excel(self, excel_path: str) -> int: - """从 Excel 导入商品资料(source=template, confidence=100) - - Args: - excel_path: Excel 文件路径 - - Returns: - 导入的记录数 - """ df = smart_read_excel(excel_path) if df is None or df.empty: - logger.warning(f"Excel 文件为空或读取失败: {excel_path}") return 0 - - # 查找条码列 barcode_col = ColumnMapper.find_column(list(df.columns), 'barcode') if not barcode_col: - logger.error(f"Excel 中未找到条码列: {list(df.columns)}") return 0 - - # 查找进货价列 price_col = ColumnMapper.find_column(list(df.columns), 'unit_price') - # 进货价可能没有标准别名,补充查找 if not price_col: for col in df.columns: - col_str = str(col).strip() - if '进货价' in col_str: + if '进货价' in str(col).strip(): price_col = col break - - # 查找名称列、单位列、规格列 (可选) name_col = ColumnMapper.find_column(list(df.columns), 'name') unit_col = ColumnMapper.find_column(list(df.columns), 'unit') spec_col = ColumnMapper.find_column(list(df.columns), 'specification') @@ -150,7 +127,6 @@ class ProductDatabase: barcode = str(row.get(barcode_col, '')).strip() if not barcode or barcode == 'nan': continue - price = 0.0 if price_col: try: @@ -159,43 +135,32 @@ class ProductDatabase: price = float(p) except (ValueError, TypeError): pass - name = str(row.get(name_col, '')).strip() if name_col else '' - if name == 'nan': - name = '' + if name == 'nan': name = '' unit = str(row.get(unit_col, '')).strip() if unit_col else '' - if unit == 'nan': - unit = '' + if unit == 'nan': unit = '' spec = str(row.get(spec_col, '')).strip() if spec_col else '' - if spec == 'nan': - spec = '' - - rows.append((barcode, name, price, unit, now, spec, 'template', 100, 0, now)) + if spec == 'nan': spec = '' + # template 源置信度 50 + rows.append((barcode, name, price, unit, now, spec, 'template', 50, 0, now, + price, price, price, 1 if price > 0 else 0)) if not rows: - logger.warning(f"Excel 中未解析出有效记录: {excel_path}") return 0 - conn = self._connect() try: conn.executemany( "INSERT OR REPLACE INTO products " - "(barcode, name, price, unit, updated_at, specification, source, confidence, usage_count, last_seen) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - rows - ) + "(barcode, name, price, unit, updated_at, specification, source, confidence, " + "usage_count, last_seen, avg_price, min_price, max_price, price_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rows) conn.commit() finally: conn.close() - return len(rows) def reimport(self) -> int: - """重新从 Excel 导入(清空现有数据后重新导入) - - Returns: - 导入的记录数 - """ conn = self._connect() try: conn.execute("DELETE FROM products") @@ -204,203 +169,343 @@ class ProductDatabase: conn.close() return self.import_from_excel(self.excel_source) - # ── 基础查询(保持兼容) ────────────────────────────────── + # ══════════════════════════════════════════════════════════════ + # 查询 + # ══════════════════════════════════════════════════════════════ def get_price(self, barcode: str) -> Optional[float]: - """按条码查询进货价""" conn = self._connect() try: - cursor = conn.execute( - "SELECT price FROM products WHERE barcode = ?", - (str(barcode).strip(),) - ) - row = cursor.fetchone() - return row[0] if row else None + row = conn.execute("SELECT avg_price FROM products WHERE barcode=?", + (str(barcode).strip(),)).fetchone() + return row[0] if row and row[0] else None finally: conn.close() def get_prices(self, barcodes: List[str]) -> Dict[str, float]: - """批量查询进货价""" if not barcodes: return {} - conn = self._connect() try: placeholders = ','.join('?' * len(barcodes)) - cursor = conn.execute( - f"SELECT barcode, price FROM products WHERE barcode IN ({placeholders})", - [str(b).strip() for b in barcodes] - ) - return {row[0]: row[1] for row in cursor.fetchall()} + rows = conn.execute( + f"SELECT barcode, avg_price FROM products WHERE barcode IN ({placeholders})", + [str(b).strip() for b in barcodes]).fetchall() + return {r[0]: r[1] for r in rows if r[1]} finally: conn.close() def count(self) -> int: - """返回商品总数""" conn = self._connect() try: - cursor = conn.execute("SELECT COUNT(*) FROM products") - return cursor.fetchone()[0] + return conn.execute("SELECT COUNT(*) FROM products").fetchone()[0] finally: conn.close() - # ── 记忆库查询 ──────────────────────────────────────────── - def get_memory(self, barcode: str) -> Optional[Dict]: - """查询单条商品记忆""" conn = self._connect() conn.row_factory = sqlite3.Row try: - cursor = conn.execute( - "SELECT * FROM products WHERE barcode = ?", - (str(barcode).strip(),) - ) - row = cursor.fetchone() - if row: - return dict(row) - return None + row = conn.execute("SELECT * FROM products WHERE barcode=?", + (str(barcode).strip(),)).fetchone() + return dict(row) if row else None finally: conn.close() def get_memories(self, barcodes: List[str]) -> Dict[str, Dict]: - """批量查询商品记忆""" if not barcodes: return {} - conn = self._connect() conn.row_factory = sqlite3.Row try: placeholders = ','.join('?' * len(barcodes)) - cursor = conn.execute( + rows = conn.execute( f"SELECT * FROM products WHERE barcode IN ({placeholders})", - [str(b).strip() for b in barcodes] - ) - return {row['barcode']: dict(row) for row in cursor.fetchall()} + [str(b).strip() for b in barcodes]).fetchall() + return {r['barcode']: dict(r) for r in rows} finally: conn.close() def get_all_memories(self) -> List[Dict]: - """返回全部记录(UI 用)""" conn = self._connect() conn.row_factory = sqlite3.Row try: - cursor = conn.execute( - "SELECT * FROM products ORDER BY usage_count DESC, barcode" - ) - return [dict(row) for row in cursor.fetchall()] + return [dict(row) for row in + conn.execute("SELECT * FROM products ORDER BY usage_count DESC, barcode").fetchall()] finally: conn.close() - # ── 学习逻辑 ────────────────────────────────────────────── + # ══════════════════════════════════════════════════════════════ + # 批量预加载 — 性能核心 + # ══════════════════════════════════════════════════════════════ - def learn_from_product(self, product: Dict, source: str = 'ocr') -> None: - """从处理结果中学习单条商品数据 + def load_batch(self, barcodes: List[str]) -> Dict[str, Dict]: + """批量预加载条码记忆到 dict — 单次 SQL,后续纯内存操作""" + if not barcodes: + return {} + conn = self._connect() + conn.row_factory = sqlite3.Row + try: + placeholders = ','.join('?' * len(barcodes)) + rows = conn.execute( + f"SELECT * FROM products WHERE barcode IN ({placeholders})", + [str(b).strip() for b in barcodes]).fetchall() + return {r['barcode']: dict(r) for r in rows} + finally: + conn.close() - Args: - product: 商品字典 (barcode, name, specification, unit, price, ...) - source: 数据来源 ('template', 'ocr', 'user_confirmed') + # ══════════════════════════════════════════════════════════════ + # 学习逻辑 — 一致性加速 + 价格区间 + # ══════════════════════════════════════════════════════════════ + + def learn_from_product(self, product: Dict, source: str = 'ocr', + memory: Dict[str, Dict] = None, + add_log: Callable = None) -> Optional[str]: + """ + 从处理结果中学习,返回日志字符串。 + memory: 可选的预加载批量内存,传入则零 DB 查询。 """ barcode = str(product.get('barcode', '')).strip() if not barcode: - return + return None - now = datetime.now().isoformat() name = str(product.get('name', '')) spec = str(product.get('specification', '')) unit = str(product.get('unit', '')) price = float(product.get('price', 0)) + now = datetime.now().isoformat() + # 查现有记录(优先从内存查) + if memory is not None and barcode in memory: + row = memory[barcode] + old_name = row.get('name', '') + old_spec = row.get('specification', '') + old_unit = row.get('unit', '') + old_conf = row.get('confidence', 0) + old_count = row.get('usage_count', 0) + old_avg = row.get('avg_price', 0) or 0 + old_min = row.get('min_price') or price + old_max = row.get('max_price') or price + pc = row.get('price_count', 0) or 0 + exists = True + else: + conn = self._connect() + try: + cursor = conn.execute( + "SELECT name, specification, unit, confidence, usage_count, " + "avg_price, min_price, max_price, price_count FROM products WHERE barcode=?", + (barcode,)).fetchone() + finally: + conn.close() + if cursor is None: + exists = False + else: + old_name, old_spec, old_unit, old_conf, old_count, old_avg, old_min, old_max, pc = cursor + old_avg = old_avg or 0 + pc = pc or 0 + old_min = old_min if old_min is not None else price + old_max = old_max if old_max is not None else price + exists = True + + new_count = old_count + 1 if exists else 1 + + # ── 置信度 ── + if source == 'user_confirmed': + new_conf = 90 + elif source == 'template': + new_conf = 50 + elif exists and old_conf < 50: + # 一致性加速 + spec_match = bool(spec and old_spec and spec == old_spec) + unit_match = bool(unit and old_unit and unit == old_unit) + if spec_match and unit_match: + boost = 10 + elif unit_match: + boost = 5 + else: + boost = 3 + new_conf = min(50, old_conf + boost) + elif exists: + new_conf = old_conf # > 50 稳定不变 + else: + new_conf = 10 # 新 OCR 记录 + + # ── 价格区间 ── + if price > 0: + new_pc = (pc if exists else 0) + 1 + new_avg = ((old_avg * (new_pc - 1)) + price) / new_pc if exists else price + new_min = min(old_min, price) if exists else price + new_max = max(old_max, price) if exists else price + else: + new_avg = old_avg if exists else 0 + new_min = old_min if exists else 0 + new_max = old_max if exists else 0 + new_pc = pc if exists else 0 + + # ── 写入 ── conn = self._connect() try: - cursor = conn.execute( - "SELECT confidence, usage_count FROM products WHERE barcode = ?", - (barcode,) - ) - row = cursor.fetchone() - - if row is None: - # 新记录 - conf = {'template': 100, 'user_confirmed': 90}.get(source, 50) + if not exists: conn.execute( - "INSERT INTO products " - "(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) " - "VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)", - (barcode, name, spec, unit, price, source, conf, now, now) - ) + "INSERT INTO products (barcode, name, specification, unit, price, " + "source, confidence, usage_count, last_seen, updated_at, " + "avg_price, min_price, max_price, price_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (barcode, name, spec, unit, price, source, new_conf, 1, now, now, + new_avg, new_min, new_max, new_pc)) + log = f"记忆库新增: {barcode} {name} 源={source} 可信度={new_conf}" else: - old_conf, old_count = row - new_count = old_count + 1 - - if source == 'template': - new_conf = 100 - elif source == 'user_confirmed': - new_conf = 90 - else: # ocr - new_conf = min(80, old_conf + 10) if old_conf < 80 else old_conf - - if source in ('template', 'user_confirmed'): - # 高权威来源:全字段覆盖 + # 高可信度源全字段覆盖;低可信度仅填空 + if source in ('template', 'user_confirmed') or new_conf > 50: conn.execute( "UPDATE products SET name=?, specification=?, unit=?, price=?, " - "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? " - "WHERE barcode=?", - (name, spec, unit, price, source, new_conf, new_count, now, now, barcode) - ) + "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, " + "avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?", + (name or old_name, spec or old_spec, unit or old_unit, price, + source, new_conf, new_count, now, now, + new_avg, new_min, new_max, new_pc, barcode)) else: - # OCR:仅填充空字段,不更新 price conn.execute( "UPDATE products SET " - "name = CASE WHEN name='' THEN ? ELSE name END, " - "specification = CASE WHEN specification='' THEN ? ELSE specification END, " - "unit = CASE WHEN unit='' THEN ? ELSE unit END, " - "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? " - "WHERE barcode=?", - (name, spec, unit, source, new_conf, new_count, now, now, barcode) - ) + "name=CASE WHEN name='' THEN ? ELSE name END, " + "specification=CASE WHEN specification='' THEN ? ELSE specification END, " + "unit=CASE WHEN unit='' THEN ? ELSE unit END, " + "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, " + "avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?", + (name, spec, unit, source, new_conf, new_count, now, now, + new_avg, new_min, new_max, new_pc, barcode)) + log = f"记忆库更新: {barcode} 可信度{old_conf if exists else 0}→{new_conf}" + if price > 0: + log += f" 均价{new_avg:.4f}({new_pc}次)" + conn.commit() + + # 更新内存 dict(如果传入了) + if memory is not None and barcode in memory: + memory[barcode].update({ + 'confidence': new_conf, 'usage_count': new_count, + 'avg_price': new_avg, 'min_price': new_min, + 'max_price': new_max, 'price_count': new_pc, + 'name': name or old_name, + 'specification': spec or old_spec, + 'unit': unit or old_unit, + }) + + if add_log: + add_log(log) + return log finally: conn.close() - def learn_from_products(self, products: List[Dict], source: str = 'ocr') -> int: - """批量学习,返回更新条数""" + def learn_from_products(self, products: List[Dict], source: str = 'ocr', + add_log: Callable = None) -> int: + """批量学习 — 先批量预加载,再逐条处理,返回更新条数""" + barcodes = [str(p.get('barcode', '')) for p in products if p.get('barcode')] + memory = self.load_batch(barcodes) count = 0 for p in products: try: - self.learn_from_product(p, source) - count += 1 + result = self.learn_from_product(p, source, memory=memory, add_log=add_log) + if result: + count += 1 except Exception as e: logger.warning(f"学习商品记忆失败: {e}") return count + # ══════════════════════════════════════════════════════════════ + # 记忆辅助 — OCR 补全 + # ══════════════════════════════════════════════════════════════ + + def _price_anomaly(self, product: Dict, mem: Dict) -> bool: + """价格异常:> 2倍偏差""" + price = float(product.get('price', 0)) + avg = mem.get('avg_price', 0) + if not price or not avg: + return False + return price > avg * 2 or price < avg * 0.5 + + def fill_from_memory(self, barcode: str, ocr_result: Dict, + memory: Dict[str, Dict] = None) -> Tuple[Dict, str]: + """用记忆库补全 OCR 缺失字段。返回 (补全后的dict, 日志字符串)""" + if memory: + mem = memory.get(barcode) + else: + mem = self.get_memory(barcode) + + if not mem or mem.get('confidence', 0) < 10: + return ocr_result, "" + + logs = [] + result = dict(ocr_result) + conf = mem.get('confidence', 0) + + has_spec = result.get('specification') + has_unit = result.get('unit') + price = float(result.get('price', 0)) + + if conf > 50 and not has_spec and mem.get('specification'): + result['specification'] = mem['specification'] + logs.append(f"规格补全(可信{conf}): {barcode} → {mem['specification']}") + elif not has_spec and mem.get('specification') and self._price_anomaly(result, mem): + result['specification'] = mem['specification'] + logs.append(f"价格异常→规格补全: {barcode} 本次{price:.2f} vs 均价{mem['avg_price']:.2f} → {mem['specification']}") + + if conf > 50 and not has_unit and mem.get('unit'): + result['unit'] = mem['unit'] + logs.append(f"单位补全(可信{conf}): {barcode} → {mem['unit']}") + elif not has_unit and mem.get('unit') and self._price_anomaly(result, mem): + result['unit'] = mem['unit'] + logs.append(f"价格异常→单位补全: {barcode} → {mem['unit']}") + + return result, "; ".join(logs) + + def price_warning(self, barcode: str, price: float, + memory: Dict[str, Dict] = None) -> Optional[str]: + """价格预警。> 50% 偏差告警""" + if memory: + mem = memory.get(barcode) + else: + mem = self.get_memory(barcode) + if not mem or not mem.get('avg_price'): + return None + avg = mem['avg_price'] + min_p = mem.get('min_price', avg) + max_p = mem.get('max_price', avg) + pc = mem.get('price_count', 0) + if price > avg * 1.5 or price < avg * 0.5: + return (f"单价预警: {barcode} 本次{price:.4f}元 vs " + f"历史均价{avg:.4f} (范围{min_p:.4f}~{max_p:.4f}, {pc}次)") + return None + + # ══════════════════════════════════════════════════════════════ + # 手动编辑 + # ══════════════════════════════════════════════════════════════ + def update_memory(self, barcode: str, fields: Dict) -> bool: - """手动编辑记录(UI 用,source→user_confirmed, confidence→90)""" barcode = str(barcode).strip() if not barcode: return False - - allowed = {'name', 'specification', 'unit', 'price'} + allowed = {'name', 'specification', 'unit', 'price', 'confidence'} updates = {k: v for k, v in fields.items() if k in allowed} if not updates: return False - now = datetime.now().isoformat() set_clause = ', '.join(f"{k}=?" for k in updates) values = list(updates.values()) - + extra_sql = ", source='user_confirmed'" + if 'confidence' not in updates: + extra_sql += ", confidence=90" conn = self._connect() try: conn.execute( - f"UPDATE products SET {set_clause}, source='user_confirmed', confidence=90, " - "updated_at=? WHERE barcode=?", - values + [now, barcode] - ) + f"UPDATE products SET {set_clause}{extra_sql}, updated_at=? WHERE barcode=?", + values + [now, barcode]) conn.commit() return conn.total_changes > 0 finally: conn.close() def delete_memory(self, barcode: str) -> bool: - """删除记录""" conn = self._connect() try: conn.execute("DELETE FROM products WHERE barcode=?", (str(barcode).strip(),)) @@ -409,51 +514,39 @@ class ProductDatabase: finally: conn.close() - # ── 云端同步 ────────────────────────────────────────────── + # ══════════════════════════════════════════════════════════════ + # 云端同步 + # ══════════════════════════════════════════════════════════════ def export_for_sync(self) -> Dict: - """导出全部记录为 JSON-serializable dict(按条码索引)""" conn = self._connect() try: cursor = conn.execute( "SELECT barcode, name, specification, unit, price, source, " - "confidence, usage_count, last_seen FROM products" - ) + "confidence, usage_count, last_seen, avg_price, min_price, max_price, price_count " + "FROM products") result = {} for row in cursor.fetchall(): result[row[0]] = { - 'name': row[1], - 'specification': row[2], - 'unit': row[3], - 'price': row[4], - 'source': row[5], - 'confidence': row[6], - 'usage_count': row[7], - 'last_seen': row[8], + 'name': row[1], 'specification': row[2], 'unit': row[3], + 'price': row[4], 'source': row[5], 'confidence': row[6], + 'usage_count': row[7], 'last_seen': row[8], + 'avg_price': row[9], 'min_price': row[10], + 'max_price': row[11], 'price_count': row[12], } return result finally: conn.close() def import_from_sync(self, data: Dict) -> int: - """从云端 JSON 导入,高置信度优先合并 - - Args: - data: {barcode: {name, specification, unit, price, source, confidence, ...}} - - Returns: - 导入/更新的记录数 - """ now = datetime.now().isoformat() count = 0 - conn = self._connect() try: for barcode, info in data.items(): barcode = str(barcode).strip() if not barcode: continue - name = str(info.get('name', '')) spec = str(info.get('specification', '')) unit = str(info.get('unit', '')) @@ -462,69 +555,55 @@ class ProductDatabase: remote_conf = int(info.get('confidence', 50)) remote_count = int(info.get('usage_count', 1)) remote_seen = str(info.get('last_seen', now)) + remote_avg = float(info.get('avg_price', price)) + remote_min = float(info.get('min_price', price)) + remote_max = float(info.get('max_price', price)) + remote_pc = int(info.get('price_count', 1)) - cursor = conn.execute( - "SELECT confidence FROM products WHERE barcode = ?", - (barcode,) - ) - row = cursor.fetchone() - + row = conn.execute("SELECT confidence FROM products WHERE barcode=?", + (barcode,)).fetchone() if row is None: - # 新记录,直接插入 conn.execute( - "INSERT INTO products " - "(barcode, name, specification, unit, price, source, confidence, usage_count, last_seen, updated_at) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - (barcode, name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now) - ) + "INSERT INTO products (barcode, name, specification, unit, price, " + "source, confidence, usage_count, last_seen, updated_at, " + "avg_price, min_price, max_price, price_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (barcode, name, spec, unit, price, remote_source, remote_conf, + remote_count, remote_seen, now, + remote_avg, remote_min, remote_max, remote_pc)) count += 1 else: local_conf = row[0] if remote_conf > local_conf: - # 云端置信度更高,覆盖 conn.execute( "UPDATE products SET name=?, specification=?, unit=?, price=?, " - "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=? " - "WHERE barcode=?", - (name, spec, unit, price, remote_source, remote_conf, remote_count, remote_seen, now, barcode) - ) + "source=?, confidence=?, usage_count=?, last_seen=?, updated_at=?, " + "avg_price=?, min_price=?, max_price=?, price_count=? WHERE barcode=?", + (name, spec, unit, price, remote_source, remote_conf, + remote_count, remote_seen, now, + remote_avg, remote_min, remote_max, remote_pc, barcode)) count += 1 elif remote_conf == local_conf: - # 置信度相同,填充空字段 conn.execute( "UPDATE products SET " - "name = CASE WHEN name='' THEN ? ELSE name END, " - "specification = CASE WHEN specification='' THEN ? ELSE specification END, " - "unit = CASE WHEN unit='' THEN ? ELSE unit END, " - "usage_count = MAX(usage_count, ?), " - "updated_at=? WHERE barcode=?", - (name, spec, unit, remote_count, now, barcode) - ) + "name=CASE WHEN name='' THEN ? ELSE name END, " + "specification=CASE WHEN specification='' THEN ? ELSE specification END, " + "unit=CASE WHEN unit='' THEN ? ELSE unit END, " + "usage_count=MAX(usage_count, ?), updated_at=? WHERE barcode=?", + (name, spec, unit, remote_count, now, barcode)) count += 1 - conn.commit() finally: conn.close() - return count - def _export_memory_json(self, json_path: str = None) -> str: - """导出记忆库为本地 JSON 文件 - - Args: - json_path: 输出路径,默认 data/product_memory.json - - Returns: - 写入的文件路径 - """ + def _export_memory_json(self, json_path=None): + """导出记忆库为 JSON(兼容旧代码调用)""" + import os as _os if json_path is None: - json_path = os.path.join(os.path.dirname(self.db_path), 'product_memory.json') - + json_path = _os.path.join(_os.path.dirname(self.db_path), 'product_memory.json') data = self.export_for_sync() - os.makedirs(os.path.dirname(json_path), exist_ok=True) - + _os.makedirs(_os.path.dirname(json_path), exist_ok=True) with open(json_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) - - logger.debug(f"商品记忆库已导出: {json_path} ({len(data)} 条)") return json_path diff --git a/app/core/utils/cloud_sync.py b/app/core/utils/cloud_sync.py index ff929a4..6de3b9e 100644 --- a/app/core/utils/cloud_sync.py +++ b/app/core/utils/cloud_sync.py @@ -165,6 +165,69 @@ class GiteaSync: existing_sha = self.file_exists(remote_path) return self.push_file(remote_path, content, message, sha=existing_sha) + def push(self) -> str: + """推送本地数据到云端:product_cache.json + barcode_mappings.json""" + import os + from pathlib import Path + + project_root = Path(__file__).resolve().parent.parent.parent.parent + + results = [] + # 1. Product cache + from app.core.db.product_db import ProductDatabase + excel_source = str(project_root / "templates" / "商品资料.xlsx") + db_path = str(project_root / "data" / "product_cache.db") + product_db = ProductDatabase(db_path, excel_source) + product_data = product_db.export_for_sync() + sha = self.push_json("product_cache.json", product_data, "sync: update product cache") + results.append(f"product_cache: {'ok' if sha else 'skip'}") + + # 2. Barcode mappings + barcode_path = project_root / "config" / "barcode_mappings.json" + if barcode_path.exists(): + with open(barcode_path, "r", encoding="utf-8") as f: + barcode_data = json.loads(f.read()) + sha = self.push_json("barcode_mappings.json", barcode_data, "sync: update barcode mappings") + results.append(f"barcode_mappings: {'ok' if sha else 'skip'}") + + return "; ".join(results) if results else "无数据需要同步" + + def pull(self) -> str: + """从云端拉取数据并写入本地文件""" + import os + from pathlib import Path + + project_root = Path(__file__).resolve().parent.parent.parent.parent + + results = [] + # 1. Product cache + result = self.pull_json("product_cache.json") + if result is not None: + data, sha = result + from app.core.db.product_db import ProductDatabase + excel_source = str(project_root / "templates" / "商品资料.xlsx") + db_path = str(project_root / "data" / "product_cache.db") + os.makedirs(os.path.dirname(db_path), exist_ok=True) + product_db = ProductDatabase(db_path, excel_source) + count = product_db.import_from_sync(data) + results.append(f"product_cache: 导入 {count} 条") + else: + results.append("product_cache: 云端无数据") + + # 2. Barcode mappings + barcode_result = self.pull_json("barcode_mappings.json") + if barcode_result is not None: + barcode_data, sha = barcode_result + barcode_path = project_root / "config" / "barcode_mappings.json" + barcode_path.parent.mkdir(parents=True, exist_ok=True) + with open(barcode_path, "w", encoding="utf-8") as f: + json.dump(barcode_data, f, ensure_ascii=False, indent=2) + results.append(f"barcode_mappings: 已更新") + else: + results.append("barcode_mappings: 云端无数据") + + return "; ".join(results) if results else "无数据需要同步" + @classmethod def from_config(cls, config) -> Optional["GiteaSync"]: """从 ConfigManager 创建实例 diff --git a/config.ini b/config.ini index 589d7db..8c29384 100644 --- a/config.ini +++ b/config.ini @@ -27,7 +27,7 @@ skip_existing = true [File] allowed_extensions = .jpg,.jpeg,.png,.bmp excel_extension = .xlsx -max_file_size_mb = 4 +max_file_size_mb = 5 [Templates] purchase_order = 银豹-采购单模板.xls diff --git a/web/backend/routers/barcodes.py b/web/backend/routers/barcodes.py index ff9b92a..d107548 100644 --- a/web/backend/routers/barcodes.py +++ b/web/backend/routers/barcodes.py @@ -17,13 +17,22 @@ _mappings_file = _project_root / "config" / "barcode_mappings.json" class BarcodeMapping(BaseModel): barcode: str - target: str + target: Optional[str] = None description: Optional[str] = None + # Special rule fields + multiplier: Optional[int] = None + target_unit: Optional[str] = None + fixed_price: Optional[float] = None + specification: Optional[str] = None class BarcodeMappingUpdate(BaseModel): target: Optional[str] = None description: Optional[str] = None + multiplier: Optional[int] = None + target_unit: Optional[str] = None + fixed_price: Optional[float] = None + specification: Optional[str] = None def _load_mappings() -> Dict: @@ -51,12 +60,29 @@ async def list_barcodes( if isinstance(info, dict): target = info.get("map_to", info.get("target", "")) desc = info.get("description", "") + item = { + "barcode": barcode, + "target": target, + "description": desc, + "multiplier": info.get("multiplier"), + "target_unit": info.get("target_unit"), + "fixed_price": info.get("fixed_price"), + "specification": info.get("specification"), + } else: - target = str(info) - desc = "" - if search and search not in barcode and search not in target and search not in desc: + item = { + "barcode": barcode, + "target": str(info), + "description": "", + "multiplier": None, + "target_unit": None, + "fixed_price": None, + "specification": None, + } + s = search.lower() if search else "" + if s and s not in barcode.lower() and s not in item["target"].lower() and s not in (desc or "").lower(): continue - items.append({"barcode": barcode, "target": target, "description": desc}) + items.append(item) return {"items": items, "total": len(items)} @@ -82,9 +108,22 @@ async def create_barcode( mappings = _load_mappings() if body.barcode in mappings: raise HTTPException(409, f"条码 {body.barcode} 已存在") - mappings[body.barcode] = {"map_to": body.target, "description": body.description or ""} + + entry: dict = {"description": body.description or ""} + if body.multiplier: + entry["multiplier"] = body.multiplier + if body.target_unit: + entry["target_unit"] = body.target_unit + if body.fixed_price is not None: + entry["fixed_price"] = body.fixed_price + if body.specification: + entry["specification"] = body.specification + else: + entry["map_to"] = body.target or "" + + mappings[body.barcode] = entry _save_mappings(mappings) - return {"message": f"已创建映射 {body.barcode} → {body.target}"} + return {"message": f"已创建规则 {body.barcode}"} @router.put("/{barcode}") @@ -95,20 +134,35 @@ async def update_barcode( ): mappings = _load_mappings() if barcode not in mappings: - raise HTTPException(404, f"未找到条码映射 {barcode}") + raise HTTPException(404, f"未找到条码规则 {barcode}") existing = mappings[barcode] if not isinstance(existing, dict): existing = {"map_to": str(existing), "description": ""} - if body.target is not None: + # Check if this is a special rule (has multiplier) or being converted to one + if body.multiplier is not None: + # Convert to special rule: remove map_to, add multiplier fields + existing.pop("map_to", None) + existing["multiplier"] = body.multiplier + if body.target_unit is not None: + existing["target_unit"] = body.target_unit + if body.fixed_price is not None: + existing["fixed_price"] = body.fixed_price + if body.specification is not None: + existing["specification"] = body.specification + elif body.target is not None: + # Convert to simple mapping: remove special fields, add map_to + for k in ("multiplier", "target_unit", "fixed_price", "specification"): + existing.pop(k, None) existing["map_to"] = body.target + if body.description is not None: existing["description"] = body.description mappings[barcode] = existing _save_mappings(mappings) - return {"message": f"已更新映射 {barcode}"} + return {"message": f"已更新规则 {barcode}"} @router.delete("/{barcode}") diff --git a/web/backend/routers/files.py b/web/backend/routers/files.py index 3b527f0..e97dbc7 100644 --- a/web/backend/routers/files.py +++ b/web/backend/routers/files.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import List, Optional from fastapi import APIRouter, HTTPException, UploadFile, File, Depends, Query, Request -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel from ..auth.dependencies import get_current_user, get_current_user_flexible @@ -267,10 +267,13 @@ async def get_file_relations( status: Optional[str] = None, page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), + sort_by: Optional[str] = None, + sort_order: str = "desc", current_user: dict = Depends(get_current_user), ): """Query file relations with optional view filter.""" - items, total = query_file_relations(view=view, status=status, page=page, page_size=page_size) + items, total = query_file_relations(view=view, status=status, page=page, page_size=page_size, + sort_by=sort_by, sort_order=sort_order) return {"items": items, "total": total} @@ -299,3 +302,47 @@ async def delete_relations( """Delete file relation records by IDs.""" delete_file_relations(body.ids) return {"message": f"已删除 {len(body.ids)} 条关系记录"} + + +# --------------------------------------------------------------------------- +# File preview +# --------------------------------------------------------------------------- + +@router.get("/preview/{directory}/{filename:path}") +async def preview_file( + directory: str, + filename: str, + current_user: dict = Depends(get_current_user), +): + """Preview file content: images served directly, Excel returned as JSON grid.""" + # Security: only allow specific directories + if directory not in ("input", "output", "result"): + raise HTTPException(403, "不允许访问该目录") + + dir_map = {"input": _input_dir, "output": _output_dir, "result": _result_dir} + file_path = dir_map[directory] / filename + if not file_path.is_file(): + raise HTTPException(404, f"文件不存在: {filename}") + + ext = file_path.suffix.lower() + # Images: serve directly + if ext in ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp'): + return FileResponse(str(file_path)) + + # Excel: read and return as JSON grid + if ext in ('.xls', '.xlsx'): + try: + import pandas as pd + from fastapi.responses import JSONResponse + df = pd.read_excel(str(file_path), header=None) + # Fill NaN with empty string + df = df.fillna('') + rows = [] + for _, row in df.iterrows(): + rows.append([str(v) if v != '' else '' for v in row]) + # Limit to first 200 rows + return JSONResponse({"type": "excel", "rows": rows[:200], "total_rows": len(rows)}) + except Exception as e: + raise HTTPException(500, f"读取文件失败: {e}") + + raise HTTPException(400, f"不支持预览的文件类型: {ext}") diff --git a/web/backend/routers/memory.py b/web/backend/routers/memory.py index 389c9e5..f04b4dc 100644 --- a/web/backend/routers/memory.py +++ b/web/backend/routers/memory.py @@ -18,18 +18,31 @@ _excel_source = str(_project_root / "templates" / "商品资料.xlsx") class MemoryItem(BaseModel): barcode: str name: str - spec: Optional[str] = None + specification: Optional[str] = None unit: Optional[str] = None price: Optional[float] = None + avg_price: Optional[float] = None + min_price: Optional[float] = None + max_price: Optional[float] = None + price_count: int = 0 confidence: int = 0 source: str = "ocr" last_used: Optional[str] = None use_count: int = 0 +class MemoryCreate(BaseModel): + barcode: str + name: Optional[str] = "" + specification: Optional[str] = None + unit: Optional[str] = None + price: Optional[float] = None + confidence: int = 50 + + class MemoryUpdate(BaseModel): name: Optional[str] = None - spec: Optional[str] = None + specification: Optional[str] = None unit: Optional[str] = None price: Optional[float] = None confidence: Optional[int] = None @@ -51,9 +64,13 @@ def _row_to_item(row: Dict) -> MemoryItem: return MemoryItem( barcode=row.get("barcode", ""), name=row.get("name", ""), - spec=row.get("spec"), + specification=row.get("specification"), unit=row.get("unit"), price=row.get("price"), + avg_price=row.get("avg_price"), + min_price=row.get("min_price"), + max_price=row.get("max_price"), + price_count=row.get("price_count", 0), confidence=row.get("confidence", 0), source=row.get("source", "ocr"), last_used=row.get("last_used"), @@ -99,6 +116,25 @@ async def get_memory( return product +@router.post("") +async def create_memory( + body: MemoryCreate, + current_user: dict = Depends(get_current_user), +): + db = _get_db() + existing = db.get_memory(body.barcode) + if existing: + raise HTTPException(409, f"条码 {body.barcode} 已存在,请使用编辑功能") + db.learn_from_product({ + "barcode": body.barcode, + "name": body.name or "", + "specification": body.specification or "", + "unit": body.unit or "", + "price": body.price or 0, + }, source="user_confirmed") + return {"message": f"已创建记忆记录 {body.barcode}"} + + @router.put("/{barcode}") async def update_memory( barcode: str, diff --git a/web/backend/routers/processing.py b/web/backend/routers/processing.py index e0a11d8..69f316c 100644 --- a/web/backend/routers/processing.py +++ b/web/backend/routers/processing.py @@ -1,8 +1,10 @@ """Processing endpoints: OCR, Excel conversion, merge, and full pipeline.""" import asyncio +import logging import os import sys +import threading import traceback from pathlib import Path from typing import Optional, List @@ -18,6 +20,66 @@ router = APIRouter(prefix="/api/processing", tags=["processing"]) _wrapper = ServiceWrapper(max_workers=3) +# ── Thread-safe log capture ── +_tlocal = threading.local() + + +class TaskLogHandler(logging.Handler): + """Capture all log records during task execution and forward to tm.add_log()""" + + def emit(self, record: logging.LogRecord): + ctx = getattr(_tlocal, 'ctx', None) + if ctx: + tm = ctx.get('tm') + task_id = ctx.get('task_id') + if tm and task_id: + msg = self.format(record) + if any(skip in msg for skip in ['DEBUG:', 'urllib3', 'charset_normalizer']): + return + tm.add_log(task_id, msg) + + +_log_handler = TaskLogHandler() +_log_handler.setLevel(logging.DEBUG) +_log_handler.setFormatter(logging.Formatter('%(message)s')) +_root_logger = logging.getLogger() +_configured = False + + +def _setup_log_capture(): + global _configured + if not _configured: + _root_logger.addHandler(_log_handler) + _configured = True + + +def _start_log_capture(tm, task_id: str): + _setup_log_capture() + _root_logger.setLevel(logging.DEBUG) + _tlocal.ctx = {'tm': tm, 'task_id': task_id} + + +def _stop_log_capture(): + _tlocal.ctx = None + + +def _add_result_file(name: str): + files = getattr(_tlocal, 'result_files', None) + if files is not None: + files.append(name) + + +def _wrap_with_capture(tm, task_id, func): + """Wrap a do_work function with log capture setup/teardown.""" + def wrapped(): + _start_log_capture(tm, task_id) + _tlocal.result_files = [] + try: + return func() + finally: + _stop_log_capture() + return wrapped + _project_root = Path(__file__).resolve().parent.parent.parent.parent _input_dir = _project_root / "data" / "input" _output_dir = _project_root / "data" / "output" @@ -74,6 +136,92 @@ def _run_background(coro): asyncio.ensure_future(coro) +def _run_background_with_log(coro, tm, task_id: str): + """Schedule a coroutine with log capture during execution.""" + + async def _wrapped(): + _start_log_capture(tm, task_id) + try: + await coro + finally: + _stop_log_capture() + + asyncio.ensure_future(_wrapped()) + + +def _get_product_db(): + from app.core.db.product_db import ProductDatabase + return ProductDatabase( + str(_project_root / 'data' / 'product_cache.db'), + str(_project_root / 'templates' / '商品资料.xlsx') + ) + + +def _learn_products_from_excel(excel_path: Path, tm, task_id, source: str = 'ocr'): + """从处理后的Excel文件学习商品数据到记忆库""" + try: + from app.core.utils.file_utils import smart_read_excel + df = smart_read_excel(str(excel_path)) + if df is None or df.empty: + return + except Exception: + return + + from app.core.handlers.column_mapper import ColumnMapper + barcode_col = ColumnMapper.find_column(list(df.columns), 'barcode') + if not barcode_col: + return + name_col = ColumnMapper.find_column(list(df.columns), 'name') + spec_col = ColumnMapper.find_column(list(df.columns), 'specification') + unit_col = ColumnMapper.find_column(list(df.columns), 'unit') + price_col = ColumnMapper.find_column(list(df.columns), 'unit_price') or ColumnMapper.find_column(list(df.columns), 'price') + + db = _get_product_db() + barcodes = [str(r.get(barcode_col, '')).strip() for _, r in df.iterrows() if str(r.get(barcode_col, '')).strip()] + memory = db.load_batch(barcodes) + + learned = 0 + for _, row in df.iterrows(): + barcode = str(row.get(barcode_col, '')).strip() + if not barcode or barcode == 'nan': + continue + price = 0.0 + if price_col: + try: + p = row.get(price_col) + if p is not None and str(p).strip() not in ('', 'nan', 'None'): + price = float(p) + except (ValueError, TypeError): + pass + + product = { + 'barcode': barcode, + 'name': str(row.get(name_col, '')).strip() if name_col else '', + 'specification': str(row.get(spec_col, '')).strip() if spec_col else '', + 'unit': str(row.get(unit_col, '')).strip() if unit_col else '', + 'price': price, + } + + # 1. 记忆辅助补全 + filled, fill_log = db.fill_from_memory(barcode, product, memory) + if fill_log: + tm.add_log(task_id, f" {fill_log}") + + # 2. 价格预警 + warn = db.price_warning(barcode, price, memory) + if warn: + tm.add_log(task_id, f" {warn}") + + # 3. 学习 + log = db.learn_from_product(filled, source=source, memory=memory, add_log=None) + if log: + tm.add_log(task_id, f" {log}") + learned += 1 + + if learned: + tm.add_log(task_id, f"[记忆库] 从 {excel_path.name} 学习了 {learned} 条商品数据") + + # --------------------------------------------------------------------------- # Batch endpoints # --------------------------------------------------------------------------- @@ -117,16 +265,23 @@ async def ocr_batch( for ext in ['.xlsx', '.xls']: candidate = _output_dir / f"{out_stem}{ext}" if candidate.exists(): - upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done') + upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done'); _add_result_file(candidate.name) + _add_result_file(candidate.name) break tm.add_log(task.id, f"[OCR] 完成: {f.name}") + # Learn products into memory from OCR output + out_file = _output_dir / f"{out_stem}.xlsx" + if not out_file.exists(): + out_file = _output_dir / f"{out_stem}.xls" + if out_file.exists(): + _learn_products_from_excel(out_file, tm, task.id, source='ocr') except Exception as e: tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}") - result_files = [f.name for f in _output_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message=f"OCR完成,共处理 {total} 个文件") - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message="OCR任务已创建") @@ -162,7 +317,7 @@ async def process_excel( result_path = _result_dir / result_name if result_path.exists(): tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}") - upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done') + upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done'); _add_result_file(result_name) continue tm.update_progress(task.id, int((i / total) * 100), f"正在处理: {f.name}") @@ -171,15 +326,19 @@ async def process_excel( svc.process_excel(str(f)) # Find result file if result_path.exists(): - upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done') + upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done'); _add_result_file(result_name) + _add_result_file(result_name) tm.add_log(task.id, f"[Excel] 完成: {f.name}") + # Learn products into memory from purchase order result + if result_path.exists(): + _learn_products_from_excel(result_path, tm, task.id, source='ocr') except Exception as e: tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}") - result_files = [f.name for f in _result_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成,共 {total} 个文件") - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message="Excel处理任务已创建") @@ -224,7 +383,7 @@ async def merge_orders( tm.add_log(task.id, f"[合并] 失败: {e}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message="合并任务已创建") @@ -271,9 +430,14 @@ async def full_pipeline( for ext in ['.xlsx', '.xls']: candidate = _output_dir / f"{out_stem}{ext}" if candidate.exists(): - upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done') + upsert_file_relation(input_image=f.name, output_excel=candidate.name, status='ocr_done'); _add_result_file(candidate.name) break tm.add_log(task.id, f"[OCR] 完成: {f.name}") + out_file = _output_dir / f"{out_stem}.xlsx" + if not out_file.exists(): + out_file = _output_dir / f"{out_stem}.xls" + if out_file.exists(): + _learn_products_from_excel(out_file, tm, task.id, source='ocr') except Exception as e: tm.add_log(task.id, f"[OCR] 失败: {f.name} - {e}") @@ -292,7 +456,7 @@ async def full_pipeline( result_path = _result_dir / result_name if result_path.exists(): tm.add_log(task.id, f"[跳过] {f.name} 已处理过 → {result_name}") - upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done') + upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done'); _add_result_file(result_name) tm.update_progress(task.id, pct, f"跳过: {f.name}") continue @@ -300,19 +464,21 @@ async def full_pipeline( try: order_svc.process_excel(str(f)) if result_path.exists(): - upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done') + upsert_file_relation(output_excel=f.name, result_purchase=result_name, status='done'); _add_result_file(result_name) tm.add_log(task.id, f"[Excel] 完成: {f.name}") + if result_path.exists(): + _learn_products_from_excel(result_path, tm, task.id, source='ocr') except Exception as e: tm.add_log(task.id, f"[Excel] 失败: {f.name} - {e}") - result_files = [f.name for f in _result_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message="全流程处理完成(不含合并)") except Exception as e: tb = traceback.format_exc() tm.add_log(task.id, f"[错误] {tb}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message="全流程任务已创建") @@ -349,16 +515,16 @@ async def ocr_single( for ext in ['.xlsx', '.xls']: candidate = _output_dir / f"{stem}{ext}" if candidate.exists(): - upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done') + upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done'); _add_result_file(candidate.name) break tm.add_log(task.id, f"[OCR] 完成: {body.filename}") - result_files = [f.name for f in _output_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message=f"OCR完成: {body.filename}") except Exception as e: tm.add_log(task.id, f"[OCR] 失败: {e}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message=f"OCR任务已创建: {body.filename}") @@ -390,13 +556,13 @@ async def excel_single( if (_result_dir / result_name).exists(): upsert_file_relation(output_excel=body.filename, result_purchase=result_name, status='done') tm.add_log(task.id, f"[Excel] 完成: {body.filename}") - result_files = [f.name for f in _result_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message=f"Excel处理完成: {body.filename}") except Exception as e: tm.add_log(task.id, f"[Excel] 失败: {e}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message=f"Excel处理任务已创建: {body.filename}") @@ -432,13 +598,13 @@ async def pipeline_single( if out_xlsx.exists() or out_xls.exists(): out_name = out_xlsx.name if out_xlsx.exists() else out_xls.name tm.add_log(task.id, f"[跳过] 已OCR过 → {out_name}") - upsert_file_relation(input_image=body.filename, output_excel=out_name, status='ocr_done') + upsert_file_relation(input_image=body.filename, output_excel=out_name, status='ocr_done'); _add_result_file(out_name) else: ocr_svc.process_image(str(file_path)) for ext in ['.xlsx', '.xls']: candidate = _output_dir / f"{stem}{ext}" if candidate.exists(): - upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done') + upsert_file_relation(input_image=body.filename, output_excel=candidate.name, status='ocr_done'); _add_result_file(candidate.name) break tm.add_log(task.id, f"[OCR] 完成") @@ -464,14 +630,14 @@ async def pipeline_single( else: tm.add_log(task.id, f"[错误] OCR未生成Excel文件") - result_files = [f.name for f in _result_dir.iterdir() if f.is_file()] + result_files = list(getattr(_tlocal, 'result_files', [])) tm.set_completed(task.id, result_files=result_files, message=f"全流程完成: {body.filename}") except Exception as e: tb = traceback.format_exc() tm.add_log(task.id, f"[错误] {tb}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message=f"全流程任务已创建: {body.filename}") @@ -511,7 +677,7 @@ async def merge_batch( tm.add_log(task.id, f"[合并] 失败: {e}") tm.set_failed(task.id, str(e)) - await _wrapper.run_sync(do_work) + await _wrapper.run_sync(_wrap_with_capture(tm, task.id, do_work)) _run_background(_bg()) return TaskResponse(task_id=task.id, status="accepted", message="批量合并任务已创建") diff --git a/web/backend/routers/sync.py b/web/backend/routers/sync.py index 2f70935..4b5c449 100644 --- a/web/backend/routers/sync.py +++ b/web/backend/routers/sync.py @@ -1,5 +1,6 @@ """Cloud sync endpoints (Gitea-based).""" +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from fastapi import APIRouter, HTTPException, Depends, Request @@ -23,7 +24,30 @@ def _get_sync(): from app.core.utils.cloud_sync import GiteaSync from app.config.settings import ConfigManager cfg = ConfigManager() - return GiteaSync(cfg) + return GiteaSync.from_config(cfg) + + +def _run_sync_in_thread(tm, task_id, action_name, sync_method): + """Run a blocking sync operation in a thread.""" + + def _run(): + try: + tm.update_progress(task_id, 10, "正在初始化同步...") + sync = _get_sync() + if sync is None: + tm.set_failed(task_id, "Gitea 配置不完整,请先在系统配置中设置 base_url/owner/repo/token") + return + tm.update_progress(task_id, 30, f"正在{action_name}文件...") + tm.add_log(task_id, f"[{action_name}] 开始{action_name}") + result = sync_method(sync) + tm.add_log(task_id, f"[{action_name}] 完成: {result}") + tm.set_completed(task_id, message=f"{action_name}完成") + except Exception as e: + tm.set_failed(task_id, str(e)) + + pool = ThreadPoolExecutor(max_workers=1) + pool.submit(_run) + pool.shutdown(wait=False) @router.post("/push", response_model=SyncResponse) @@ -33,21 +57,7 @@ async def sync_push( ): tm = request.state.task_manager task = tm.create_task("推送到云端") - - async def _run(): - try: - tm.update_progress(task.id, 10, "正在初始化同步...") - sync = _get_sync() - tm.update_progress(task.id, 30, "正在推送文件...") - tm.add_log(task.id, "[Push] 开始推送") - result = sync.push() - tm.add_log(task.id, f"[Push] 完成: {result}") - tm.set_completed(task.id, message="推送完成") - except Exception as e: - tm.set_failed(task.id, str(e)) - - import asyncio - asyncio.create_task(_run()) + _run_sync_in_thread(tm, task.id, "Push", lambda s: s.push()) return SyncResponse(task_id=task.id, status="accepted", message="推送任务已创建") @@ -58,21 +68,7 @@ async def sync_pull( ): tm = request.state.task_manager task = tm.create_task("从云端拉取") - - async def _run(): - try: - tm.update_progress(task.id, 10, "正在初始化同步...") - sync = _get_sync() - tm.update_progress(task.id, 30, "正在拉取文件...") - tm.add_log(task.id, "[Pull] 开始拉取") - result = sync.pull() - tm.add_log(task.id, f"[Pull] 完成: {result}") - tm.set_completed(task.id, message="拉取完成") - except Exception as e: - tm.set_failed(task.id, str(e)) - - import asyncio - asyncio.create_task(_run()) + _run_sync_in_thread(tm, task.id, "Pull", lambda s: s.pull()) return SyncResponse(task_id=task.id, status="accepted", message="拉取任务已创建") @@ -83,10 +79,11 @@ async def sync_status( try: from app.config.settings import ConfigManager cfg = ConfigManager() - base_url = cfg.get("Gitea", "base_url", fallback="") - owner = cfg.get("Gitea", "owner", fallback="") - repo = cfg.get("Gitea", "repo", fallback="") - enabled = bool(base_url and owner and repo) + base_url = cfg.get("Gitea", "base_url", fallback="").strip() + owner = cfg.get("Gitea", "owner", fallback="").strip() + repo = cfg.get("Gitea", "repo", fallback="").strip() + token = cfg.get("Gitea", "token", fallback="").strip() + enabled = bool(base_url and owner and repo and token) repo_url = f"{base_url}/{owner}/{repo}" if enabled else "" return {"enabled": enabled, "repo_url": repo_url} except Exception: diff --git a/web/backend/services/db_schema.py b/web/backend/services/db_schema.py index 5bc9073..dc559e8 100644 --- a/web/backend/services/db_schema.py +++ b/web/backend/services/db_schema.py @@ -475,7 +475,8 @@ def upsert_file_relation(input_image: str = None, output_excel: str = None, def query_file_relations(view: str = None, status: str = None, - page: int = 1, page_size: int = 50) -> tuple[list[dict], int]: + page: int = 1, page_size: int = 50, + sort_by: str = None, sort_order: str = "desc") -> tuple[list[dict], int]: """Query file relations with optional view filter and pagination. view='orders': only rows with result_purchase, sorted by result_purchase @@ -508,6 +509,13 @@ def query_file_relations(view: str = None, status: str = None, where = (" WHERE " + " AND ".join(clauses)) if clauses else "" + # Sort + if sort_by and sort_by in ('created_at', 'updated_at', 'input_image', 'output_excel', 'result_purchase', 'status'): + sort_col = sort_by + else: + sort_col = order_by.split()[0] if order_by else 'id' + sort_dir = 'DESC' if sort_order.lower() == 'desc' else 'ASC' + # Count row = conn.execute( f"SELECT COUNT(*) as cnt FROM file_relations{where}", params @@ -518,7 +526,7 @@ def query_file_relations(view: str = None, status: str = None, offset = (page - 1) * page_size params.extend([page_size, offset]) rows = conn.execute( - f"SELECT * FROM file_relations{where} ORDER BY {order_by} LIMIT ? OFFSET ?", + f"SELECT * FROM file_relations{where} ORDER BY {sort_col} {sort_dir} LIMIT ? OFFSET ?", params, ).fetchall() diff --git a/web/frontend/src/stores/processing.ts b/web/frontend/src/stores/processing.ts index b13e293..010fd21 100644 --- a/web/frontend/src/stores/processing.ts +++ b/web/frontend/src/stores/processing.ts @@ -17,6 +17,7 @@ export const useProcessingStore = defineStore('processing', () => { const currentTask = ref(null) const tasks = ref([]) const logs = ref([]) + const taskSource = ref('') let ws: WebSocket | null = null @@ -67,9 +68,10 @@ export const useProcessingStore = defineStore('processing', () => { } } - async function startTask(endpoint: string, body?: any) { + async function startTask(endpoint: string, body?: any, source: string = 'processing') { const res = await api.post(endpoint, body || {}) const taskId = res.data.task_id + taskSource.value = source currentTask.value = { task_id: taskId, name: res.data.message || '', @@ -90,5 +92,5 @@ export const useProcessingStore = defineStore('processing', () => { return res.data } - return { currentTask, tasks, logs, connectWebSocket, disconnectWebSocket, startTask, pollTaskStatus } + return { currentTask, tasks, logs, taskSource, connectWebSocket, disconnectWebSocket, startTask, pollTaskStatus } }) diff --git a/web/frontend/src/views/Barcodes.vue b/web/frontend/src/views/Barcodes.vue index 3bad38f..ed886df 100644 --- a/web/frontend/src/views/Barcodes.vue +++ b/web/frontend/src/views/Barcodes.vue @@ -7,98 +7,207 @@
- {{ items.length }} - 映射规则 + {{ mappingItems.length + specialItems.length }} + 总规则数 +
+ +
+
+ +
+
+ {{ mappingItems.length }} + 条码映射 +
+
+
+
+ +
+
+ {{ specialItems.length }} + 特殊处理
- +
-
-

条码映射管理

-
- - - - 刷新 - 新增映射 -
-
+ + + +
+ + + +
+ 刷新 + 新增映射 +
+
- - - - - - - - - - - - - - - + + + + + + + + + + + + + + + +
+ + + +
+ + + +
+ 刷新 + 新增特殊处理 +
+
+ + + + + + + + + + + + + + + + + + +
+
- - - - - + + + + + - + - + + + + + + + + + + + + + + + + + + + + + + + + + +