diff --git a/web/backend/services/db_schema.py b/web/backend/services/db_schema.py new file mode 100644 index 0000000..6dcac13 --- /dev/null +++ b/web/backend/services/db_schema.py @@ -0,0 +1,409 @@ +"""SQLite schema management and query functions for web backend. + +Tables: +- http_logs: HTTP request/response logging +- task_history: Background task tracking +- file_metadata: File operation records + +All functions are synchronous; the async db_pool.DBPool wraps them via run_in_executor. +""" + +import json +import os +import sqlite3 +from datetime import datetime, timedelta +from pathlib import Path + +_db_path = Path(__file__).resolve().parent.parent.parent.parent / "data" / "web_data.db" + + +def _ensure_db_dir(): + """Create the parent directory for the database if it doesn't exist.""" + _db_path.parent.mkdir(parents=True, exist_ok=True) + + +def init_db(): + """Create tables and indexes if they don't exist.""" + _ensure_db_dir() + conn = sqlite3.connect(_db_path) + try: + conn.executescript(""" + CREATE TABLE IF NOT EXISTS http_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + method TEXT NOT NULL, + path TEXT NOT NULL, + status_code INTEGER, + duration_ms REAL, + user TEXT, + ip TEXT, + detail TEXT + ); + CREATE INDEX IF NOT EXISTS idx_http_logs_timestamp ON http_logs(timestamp); + CREATE INDEX IF NOT EXISTS idx_http_logs_status ON http_logs(status_code); + + CREATE TABLE IF NOT EXISTS task_history ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + status TEXT NOT NULL, + progress INTEGER DEFAULT 0, + message TEXT, + result_files TEXT, + error TEXT, + log_lines TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + completed_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status); + CREATE INDEX IF NOT EXISTS idx_task_history_created ON task_history(created_at); + + CREATE TABLE IF NOT EXISTS file_metadata ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + filename TEXT NOT NULL, + directory TEXT NOT NULL, + size INTEGER, + action TEXT NOT NULL, + user TEXT, + timestamp TEXT NOT NULL, + task_id TEXT + ); + CREATE INDEX IF NOT EXISTS idx_file_metadata_timestamp ON file_metadata(timestamp); + """) + conn.commit() + finally: + conn.close() + + +def cleanup_old_records(): + """Delete records older than 30 days from all tables.""" + cutoff = (datetime.now() - timedelta(days=30)).isoformat() + conn = sqlite3.connect(_db_path) + try: + conn.execute("DELETE FROM http_logs WHERE timestamp < ?", (cutoff,)) + conn.execute("DELETE FROM task_history WHERE created_at < ?", (cutoff,)) + conn.execute("DELETE FROM file_metadata WHERE timestamp < ?", (cutoff,)) + conn.commit() + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Insert functions +# --------------------------------------------------------------------------- + +def insert_http_log(method: str, path: str, status_code: int = None, + duration_ms: float = None, user: str = None, + ip: str = None, detail: str = None): + """Insert an HTTP log record.""" + conn = sqlite3.connect(_db_path) + try: + conn.execute( + "INSERT INTO http_logs (timestamp, method, path, status_code, duration_ms, user, ip, detail) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (datetime.now().isoformat(), method, path, status_code, + duration_ms, user, ip, detail), + ) + conn.commit() + finally: + conn.close() + + +def insert_task(task_id: str, name: str, status: str = "pending", + progress: int = 0, message: str = None, + result_files: str = None, error: str = None, + log_lines: str = None): + """Insert a new task record.""" + now = datetime.now().isoformat() + conn = sqlite3.connect(_db_path) + try: + conn.execute( + "INSERT INTO task_history (id, name, status, progress, message, " + "result_files, error, log_lines, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (task_id, name, status, progress, message, + result_files, error, log_lines, now, now), + ) + conn.commit() + finally: + conn.close() + + +def update_task(task_id: str, **kwargs): + """Update specific fields of a task record. + + Allowed fields: name, status, progress, message, result_files, + error, log_lines, completed_at. + """ + allowed = { + "name", "status", "progress", "message", + "result_files", "error", "log_lines", "completed_at", + } + fields = {k: v for k, v in kwargs.items() if k in allowed} + if not fields: + return + + fields["updated_at"] = datetime.now().isoformat() + set_clause = ", ".join(f"{k} = ?" for k in fields) + values = list(fields.values()) + [task_id] + + conn = sqlite3.connect(_db_path) + try: + conn.execute( + f"UPDATE task_history SET {set_clause} WHERE id = ?", + values, + ) + conn.commit() + finally: + conn.close() + + +def insert_file_metadata(filename: str, directory: str, action: str, + size: int = None, user: str = None, + task_id: str = None): + """Insert a file operation record.""" + conn = sqlite3.connect(_db_path) + try: + conn.execute( + "INSERT INTO file_metadata (filename, directory, size, action, user, timestamp, task_id) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + (filename, directory, size, action, user, + datetime.now().isoformat(), task_id), + ) + conn.commit() + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Query functions — HTTP logs +# --------------------------------------------------------------------------- + +def query_http_logs(method: str = None, path: str = None, + status_code: int = None, + start_time: str = None, end_time: str = None, + limit: int = 50, offset: int = 0) -> list[dict]: + """Query HTTP logs with optional filters and pagination.""" + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + clauses = [] + params = [] + if method: + clauses.append("method = ?") + params.append(method) + if path: + clauses.append("path LIKE ?") + params.append(f"%{path}%") + if status_code is not None: + clauses.append("status_code = ?") + params.append(status_code) + if start_time: + clauses.append("timestamp >= ?") + params.append(start_time) + if end_time: + clauses.append("timestamp <= ?") + params.append(end_time) + + where = (" WHERE " + " AND ".join(clauses)) if clauses else "" + params.extend([limit, offset]) + + rows = conn.execute( + f"SELECT * FROM http_logs{where} ORDER BY id DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def query_http_log_stats() -> dict: + """Get HTTP log statistics for today.""" + today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).isoformat() + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + "SELECT " + " COUNT(*) as total, " + " SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END) as errors, " + " AVG(duration_ms) as avg_duration " + "FROM http_logs WHERE timestamp >= ?", + (today_start,), + ).fetchone() + return dict(row) if row else {"total": 0, "errors": 0, "avg_duration": 0} + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Query functions — Task history +# --------------------------------------------------------------------------- + +def query_task_history(status: str = None, name: str = None, + start_time: str = None, end_time: str = None, + limit: int = 50, offset: int = 0) -> list[dict]: + """Query task history with optional filters and pagination. + + Returns list of dicts with result_files and log_lines parsed from JSON. + """ + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + clauses = [] + params = [] + if status: + clauses.append("status = ?") + params.append(status) + if name: + clauses.append("name LIKE ?") + params.append(f"%{name}%") + if start_time: + clauses.append("created_at >= ?") + params.append(start_time) + if end_time: + clauses.append("created_at <= ?") + params.append(end_time) + + where = (" WHERE " + " AND ".join(clauses)) if clauses else "" + params.extend([limit, offset]) + + rows = conn.execute( + f"SELECT * FROM task_history{where} ORDER BY created_at DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + + results = [] + for r in rows: + d = dict(r) + d["result_files"] = _parse_json_field(d.get("result_files")) + d["log_lines"] = _parse_json_field(d.get("log_lines")) + results.append(d) + return results + finally: + conn.close() + + +def query_task_by_id(task_id: str) -> dict | None: + """Get a single task by ID, with JSON fields parsed.""" + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + "SELECT * FROM task_history WHERE id = ?", + (task_id,), + ).fetchone() + if not row: + return None + d = dict(row) + d["result_files"] = _parse_json_field(d.get("result_files")) + d["log_lines"] = _parse_json_field(d.get("log_lines")) + return d + finally: + conn.close() + + +def query_task_stats() -> dict: + """Get task statistics: counts by status and total.""" + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT status, COUNT(*) as count FROM task_history GROUP BY status" + ).fetchall() + stats = {r["status"]: r["count"] for r in rows} + stats["total"] = sum(stats.values()) + return stats + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Query functions — File metadata +# --------------------------------------------------------------------------- + +def query_file_history(filename: str = None, directory: str = None, + action: str = None, task_id: str = None, + start_time: str = None, end_time: str = None, + limit: int = 50, offset: int = 0) -> list[dict]: + """Query file operation history with optional filters and pagination.""" + conn = sqlite3.connect(_db_path) + conn.row_factory = sqlite3.Row + try: + clauses = [] + params = [] + if filename: + clauses.append("filename LIKE ?") + params.append(f"%{filename}%") + if directory: + clauses.append("directory = ?") + params.append(directory) + if action: + clauses.append("action = ?") + params.append(action) + if task_id: + clauses.append("task_id = ?") + params.append(task_id) + if start_time: + clauses.append("timestamp >= ?") + params.append(start_time) + if end_time: + clauses.append("timestamp <= ?") + params.append(end_time) + + where = (" WHERE " + " AND ".join(clauses)) if clauses else "" + params.extend([limit, offset]) + + rows = conn.execute( + f"SELECT * FROM file_metadata{where} ORDER BY id DESC LIMIT ? OFFSET ?", + params, + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def query_file_stats() -> list[dict]: + """Get file storage statistics by scanning actual directories. + + Returns a list of dicts with keys: directory, file_count, total_size. + Scans data/input/, data/output/, data/result/ relative to project root. + """ + project_root = Path(__file__).resolve().parent.parent.parent.parent + dirs_to_scan = ["data/input", "data/output", "data/result"] + stats = [] + + for rel_dir in dirs_to_scan: + dir_path = project_root / rel_dir + if not dir_path.exists(): + stats.append({"directory": rel_dir, "file_count": 0, "total_size": 0}) + continue + + file_count = 0 + total_size = 0 + for f in dir_path.rglob("*"): + if f.is_file(): + file_count += 1 + total_size += f.stat().st_size + + stats.append({ + "directory": rel_dir, + "file_count": file_count, + "total_size": total_size, + }) + + return stats + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _parse_json_field(value): + """Parse a JSON string to a Python list; return empty list on failure.""" + if not value: + return [] + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return []