"""SQLite schema management and query functions for web backend. Tables: - http_logs: HTTP request/response logging - task_history: Background task tracking - file_metadata: File operation records - file_relations: Input→Output→Result file chain tracking All functions are synchronous; the async db_pool.DBPool wraps them via run_in_executor. """ import json import os import sqlite3 from datetime import datetime, timedelta from pathlib import Path _db_path = Path(__file__).resolve().parent.parent.parent.parent / "data" / "web_data.db" def _ensure_db_dir(): """Create the parent directory for the database if it doesn't exist.""" _db_path.parent.mkdir(parents=True, exist_ok=True) def init_db(): """Create tables and indexes if they don't exist.""" _ensure_db_dir() conn = sqlite3.connect(_db_path) try: conn.executescript(""" CREATE TABLE IF NOT EXISTS http_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, method TEXT NOT NULL, path TEXT NOT NULL, status_code INTEGER, duration_ms REAL, user TEXT, ip TEXT, detail TEXT ); CREATE INDEX IF NOT EXISTS idx_http_logs_timestamp ON http_logs(timestamp); CREATE INDEX IF NOT EXISTS idx_http_logs_status ON http_logs(status_code); CREATE TABLE IF NOT EXISTS task_history ( id TEXT PRIMARY KEY, name TEXT NOT NULL, status TEXT NOT NULL, progress INTEGER DEFAULT 0, message TEXT, result_files TEXT, error TEXT, log_lines TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, completed_at TEXT ); CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status); CREATE INDEX IF NOT EXISTS idx_task_history_created ON task_history(created_at); CREATE TABLE IF NOT EXISTS file_metadata ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, directory TEXT NOT NULL, size INTEGER, action TEXT NOT NULL, user TEXT, timestamp TEXT NOT NULL, task_id TEXT ); CREATE INDEX IF NOT EXISTS idx_file_metadata_timestamp ON file_metadata(timestamp); CREATE TABLE IF NOT EXISTS file_relations ( id INTEGER PRIMARY KEY AUTOINCREMENT, input_image TEXT, output_excel TEXT, result_purchase TEXT, status TEXT DEFAULT 'pending', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_file_relations_input ON file_relations(input_image); CREATE INDEX IF NOT EXISTS idx_file_relations_output ON file_relations(output_excel); CREATE INDEX IF NOT EXISTS idx_file_relations_result ON file_relations(result_purchase); """) conn.commit() finally: conn.close() def cleanup_old_records(): """Delete records older than 30 days from all tables.""" cutoff = (datetime.now() - timedelta(days=30)).isoformat() conn = sqlite3.connect(_db_path) try: conn.execute("DELETE FROM http_logs WHERE timestamp < ?", (cutoff,)) conn.execute("DELETE FROM task_history WHERE created_at < ?", (cutoff,)) conn.execute("DELETE FROM file_metadata WHERE timestamp < ?", (cutoff,)) conn.execute("DELETE FROM file_relations WHERE updated_at < ?", (cutoff,)) conn.commit() finally: conn.close() # --------------------------------------------------------------------------- # Insert functions # --------------------------------------------------------------------------- def insert_http_log(method: str, path: str, status_code: int = None, duration_ms: float = None, user: str = None, ip: str = None, detail: str = None): """Insert an HTTP log record.""" conn = sqlite3.connect(_db_path) try: conn.execute( "INSERT INTO http_logs (timestamp, method, path, status_code, duration_ms, user, ip, detail) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (datetime.now().isoformat(), method, path, status_code, duration_ms, user, ip, detail), ) conn.commit() finally: conn.close() def insert_task(task_id: str, name: str, status: str = "pending", progress: int = 0, message: str = None, result_files: str = None, error: str = None, log_lines: str = None): """Insert a new task record.""" now = datetime.now().isoformat() conn = sqlite3.connect(_db_path) try: conn.execute( "INSERT INTO task_history (id, name, status, progress, message, " "result_files, error, log_lines, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (task_id, name, status, progress, message, result_files, error, log_lines, now, now), ) conn.commit() finally: conn.close() def update_task(task_id: str, **kwargs): """Update specific fields of a task record. Allowed fields: name, status, progress, message, result_files, error, log_lines, completed_at. """ allowed = { "name", "status", "progress", "message", "result_files", "error", "log_lines", "completed_at", } fields = {k: v for k, v in kwargs.items() if k in allowed} if not fields: return fields["updated_at"] = datetime.now().isoformat() set_clause = ", ".join(f"{k} = ?" for k in fields) values = list(fields.values()) + [task_id] conn = sqlite3.connect(_db_path) try: conn.execute( f"UPDATE task_history SET {set_clause} WHERE id = ?", values, ) conn.commit() finally: conn.close() def insert_file_metadata(filename: str, directory: str, action: str, size: int = None, user: str = None, task_id: str = None): """Insert a file operation record.""" conn = sqlite3.connect(_db_path) try: conn.execute( "INSERT INTO file_metadata (filename, directory, size, action, user, timestamp, task_id) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (filename, directory, size, action, user, datetime.now().isoformat(), task_id), ) conn.commit() finally: conn.close() # --------------------------------------------------------------------------- # Query functions — HTTP logs # --------------------------------------------------------------------------- def query_http_logs(method: str = None, path: str = None, status_code: int = None, start_time: str = None, end_time: str = None, limit: int = 50, offset: int = 0) -> list[dict]: """Query HTTP logs with optional filters and pagination.""" conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: clauses = [] params = [] if method: clauses.append("method = ?") params.append(method) if path: clauses.append("path LIKE ?") params.append(f"%{path}%") if status_code is not None: clauses.append("status_code = ?") params.append(status_code) if start_time: clauses.append("timestamp >= ?") params.append(start_time) if end_time: clauses.append("timestamp <= ?") params.append(end_time) where = (" WHERE " + " AND ".join(clauses)) if clauses else "" params.extend([limit, offset]) rows = conn.execute( f"SELECT * FROM http_logs{where} ORDER BY id DESC LIMIT ? OFFSET ?", params, ).fetchall() return [dict(r) for r in rows] finally: conn.close() def query_http_log_stats() -> dict: """Get HTTP log statistics for today.""" today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).isoformat() conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: row = conn.execute( "SELECT " " COUNT(*) as total, " " SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors, " " AVG(duration_ms) as avg_duration " "FROM http_logs WHERE timestamp >= ?", (today_start,), ).fetchone() return dict(row) if row else {"total": 0, "errors": 0, "avg_duration": 0} finally: conn.close() # --------------------------------------------------------------------------- # Query functions — Task history # --------------------------------------------------------------------------- def query_task_history(status: str = None, name: str = None, start_time: str = None, end_time: str = None, limit: int = 50, offset: int = 0) -> list[dict]: """Query task history with optional filters and pagination. Returns list of dicts with result_files and log_lines parsed from JSON. """ conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: clauses = [] params = [] if status: clauses.append("status = ?") params.append(status) if name: clauses.append("name LIKE ?") params.append(f"%{name}%") if start_time: clauses.append("created_at >= ?") params.append(start_time) if end_time: clauses.append("created_at <= ?") params.append(end_time) where = (" WHERE " + " AND ".join(clauses)) if clauses else "" params.extend([limit, offset]) rows = conn.execute( f"SELECT * FROM task_history{where} ORDER BY created_at DESC LIMIT ? OFFSET ?", params, ).fetchall() results = [] for r in rows: d = dict(r) d["result_files"] = _parse_json_field(d.get("result_files")) d["log_lines"] = _parse_json_field(d.get("log_lines")) results.append(d) return results finally: conn.close() def query_task_by_id(task_id: str) -> dict | None: """Get a single task by ID, with JSON fields parsed.""" conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: row = conn.execute( "SELECT * FROM task_history WHERE id = ?", (task_id,), ).fetchone() if not row: return None d = dict(row) d["result_files"] = _parse_json_field(d.get("result_files")) d["log_lines"] = _parse_json_field(d.get("log_lines")) return d finally: conn.close() def query_task_stats() -> dict: """Get task statistics: counts by status and total.""" conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: rows = conn.execute( "SELECT status, COUNT(*) as count FROM task_history GROUP BY status" ).fetchall() stats = {r["status"]: r["count"] for r in rows} stats["total"] = sum(stats.values()) return stats finally: conn.close() def delete_task(task_id: str) -> bool: """Delete a single task by ID. Returns True if deleted.""" conn = sqlite3.connect(_db_path) try: cur = conn.execute("DELETE FROM task_history WHERE id = ?", (task_id,)) conn.commit() return cur.rowcount > 0 finally: conn.close() def clear_task_history() -> int: """Delete all task history records. Returns number of deleted rows.""" conn = sqlite3.connect(_db_path) try: cur = conn.execute("DELETE FROM task_history") conn.commit() return cur.rowcount finally: conn.close() # --------------------------------------------------------------------------- # Query functions — File metadata # --------------------------------------------------------------------------- def query_file_history(filename: str = None, directory: str = None, action: str = None, task_id: str = None, start_time: str = None, end_time: str = None, limit: int = 50, offset: int = 0) -> list[dict]: """Query file operation history with optional filters and pagination.""" conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: clauses = [] params = [] if filename: clauses.append("filename LIKE ?") params.append(f"%{filename}%") if directory: clauses.append("directory = ?") params.append(directory) if action: clauses.append("action = ?") params.append(action) if task_id: clauses.append("task_id = ?") params.append(task_id) if start_time: clauses.append("timestamp >= ?") params.append(start_time) if end_time: clauses.append("timestamp <= ?") params.append(end_time) where = (" WHERE " + " AND ".join(clauses)) if clauses else "" params.extend([limit, offset]) rows = conn.execute( f"SELECT * FROM file_metadata{where} ORDER BY id DESC LIMIT ? OFFSET ?", params, ).fetchall() return [dict(r) for r in rows] finally: conn.close() def query_file_stats() -> list[dict]: """Get file storage statistics by scanning actual directories. Returns a list of dicts with keys: directory, file_count, total_size. Scans data/input/, data/output/, data/result/ relative to project root. """ project_root = Path(__file__).resolve().parent.parent.parent.parent dirs_to_scan = ["data/input", "data/output", "data/result"] stats = [] for rel_dir in dirs_to_scan: dir_path = project_root / rel_dir if not dir_path.exists(): stats.append({"directory": rel_dir, "file_count": 0, "total_size": 0}) continue file_count = 0 total_size = 0 for f in dir_path.rglob("*"): if f.is_file(): file_count += 1 total_size += f.stat().st_size stats.append({ "directory": rel_dir, "file_count": file_count, "total_size": total_size, }) return stats # --------------------------------------------------------------------------- # File relations — CRUD # --------------------------------------------------------------------------- def upsert_file_relation(input_image: str = None, output_excel: str = None, result_purchase: str = None, status: str = 'pending'): """Insert or update a file relation. Match strategy: - If input_image provided, try to find existing row by input_image - Else if output_excel provided, try to find by output_excel - Otherwise insert new row. """ now = datetime.now().isoformat() conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: existing = None if input_image: existing = conn.execute( "SELECT * FROM file_relations WHERE input_image = ?", (input_image,) ).fetchone() if not existing and output_excel: existing = conn.execute( "SELECT * FROM file_relations WHERE output_excel = ?", (output_excel,) ).fetchone() if not existing and result_purchase: existing = conn.execute( "SELECT * FROM file_relations WHERE result_purchase = ?", (result_purchase,) ).fetchone() if existing: updates = [] params = [] if input_image and not existing['input_image']: updates.append("input_image = ?") params.append(input_image) if output_excel and not existing['output_excel']: updates.append("output_excel = ?") params.append(output_excel) if result_purchase and not existing['result_purchase']: updates.append("result_purchase = ?") params.append(result_purchase) if status: updates.append("status = ?") params.append(status) updates.append("updated_at = ?") params.append(now) params.append(existing['id']) conn.execute( f"UPDATE file_relations SET {', '.join(updates)} WHERE id = ?", params, ) else: conn.execute( "INSERT INTO file_relations (input_image, output_excel, result_purchase, status, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?)", (input_image, output_excel, result_purchase, status, now, now), ) conn.commit() finally: conn.close() def query_file_relations(view: str = None, status: str = None, page: int = 1, page_size: int = 50, sort_by: str = None, sort_order: str = "desc", exists_only: bool = True) -> tuple[list[dict], int]: """Query file relations with optional view filter and pagination. view='orders': only rows with result_purchase, sorted by result_purchase view='tables': only rows with output_excel, sorted by output_excel view='images': only rows with input_image, sorted by input_image view=None: all rows exists_only=True: for a given view, only return rows where the primary file still exists on disk (input_image for images, output_excel for tables, result_purchase for orders) Returns (items, total). """ project_root = Path(__file__).resolve().parent.parent.parent.parent conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: clauses = [] params = [] if view == 'orders': clauses.append("result_purchase IS NOT NULL") order_by = "result_purchase DESC" elif view == 'tables': clauses.append("output_excel IS NOT NULL") order_by = "output_excel DESC" elif view == 'images': clauses.append("input_image IS NOT NULL") order_by = "input_image DESC" else: order_by = "id DESC" if status: clauses.append("status = ?") params.append(status) where = (" WHERE " + " AND ".join(clauses)) if clauses else "" # Sort if sort_by and sort_by in ('created_at', 'updated_at', 'input_image', 'output_excel', 'result_purchase', 'status'): sort_col = sort_by else: sort_col = order_by.split()[0] if order_by else 'id' sort_dir = 'DESC' if sort_order.lower() == 'desc' else 'ASC' # Fetch all matching rows (existence filter happens in Python) rows = conn.execute( f"SELECT * FROM file_relations{where} ORDER BY {sort_col} {sort_dir}", params, ).fetchall() items = [] for r in rows: d = dict(r) # Check file existence if d.get('input_image'): d['input_exists'] = (project_root / 'data' / 'input' / d['input_image']).exists() else: d['input_exists'] = False if d.get('output_excel'): d['output_exists'] = (project_root / 'data' / 'output' / d['output_excel']).exists() else: d['output_exists'] = False if d.get('result_purchase'): d['result_exists'] = (project_root / 'data' / 'result' / d['result_purchase']).exists() else: d['result_exists'] = False # Filter: when exists_only is True, only keep rows whose primary file exists if exists_only: if view == 'images' and not d['input_exists']: continue if view == 'tables' and not d['output_exists']: continue if view == 'orders' and not d['result_exists']: continue items.append(d) total = len(items) # Page (Python-side after existence filtering) start = (page - 1) * page_size items = items[start:start + page_size] return items, total finally: conn.close() def delete_file_relations(ids: list[int]): """Delete file relation records by IDs.""" if not ids: return conn = sqlite3.connect(_db_path) try: placeholders = ','.join('?' * len(ids)) conn.execute(f"DELETE FROM file_relations WHERE id IN ({placeholders})", ids) conn.commit() finally: conn.close() def sync_file_relations(): """Scan input/output/result directories and rebuild file_relations table. Matches files by stem: - input: {stem}.jpg/.png/.bmp - output: {stem}.xlsx or {stem}.xls - result: 采购单_{stem}.xls """ project_root = Path(__file__).resolve().parent.parent.parent.parent input_dir = project_root / 'data' / 'input' output_dir = project_root / 'data' / 'output' result_dir = project_root / 'data' / 'result' image_exts = {'.jpg', '.jpeg', '.png', '.bmp'} excel_exts = {'.xls', '.xlsx'} # Collect files by stem input_files = {} # stem -> filename if input_dir.exists(): for f in input_dir.iterdir(): if f.is_file() and f.suffix.lower() in image_exts: input_files[f.stem] = f.name output_files = {} if output_dir.exists(): for f in output_dir.iterdir(): if f.is_file() and f.suffix.lower() in excel_exts: output_files[f.stem] = f.name result_files = {} if result_dir.exists(): for f in result_dir.iterdir(): if f.is_file() and f.suffix.lower() in excel_exts: name = f.name # Strip 采购单_ prefix for matching if name.startswith('采购单_'): stem = name[len('采购单_'):-len(f.suffix)] elif name.startswith('合并采购单_'): continue # Skip merged files else: stem = f.stem result_files[stem] = name # Build relations all_stems = set(input_files.keys()) | set(output_files.keys()) | set(result_files.keys()) now = datetime.now().isoformat() conn = sqlite3.connect(_db_path) try: # Clear existing and rebuild conn.execute("DELETE FROM file_relations") for stem in sorted(all_stems): inp = input_files.get(stem) out = output_files.get(stem) res = result_files.get(stem) if res: status = 'done' elif out: status = 'ocr_done' else: status = 'pending' conn.execute( "INSERT INTO file_relations (input_image, output_excel, result_purchase, status, created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?)", (inp, out, res, status, now, now), ) conn.commit() finally: conn.close() def reset_file_cache(files: list[dict]) -> dict: """Delete output/result files and reset relation status to pending. Each item: {input_image?, output_excel?, result_purchase?} Deletes the corresponding files from disk and resets status. """ project_root = Path(__file__).resolve().parent.parent.parent.parent output_dir = project_root / 'data' / 'output' result_dir = project_root / 'data' / 'result' deleted_files = 0 reset_count = 0 errors = [] conn = sqlite3.connect(_db_path) conn.row_factory = sqlite3.Row try: for item in files: input_image = item.get('input_image') output_excel = item.get('output_excel') result_purchase = item.get('result_purchase') # Delete output file from disk if output_excel: out_path = output_dir / output_excel if out_path.exists(): try: out_path.unlink() deleted_files += 1 except Exception as e: errors.append(f"{output_excel}: {e}") # Delete result file from disk if result_purchase: res_path = result_dir / result_purchase if res_path.exists(): try: res_path.unlink() deleted_files += 1 except Exception as e: errors.append(f"{result_purchase}: {e}") # Reset relation status to pending if input_image: conn.execute( "UPDATE file_relations SET output_excel = NULL, result_purchase = NULL, " "status = 'pending', updated_at = ? WHERE input_image = ?", (datetime.now().isoformat(), input_image), ) reset_count += conn.total_changes elif output_excel: conn.execute( "UPDATE file_relations SET output_excel = NULL, result_purchase = NULL, " "status = 'pending', updated_at = ? WHERE output_excel = ?", (datetime.now().isoformat(), output_excel), ) reset_count += conn.total_changes conn.commit() finally: conn.close() return {"deleted_files": deleted_files, "reset_relations": reset_count, "errors": errors} def query_file_relations_stats() -> dict: """Get detailed file statistics for Dashboard. Returns dict with: - input_images: count of image files in input/ - output_excel: count of excel files in output/ - unprocessed_images: images without corresponding output - unprocessed_excel: excel without corresponding result - completed_results: purchase order files in result/ - total_processed: relations with status done/merged """ project_root = Path(__file__).resolve().parent.parent.parent.parent input_dir = project_root / 'data' / 'input' output_dir = project_root / 'data' / 'output' result_dir = project_root / 'data' / 'result' image_exts = {'.jpg', '.jpeg', '.png', '.bmp'} excel_exts = {'.xls', '.xlsx'} # Count files input_images = 0 input_stems = set() if input_dir.exists(): for f in input_dir.iterdir(): if f.is_file() and f.suffix.lower() in image_exts: input_images += 1 input_stems.add(f.stem) output_excel = 0 output_stems = set() if output_dir.exists(): for f in output_dir.iterdir(): if f.is_file() and f.suffix.lower() in excel_exts: output_excel += 1 output_stems.add(f.stem) completed_results = 0 result_stems = set() if result_dir.exists(): for f in result_dir.iterdir(): if f.is_file() and f.suffix.lower() in excel_exts: if f.name.startswith('采购单_'): completed_results += 1 stem = f.name[len('采购单_'):-len(f.suffix)] result_stems.add(stem) unprocessed_images = len(input_stems - output_stems) unprocessed_excel = len(output_stems - result_stems) # Count from relations table conn = sqlite3.connect(_db_path) try: row = conn.execute( "SELECT COUNT(*) FROM file_relations WHERE status IN ('done', 'merged')" ).fetchone() total_processed = row[0] if row else 0 finally: conn.close() return { 'input_images': input_images, 'output_excel': output_excel, 'unprocessed_images': unprocessed_images, 'unprocessed_excel': unprocessed_excel, 'completed_results': completed_results, 'total_processed': total_processed, } # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _parse_json_field(value): """Parse a JSON string to a Python list; return empty list on failure.""" if not value: return [] try: return json.loads(value) except (json.JSONDecodeError, TypeError): return []