0721ed099c
feat: shadcn主题 + 文件关系追踪 + 处理流程修复 前端: - 全站应用 shadcn/ui 主题 (zinc灰调, Inter字体, 1px细边框, 无硬阴影) - 重写 global.css / Dashboard.vue / Login.vue / Layout.vue 样式 - 新增文件处理子页面: 采购单(Orders), 表格处理(Tables), 图片处理(Images) - 侧边栏使用 el-sub-menu 组织文件处理导航 后端: - 新增 file_relations 表追踪 input→output→result 链路 - 新增 /files/relations, /files/stats/detailed 等关系查询API - 新增 ocr-single, excel-single, pipeline-single, merge-batch 端点 - 处理流程增加跳过逻辑 (已处理文件自动跳过) - 全流程不再自动合并, 合并仅在采购单页面手动触发 Bug修复: - TaskManager: asyncio.create_task 在线程池中无事件循环 → 改用 _schedule() 调度 - PurchaseOrderMerger 缺少 config 参数 → 传入 ConfigManager() - FastAPI regex= 弃用 → 改为 pattern= - merger.process() 接收 Path 对象 → 转为字符串 @
717 lines
24 KiB
Python
717 lines
24 KiB
Python
"""SQLite schema management and query functions for web backend.
|
|
|
|
Tables:
|
|
- http_logs: HTTP request/response logging
|
|
- task_history: Background task tracking
|
|
- file_metadata: File operation records
|
|
- file_relations: Input→Output→Result file chain tracking
|
|
|
|
All functions are synchronous; the async db_pool.DBPool wraps them via run_in_executor.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
# Location of the SQLite database file: four directory levels above this
# module, then data/web_data.db. Every function in this module opens its own
# connection to this path.
_db_path = Path(__file__).resolve().parent.parent.parent.parent / "data" / "web_data.db"
|
|
|
|
|
|
def _ensure_db_dir():
    """Make sure the directory that holds the database file exists.

    Missing intermediate directories are created as well; an already
    existing directory is not an error.
    """
    db_dir = _db_path.parent
    db_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def init_db():
    """Create every table and index used by this module if it is missing.

    The database directory is created first, then the whole schema is applied
    in one ``executescript`` call. All statements use ``IF NOT EXISTS``, so
    calling this repeatedly is harmless.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS http_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            method TEXT NOT NULL,
            path TEXT NOT NULL,
            status_code INTEGER,
            duration_ms REAL,
            user TEXT,
            ip TEXT,
            detail TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_http_logs_timestamp ON http_logs(timestamp);
        CREATE INDEX IF NOT EXISTS idx_http_logs_status ON http_logs(status_code);

        CREATE TABLE IF NOT EXISTS task_history (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            status TEXT NOT NULL,
            progress INTEGER DEFAULT 0,
            message TEXT,
            result_files TEXT,
            error TEXT,
            log_lines TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            completed_at TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status);
        CREATE INDEX IF NOT EXISTS idx_task_history_created ON task_history(created_at);

        CREATE TABLE IF NOT EXISTS file_metadata (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            directory TEXT NOT NULL,
            size INTEGER,
            action TEXT NOT NULL,
            user TEXT,
            timestamp TEXT NOT NULL,
            task_id TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_file_metadata_timestamp ON file_metadata(timestamp);

        CREATE TABLE IF NOT EXISTS file_relations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            input_image TEXT,
            output_excel TEXT,
            result_purchase TEXT,
            status TEXT DEFAULT 'pending',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_file_relations_input ON file_relations(input_image);
        CREATE INDEX IF NOT EXISTS idx_file_relations_output ON file_relations(output_excel);
        CREATE INDEX IF NOT EXISTS idx_file_relations_result ON file_relations(result_purchase);
    """

    _ensure_db_dir()
    db = sqlite3.connect(_db_path)
    try:
        db.executescript(schema_sql)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def cleanup_old_records():
    """Purge rows older than 30 days from all four tables.

    Age is judged against the local clock using each table's own time column
    (ISO-8601 text, so string comparison orders chronologically). All deletes
    are committed together.
    """
    threshold = (datetime.now() - timedelta(days=30)).isoformat()
    # (table, time column) pairs, purged in this order.
    targets = (
        ("http_logs", "timestamp"),
        ("task_history", "created_at"),
        ("file_metadata", "timestamp"),
        ("file_relations", "updated_at"),
    )

    db = sqlite3.connect(_db_path)
    try:
        for table, column in targets:
            db.execute(f"DELETE FROM {table} WHERE {column} < ?", (threshold,))
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Insert functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def insert_http_log(method: str, path: str, status_code: int = None,
                    duration_ms: float = None, user: str = None,
                    ip: str = None, detail: str = None):
    """Append one row to ``http_logs``.

    The row is stamped with the current local time (ISO-8601). All optional
    arguments are stored as NULL when omitted.
    """
    statement = (
        "INSERT INTO http_logs (timestamp, method, path, status_code, duration_ms, user, ip, detail) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    db = sqlite3.connect(_db_path)
    try:
        stamp = datetime.now().isoformat()
        record = (stamp, method, path, status_code, duration_ms, user, ip, detail)
        db.execute(statement, record)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def insert_task(task_id: str, name: str, status: str = "pending",
                progress: int = 0, message: str = None,
                result_files: str = None, error: str = None,
                log_lines: str = None):
    """Create a new ``task_history`` row.

    ``created_at`` and ``updated_at`` are both set to the same current local
    timestamp. ``result_files`` and ``log_lines`` are stored as given (the
    query functions later decode them as JSON).
    """
    stamp = datetime.now().isoformat()
    statement = (
        "INSERT INTO task_history (id, name, status, progress, message, "
        "result_files, error, log_lines, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    record = (
        task_id, name, status, progress, message,
        result_files, error, log_lines, stamp, stamp,
    )

    db = sqlite3.connect(_db_path)
    try:
        db.execute(statement, record)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def update_task(task_id: str, **kwargs):
    """Change selected columns of one ``task_history`` row.

    Only these keyword names are honoured: name, status, progress, message,
    result_files, error, log_lines, completed_at. Anything else is silently
    dropped. If nothing usable remains the function returns without touching
    the database; otherwise ``updated_at`` is refreshed as well.
    """
    permitted = (
        "name", "status", "progress", "message",
        "result_files", "error", "log_lines", "completed_at",
    )
    changes = {}
    for column, new_value in kwargs.items():
        if column in permitted:
            changes[column] = new_value
    if not changes:
        return

    changes["updated_at"] = datetime.now().isoformat()
    # Column names come from the fixed whitelist above, so interpolating
    # them into the statement is safe; values are always bound.
    assignments = ", ".join(f"{column} = ?" for column in changes)
    bindings = [*changes.values(), task_id]

    db = sqlite3.connect(_db_path)
    try:
        db.execute(f"UPDATE task_history SET {assignments} WHERE id = ?", bindings)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def insert_file_metadata(filename: str, directory: str, action: str,
                         size: int = None, user: str = None,
                         task_id: str = None):
    """Record one file operation in ``file_metadata``.

    The row is stamped with the current local time (ISO-8601); omitted
    optional arguments are stored as NULL.
    """
    statement = (
        "INSERT INTO file_metadata (filename, directory, size, action, user, timestamp, task_id) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    db = sqlite3.connect(_db_path)
    try:
        stamp = datetime.now().isoformat()
        db.execute(statement, (filename, directory, size, action, user, stamp, task_id))
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query functions — HTTP logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def query_http_logs(method: str = None, path: str = None,
                    status_code: int = None,
                    start_time: str = None, end_time: str = None,
                    limit: int = 50, offset: int = 0) -> list[dict]:
    """Return HTTP log rows, newest first, as plain dicts.

    Every filter is optional and they are AND-ed together:
    ``method`` matches exactly, ``path`` is a substring match (LIKE),
    ``status_code`` matches exactly (0 is a valid filter value), and
    ``start_time`` / ``end_time`` bound ``timestamp`` inclusively.
    ``limit`` / ``offset`` page through the result.
    """
    # (is the filter active?, SQL fragment, bound value)
    candidates = (
        (bool(method), "method = ?", method),
        (bool(path), "path LIKE ?", f"%{path}%"),
        (status_code is not None, "status_code = ?", status_code),
        (bool(start_time), "timestamp >= ?", start_time),
        (bool(end_time), "timestamp <= ?", end_time),
    )
    conditions = [fragment for active, fragment, _ in candidates if active]
    bindings = [value for active, _, value in candidates if active]

    where = ""
    if conditions:
        where = " WHERE " + " AND ".join(conditions)
    bindings += [limit, offset]

    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        cursor = db.execute(
            f"SELECT * FROM http_logs{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            bindings,
        )
        return [dict(record) for record in cursor.fetchall()]
    finally:
        db.close()
|
|
|
|
|
|
def query_http_log_stats() -> dict:
    """Summarise the HTTP requests logged since local midnight.

    Returns:
        A dict with three keys:
        - ``total``: number of requests logged today
        - ``errors``: how many of them have ``status_code >= 400``
        - ``avg_duration``: mean ``duration_ms`` of today's requests

        When nothing has been logged today every value is 0.
    """
    today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
    conn = sqlite3.connect(_db_path)
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute(
            "SELECT "
            "  COUNT(*) as total, "
            "  SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors, "
            "  AVG(duration_ms) as avg_duration "
            "FROM http_logs WHERE timestamp >= ?",
            (today_start,),
        ).fetchone()
    finally:
        conn.close()

    defaults = {"total": 0, "errors": 0, "avg_duration": 0}
    if row is None:
        return defaults
    # An aggregate query without GROUP BY always yields exactly one row, and
    # SUM()/AVG() evaluate to NULL when no rows match. Without this
    # normalisation an empty day would report errors=None/avg_duration=None
    # instead of the intended zeros.
    return {
        key: row[key] if row[key] is not None else fallback
        for key, fallback in defaults.items()
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query functions — Task history
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def query_task_history(status: str = None, name: str = None,
                       start_time: str = None, end_time: str = None,
                       limit: int = 50, offset: int = 0) -> list[dict]:
    """Return task rows, most recently created first, as plain dicts.

    Optional filters (AND-ed): exact ``status``, substring match on ``name``,
    and an inclusive ``created_at`` window via ``start_time`` / ``end_time``.
    ``limit`` / ``offset`` page through the result.

    In each returned dict the ``result_files`` and ``log_lines`` columns are
    replaced by their decoded JSON value (see ``_parse_json_field``).
    """
    # (is the filter active?, SQL fragment, bound value)
    candidates = (
        (bool(status), "status = ?", status),
        (bool(name), "name LIKE ?", f"%{name}%"),
        (bool(start_time), "created_at >= ?", start_time),
        (bool(end_time), "created_at <= ?", end_time),
    )
    conditions = [fragment for active, fragment, _ in candidates if active]
    bindings = [value for active, _, value in candidates if active]

    where = ""
    if conditions:
        where = " WHERE " + " AND ".join(conditions)
    bindings += [limit, offset]

    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        fetched = db.execute(
            f"SELECT * FROM task_history{where} ORDER BY created_at DESC LIMIT ? OFFSET ?",
            bindings,
        ).fetchall()

        tasks = []
        for record in fetched:
            task = dict(record)
            for json_column in ("result_files", "log_lines"):
                task[json_column] = _parse_json_field(task.get(json_column))
            tasks.append(task)
        return tasks
    finally:
        db.close()
|
|
|
|
|
|
def query_task_by_id(task_id: str) -> dict | None:
    """Look up one task by primary key.

    Returns the row as a dict with ``result_files`` and ``log_lines`` decoded
    from JSON, or ``None`` when no task has that id.
    """
    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        record = db.execute(
            "SELECT * FROM task_history WHERE id = ?",
            (task_id,),
        ).fetchone()
        if record:
            task = dict(record)
            for json_column in ("result_files", "log_lines"):
                task[json_column] = _parse_json_field(task.get(json_column))
            return task
        return None
    finally:
        db.close()
|
|
|
|
|
|
def query_task_stats() -> dict:
    """Count tasks per status.

    Returns a dict mapping each status present in ``task_history`` to its row
    count, plus a ``"total"`` key holding the sum of those counts.
    """
    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        grouped = db.execute(
            "SELECT status, COUNT(*) as count FROM task_history GROUP BY status"
        ).fetchall()
        summary = {}
        for record in grouped:
            summary[record["status"]] = record["count"]
        summary["total"] = sum(summary.values())
        return summary
    finally:
        db.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query functions — File metadata
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def query_file_history(filename: str = None, directory: str = None,
                       action: str = None, task_id: str = None,
                       start_time: str = None, end_time: str = None,
                       limit: int = 50, offset: int = 0) -> list[dict]:
    """Return file-operation rows, newest first, as plain dicts.

    Optional filters (AND-ed): substring match on ``filename``; exact match
    on ``directory``, ``action`` and ``task_id``; inclusive ``timestamp``
    window via ``start_time`` / ``end_time``. ``limit`` / ``offset`` page
    through the result.
    """
    # (is the filter active?, SQL fragment, bound value)
    candidates = (
        (bool(filename), "filename LIKE ?", f"%{filename}%"),
        (bool(directory), "directory = ?", directory),
        (bool(action), "action = ?", action),
        (bool(task_id), "task_id = ?", task_id),
        (bool(start_time), "timestamp >= ?", start_time),
        (bool(end_time), "timestamp <= ?", end_time),
    )
    conditions = [fragment for active, fragment, _ in candidates if active]
    bindings = [value for active, _, value in candidates if active]

    where = ""
    if conditions:
        where = " WHERE " + " AND ".join(conditions)
    bindings += [limit, offset]

    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        cursor = db.execute(
            f"SELECT * FROM file_metadata{where} ORDER BY id DESC LIMIT ? OFFSET ?",
            bindings,
        )
        return [dict(record) for record in cursor.fetchall()]
    finally:
        db.close()
|
|
|
|
|
|
def query_file_stats() -> list[dict]:
    """Get file storage statistics by scanning actual directories.

    Scans ``data/input``, ``data/output`` and ``data/result`` (recursively)
    below the project root, which is taken to be four directory levels above
    this module.

    Returns:
        One dict per scanned directory, in the order listed above, with keys
        ``directory`` (the relative path string), ``file_count`` and
        ``total_size`` (sum of file sizes in bytes). A directory that does
        not exist is reported with zero counts.
    """
    project_root = Path(__file__).resolve().parent.parent.parent.parent
    dirs_to_scan = ["data/input", "data/output", "data/result"]
    stats = []

    for rel_dir in dirs_to_scan:
        dir_path = project_root / rel_dir
        file_count = 0
        total_size = 0

        if dir_path.exists():
            for entry in dir_path.rglob("*"):
                if not entry.is_file():
                    continue
                try:
                    size = entry.stat().st_size
                except OSError:
                    # The file was removed or became unreadable between the
                    # is_file() check and stat() (these directories are
                    # modified by background processing). Skip it rather
                    # than failing the whole statistics call.
                    continue
                file_count += 1
                total_size += size

        stats.append({
            "directory": rel_dir,
            "file_count": file_count,
            "total_size": total_size,
        })

    return stats
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File relations — CRUD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def upsert_file_relation(input_image: str = None, output_excel: str = None,
                         result_purchase: str = None, status: str = 'pending'):
    """Create a ``file_relations`` row or extend the one that already matches.

    Lookup order (the first hit wins, empty arguments are skipped):
      1. a row whose ``input_image`` equals ``input_image``
      2. a row whose ``output_excel`` equals ``output_excel``
      3. a row whose ``result_purchase`` equals ``result_purchase``

    On a hit, file columns are only filled in where the stored value is
    empty (existing names are never overwritten); ``status`` is overwritten
    whenever a non-empty status is passed (including the default), and
    ``updated_at`` is always refreshed. Without a hit a new row is inserted
    with both timestamps set to now.
    """
    stamp = datetime.now().isoformat()
    # Column name -> supplied value, in lookup priority order.
    supplied = (
        ("input_image", input_image),
        ("output_excel", output_excel),
        ("result_purchase", result_purchase),
    )

    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        found = None
        for column, value in supplied:
            if not value:
                continue
            found = db.execute(
                f"SELECT * FROM file_relations WHERE {column} = ?", (value,)
            ).fetchone()
            if found:
                break

        if not found:
            db.execute(
                "INSERT INTO file_relations (input_image, output_excel, result_purchase, status, created_at, updated_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (input_image, output_excel, result_purchase, status, stamp, stamp),
            )
        else:
            assignments = []
            bindings = []
            for column, value in supplied:
                # Fill gaps only; a name that is already stored stays as is.
                if value and not found[column]:
                    assignments.append(f"{column} = ?")
                    bindings.append(value)
            if status:
                assignments.append("status = ?")
                bindings.append(status)
            assignments.append("updated_at = ?")
            bindings.append(stamp)
            bindings.append(found['id'])
            db.execute(
                f"UPDATE file_relations SET {', '.join(assignments)} WHERE id = ?",
                bindings,
            )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def query_file_relations(view: str = None, status: str = None,
                         page: int = 1, page_size: int = 50) -> tuple[list[dict], int]:
    """Return one page of ``file_relations`` rows plus the total match count.

    ``view`` selects which rows are shown and how they are ordered:
      - ``'orders'``: rows with a ``result_purchase``, ordered by it descending
      - ``'tables'``: rows with an ``output_excel``, ordered by it descending
      - ``'images'``: rows with an ``input_image``, ordered by it descending
      - anything else: all rows, newest id first

    ``status`` optionally restricts to an exact status. ``page`` is 1-based.

    Each returned dict additionally carries ``input_exists``,
    ``output_exists`` and ``result_exists``: whether the named file is
    currently present under ``data/input``, ``data/output`` and
    ``data/result`` of the project root (False when the column is empty).

    Returns:
        ``(items, total)`` where ``total`` ignores pagination.
    """
    view_columns = {
        'orders': "result_purchase",
        'tables': "output_excel",
        'images': "input_image",
    }
    # (row column, flag added to the result, sub-directory of data/)
    locations = (
        ('input_image', 'input_exists', 'input'),
        ('output_excel', 'output_exists', 'output'),
        ('result_purchase', 'result_exists', 'result'),
    )

    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        conditions = []
        bindings = []
        focus = view_columns.get(view)
        if focus is None:
            ordering = "id DESC"
        else:
            conditions.append(f"{focus} IS NOT NULL")
            ordering = f"{focus} DESC"

        if status:
            conditions.append("status = ?")
            bindings.append(status)

        where = ""
        if conditions:
            where = " WHERE " + " AND ".join(conditions)

        # Total number of matches, independent of paging.
        count_row = db.execute(
            f"SELECT COUNT(*) as cnt FROM file_relations{where}", bindings
        ).fetchone()
        total = count_row[0] if count_row else 0

        # Requested page.
        skip = (page - 1) * page_size
        fetched = db.execute(
            f"SELECT * FROM file_relations{where} ORDER BY {ordering} LIMIT ? OFFSET ?",
            bindings + [page_size, skip],
        ).fetchall()

        data_root = Path(__file__).resolve().parent.parent.parent.parent / 'data'
        items = []
        for record in fetched:
            item = dict(record)
            for column, flag, folder in locations:
                stored_name = item.get(column)
                if stored_name:
                    item[flag] = (data_root / folder / stored_name).exists()
                else:
                    item[flag] = False
            items.append(item)

        return items, total
    finally:
        db.close()
|
|
|
|
|
|
def delete_file_relations(ids: list[int]):
    """Remove the ``file_relations`` rows whose id is in ``ids``.

    An empty (or None) ``ids`` is a no-op that never opens the database.
    Ids that do not exist are ignored.
    """
    if not ids:
        return

    db = sqlite3.connect(_db_path)
    try:
        # One bound placeholder per id: "?,?,?".
        marks = ','.join('?' for _ in ids)
        db.execute(f"DELETE FROM file_relations WHERE id IN ({marks})", ids)
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def sync_file_relations():
    """Rebuild ``file_relations`` from what is currently on disk.

    The table is emptied and repopulated from ``data/input``,
    ``data/output`` and ``data/result`` under the project root
    (non-recursive). Files are linked by stem:
      - input:  ``{stem}`` + .jpg/.jpeg/.png/.bmp
      - output: ``{stem}`` + .xls/.xlsx
      - result: ``采购单_{stem}`` + .xls/.xlsx (files without that prefix are
        keyed by their plain stem; ``合并采购单_*`` merged files are ignored)

    When several files in one directory share a stem, the one listed last by
    the directory iteration is kept. Status is ``done`` when a result file
    exists, ``ocr_done`` when only an output exists, else ``pending``. All
    rebuilt rows get the same fresh ``created_at`` / ``updated_at``.
    """
    data_root = Path(__file__).resolve().parent.parent.parent.parent / 'data'
    input_dir = data_root / 'input'
    output_dir = data_root / 'output'
    result_dir = data_root / 'result'

    image_exts = {'.jpg', '.jpeg', '.png', '.bmp'}
    excel_exts = {'.xls', '.xlsx'}

    # stem -> file name, one mapping per directory.
    input_files = {}
    if input_dir.exists():
        input_files = {
            entry.stem: entry.name
            for entry in input_dir.iterdir()
            if entry.is_file() and entry.suffix.lower() in image_exts
        }

    output_files = {}
    if output_dir.exists():
        output_files = {
            entry.stem: entry.name
            for entry in output_dir.iterdir()
            if entry.is_file() and entry.suffix.lower() in excel_exts
        }

    result_files = {}
    if result_dir.exists():
        for entry in result_dir.iterdir():
            if not (entry.is_file() and entry.suffix.lower() in excel_exts):
                continue
            if entry.name.startswith('合并采购单_'):
                continue  # merged purchase orders belong to no single stem
            # Drop the 采购单_ prefix (when present) so the key lines up
            # with the input/output stems.
            result_files[entry.stem.removeprefix('采购单_')] = entry.name

    stamp = datetime.now().isoformat()
    records = []
    for stem in sorted(input_files.keys() | output_files.keys() | result_files.keys()):
        image = input_files.get(stem)
        excel = output_files.get(stem)
        purchase = result_files.get(stem)
        state = 'done' if purchase else ('ocr_done' if excel else 'pending')
        records.append((image, excel, purchase, state, stamp, stamp))

    db = sqlite3.connect(_db_path)
    try:
        # Full rebuild: wipe, then insert the freshly derived rows.
        db.execute("DELETE FROM file_relations")
        for record in records:
            db.execute(
                "INSERT INTO file_relations (input_image, output_excel, result_purchase, status, created_at, updated_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                record,
            )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def query_file_relations_stats() -> dict:
    """Collect the file counters shown on the Dashboard.

    Directory figures come from a non-recursive scan of ``data/input``,
    ``data/output`` and ``data/result`` under the project root; the last
    figure comes from the ``file_relations`` table.

    Returns a dict with:
      - ``input_images``: image files (.jpg/.jpeg/.png/.bmp) in input/
      - ``output_excel``: Excel files (.xls/.xlsx) in output/
      - ``unprocessed_images``: input stems with no output of the same stem
      - ``unprocessed_excel``: output stems with no ``采购单_{stem}`` result
      - ``completed_results``: Excel files in result/ named ``采购单_*``
      - ``total_processed``: relation rows whose status is done or merged
    """
    data_root = Path(__file__).resolve().parent.parent.parent.parent / 'data'
    image_exts = {'.jpg', '.jpeg', '.png', '.bmp'}
    excel_exts = {'.xls', '.xlsx'}

    def matching_files(folder, extensions):
        # Regular files directly inside `folder` with one of `extensions`;
        # an absent folder simply yields nothing.
        if not folder.exists():
            return []
        return [
            entry for entry in folder.iterdir()
            if entry.is_file() and entry.suffix.lower() in extensions
        ]

    images = matching_files(data_root / 'input', image_exts)
    input_stems = {entry.stem for entry in images}

    sheets = matching_files(data_root / 'output', excel_exts)
    output_stems = {entry.stem for entry in sheets}

    purchase_orders = [
        entry for entry in matching_files(data_root / 'result', excel_exts)
        if entry.name.startswith('采购单_')
    ]
    result_stems = {
        entry.name[len('采购单_'):-len(entry.suffix)] for entry in purchase_orders
    }

    # Processed-row count as recorded in the relations table.
    db = sqlite3.connect(_db_path)
    try:
        count_row = db.execute(
            "SELECT COUNT(*) FROM file_relations WHERE status IN ('done', 'merged')"
        ).fetchone()
        total_processed = count_row[0] if count_row else 0
    finally:
        db.close()

    return {
        'input_images': len(images),
        'output_excel': len(sheets),
        'unprocessed_images': len(input_stems - output_stems),
        'unprocessed_excel': len(output_stems - result_stems),
        'completed_results': len(purchase_orders),
        'total_processed': total_processed,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_json_field(value):
    """Decode a JSON text column, falling back to an empty list.

    Empty/None input, malformed JSON, and non-text input all yield ``[]``.
    Valid JSON is returned as decoded, whatever its type; the caller is
    expected to have stored a JSON array.
    """
    if value:
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            pass
    return []
|