feat: add db_schema for http_logs, task_history, file_metadata tables

This commit is contained in:
2026-05-05 11:29:41 +08:00
parent 71c0ba9c96
commit 280b94ae1d
+409
View File
@@ -0,0 +1,409 @@
"""SQLite schema management and query functions for web backend.
Tables:
- http_logs: HTTP request/response logging
- task_history: Background task tracking
- file_metadata: File operation records
All functions are synchronous; the async db_pool.DBPool wraps them via run_in_executor.
"""
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
# Location of the SQLite database file: <root>/data/web_data.db, where <root>
# is the directory three levels above the one that contains this module.
_db_path = Path(__file__).resolve().parent.parent.parent.parent / "data" / "web_data.db"
def _ensure_db_dir():
    """Make sure the directory that will hold the database file exists."""
    db_dir = _db_path.parent
    # parents=True builds any missing ancestors; exist_ok=True turns the call
    # into a no-op when the directory is already present.
    db_dir.mkdir(parents=True, exist_ok=True)
def init_db():
    """Set up the SQLite schema.

    Ensures the database directory exists, then runs a single DDL script that
    creates the http_logs, task_history and file_metadata tables together
    with their indexes. Every statement uses IF NOT EXISTS, so running this
    against a database that already has these objects does not recreate them.
    """
    # The DDL is kept flush-left so the literal handed to SQLite carries no
    # Python indentation.
    schema_script = """
CREATE TABLE IF NOT EXISTS http_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
method TEXT NOT NULL,
path TEXT NOT NULL,
status_code INTEGER,
duration_ms REAL,
user TEXT,
ip TEXT,
detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_http_logs_timestamp ON http_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_http_logs_status ON http_logs(status_code);
CREATE TABLE IF NOT EXISTS task_history (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL,
progress INTEGER DEFAULT 0,
message TEXT,
result_files TEXT,
error TEXT,
log_lines TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
completed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history(status);
CREATE INDEX IF NOT EXISTS idx_task_history_created ON task_history(created_at);
CREATE TABLE IF NOT EXISTS file_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
directory TEXT NOT NULL,
size INTEGER,
action TEXT NOT NULL,
user TEXT,
timestamp TEXT NOT NULL,
task_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_file_metadata_timestamp ON file_metadata(timestamp);
"""
    _ensure_db_dir()
    db = sqlite3.connect(_db_path)
    try:
        db.executescript(schema_script)
        db.commit()
    finally:
        # Release the connection whether or not the script succeeded.
        db.close()
def cleanup_old_records(days: int = 30) -> None:
    """Delete records older than a retention window from all tables.

    Rows are removed from http_logs and file_metadata by their ``timestamp``
    column and from task_history by ``created_at``. The cutoff is the local
    clock time minus ``days``, compared as ISO-8601 text against the stored
    values. All three deletes are committed together.

    Args:
        days: Retention window in days; rows older than this are deleted.
            Defaults to 30, the previously hard-coded value.

    Raises:
        ValueError: If ``days`` is negative. A negative window would place
            the cutoff in the future and wipe every row.
    """
    if days < 0:
        raise ValueError("days must be non-negative")
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    conn = sqlite3.connect(_db_path)
    try:
        conn.execute("DELETE FROM http_logs WHERE timestamp < ?", (cutoff,))
        conn.execute("DELETE FROM task_history WHERE created_at < ?", (cutoff,))
        conn.execute("DELETE FROM file_metadata WHERE timestamp < ?", (cutoff,))
        conn.commit()
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Insert functions
# ---------------------------------------------------------------------------
def insert_http_log(method: str, path: str, status_code: int = None,
                    duration_ms: float = None, user: str = None,
                    ip: str = None, detail: str = None):
    """Record one HTTP request/response in the http_logs table.

    The row's timestamp is datetime.now() in ISO-8601 form, taken at call
    time; every other column is stored exactly as passed in.
    """
    statement = (
        "INSERT INTO http_logs (timestamp, method, path, status_code, duration_ms, user, ip, detail) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
    )
    db = sqlite3.connect(_db_path)
    try:
        record = (
            datetime.now().isoformat(),
            method,
            path,
            status_code,
            duration_ms,
            user,
            ip,
            detail,
        )
        db.execute(statement, record)
        db.commit()
    finally:
        db.close()
def insert_task(task_id: str, name: str, status: str = "pending",
                progress: int = 0, message: str = None,
                result_files: str = None, error: str = None,
                log_lines: str = None):
    """Create a task_history row for a task.

    created_at and updated_at are both set to the same datetime.now()
    ISO-8601 timestamp; completed_at is not written by this function.
    """
    stamp = datetime.now().isoformat()
    statement = (
        "INSERT INTO task_history (id, name, status, progress, message, "
        "result_files, error, log_lines, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    record = (
        task_id,
        name,
        status,
        progress,
        message,
        result_files,
        error,
        log_lines,
        stamp,  # created_at
        stamp,  # updated_at
    )
    db = sqlite3.connect(_db_path)
    try:
        db.execute(statement, record)
        db.commit()
    finally:
        db.close()
def update_task(task_id: str, **kwargs):
    """Change selected columns of one task_history row.

    Only these keyword names are honoured: name, status, progress, message,
    result_files, error, log_lines, completed_at. Any other keyword is
    dropped. If nothing usable remains the function returns without touching
    the database; otherwise updated_at is refreshed along with the given
    columns.
    """
    permitted = {
        "name", "status", "progress", "message",
        "result_files", "error", "log_lines", "completed_at",
    }
    changes = {}
    for column, new_value in kwargs.items():
        if column in permitted:
            changes[column] = new_value
    if not changes:
        return
    changes["updated_at"] = datetime.now().isoformat()

    # Column names come only from the fixed set above, so interpolating them
    # into the statement is safe; values are always bound as parameters.
    assignments = ", ".join(f"{column} = ?" for column in changes)
    bindings = [*changes.values(), task_id]

    db = sqlite3.connect(_db_path)
    try:
        db.execute(f"UPDATE task_history SET {assignments} WHERE id = ?", bindings)
        db.commit()
    finally:
        db.close()
def insert_file_metadata(filename: str, directory: str, action: str,
                         size: int = None, user: str = None,
                         task_id: str = None):
    """Record one file operation in the file_metadata table.

    The row's timestamp is datetime.now() in ISO-8601 form, taken at call
    time; the remaining columns are stored as passed in.
    """
    statement = (
        "INSERT INTO file_metadata (filename, directory, size, action, user, timestamp, task_id) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    db = sqlite3.connect(_db_path)
    try:
        record = (
            filename,
            directory,
            size,
            action,
            user,
            datetime.now().isoformat(),
            task_id,
        )
        db.execute(statement, record)
        db.commit()
    finally:
        db.close()
# ---------------------------------------------------------------------------
# Query functions — HTTP logs
# ---------------------------------------------------------------------------
def query_http_logs(method: str = None, path: str = None,
status_code: int = None,
start_time: str = None, end_time: str = None,
limit: int = 50, offset: int = 0) -> list[dict]:
"""Query HTTP logs with optional filters and pagination."""
conn = sqlite3.connect(_db_path)
conn.row_factory = sqlite3.Row
try:
clauses = []
params = []
if method:
clauses.append("method = ?")
params.append(method)
if path:
clauses.append("path LIKE ?")
params.append(f"%{path}%")
if status_code is not None:
clauses.append("status_code = ?")
params.append(status_code)
if start_time:
clauses.append("timestamp >= ?")
params.append(start_time)
if end_time:
clauses.append("timestamp <= ?")
params.append(end_time)
where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
params.extend([limit, offset])
rows = conn.execute(
f"SELECT * FROM http_logs{where} ORDER BY id DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def query_http_log_stats() -> dict:
    """Get HTTP log statistics for today.

    "Today" means every row whose timestamp is at or after local midnight of
    the current day, compared as ISO-8601 text.

    Returns:
        A dict with keys ``total`` (row count), ``errors`` (rows with
        status_code >= 400) and ``avg_duration`` (mean of duration_ms).
        When no rows qualify, all three values are 0 rather than None.
    """
    today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
    conn = sqlite3.connect(_db_path)
    conn.row_factory = sqlite3.Row
    try:
        # An aggregate query without GROUP BY always yields exactly one row,
        # but SUM and AVG are NULL over an empty set; COALESCE supplies the
        # zero defaults in that case.
        row = conn.execute(
            "SELECT "
            "  COUNT(*) as total, "
            "  COALESCE(SUM(CASE WHEN status_code >= 400 THEN 1 ELSE 0 END), 0) as errors, "
            "  COALESCE(AVG(duration_ms), 0) as avg_duration "
            "FROM http_logs WHERE timestamp >= ?",
            (today_start,),
        ).fetchone()
        return dict(row)
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# Query functions — Task history
# ---------------------------------------------------------------------------
def query_task_history(status: str = None, name: str = None,
start_time: str = None, end_time: str = None,
limit: int = 50, offset: int = 0) -> list[dict]:
"""Query task history with optional filters and pagination.
Returns list of dicts with result_files and log_lines parsed from JSON.
"""
conn = sqlite3.connect(_db_path)
conn.row_factory = sqlite3.Row
try:
clauses = []
params = []
if status:
clauses.append("status = ?")
params.append(status)
if name:
clauses.append("name LIKE ?")
params.append(f"%{name}%")
if start_time:
clauses.append("created_at >= ?")
params.append(start_time)
if end_time:
clauses.append("created_at <= ?")
params.append(end_time)
where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
params.extend([limit, offset])
rows = conn.execute(
f"SELECT * FROM task_history{where} ORDER BY created_at DESC LIMIT ? OFFSET ?",
params,
).fetchall()
results = []
for r in rows:
d = dict(r)
d["result_files"] = _parse_json_field(d.get("result_files"))
d["log_lines"] = _parse_json_field(d.get("log_lines"))
results.append(d)
return results
finally:
conn.close()
def query_task_by_id(task_id: str) -> dict | None:
    """Look up one task_history row by its id.

    Returns the row as a dict whose result_files and log_lines values have
    been passed through _parse_json_field, or None when no row has this id.
    """
    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        cursor = db.execute("SELECT * FROM task_history WHERE id = ?", (task_id,))
        found = cursor.fetchone()
        if found is None:
            return None
        task = dict(found)
        # Both columns hold JSON text; decode them before handing the row back.
        for json_column in ("result_files", "log_lines"):
            task[json_column] = _parse_json_field(task.get(json_column))
        return task
    finally:
        db.close()
def query_task_stats() -> dict:
    """Count tasks per status.

    Returns a dict mapping each status present in task_history to its row
    count, plus a "total" key holding the sum of those counts.
    """
    db = sqlite3.connect(_db_path)
    db.row_factory = sqlite3.Row
    try:
        grouped = db.execute(
            "SELECT status, COUNT(*) as count FROM task_history GROUP BY status"
        ).fetchall()
        summary = {}
        for record in grouped:
            summary[record["status"]] = record["count"]
        # Sum the per-status counts before the "total" key is added.
        overall = sum(summary.values())
        summary["total"] = overall
        return summary
    finally:
        db.close()
# ---------------------------------------------------------------------------
# Query functions — File metadata
# ---------------------------------------------------------------------------
def query_file_history(filename: str = None, directory: str = None,
action: str = None, task_id: str = None,
start_time: str = None, end_time: str = None,
limit: int = 50, offset: int = 0) -> list[dict]:
"""Query file operation history with optional filters and pagination."""
conn = sqlite3.connect(_db_path)
conn.row_factory = sqlite3.Row
try:
clauses = []
params = []
if filename:
clauses.append("filename LIKE ?")
params.append(f"%{filename}%")
if directory:
clauses.append("directory = ?")
params.append(directory)
if action:
clauses.append("action = ?")
params.append(action)
if task_id:
clauses.append("task_id = ?")
params.append(task_id)
if start_time:
clauses.append("timestamp >= ?")
params.append(start_time)
if end_time:
clauses.append("timestamp <= ?")
params.append(end_time)
where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
params.extend([limit, offset])
rows = conn.execute(
f"SELECT * FROM file_metadata{where} ORDER BY id DESC LIMIT ? OFFSET ?",
params,
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def query_file_stats() -> list[dict]:
    """Get file storage statistics by scanning actual directories.

    Scans data/input, data/output and data/result under the project root
    (the directory three levels above the one containing this module),
    descending into subdirectories.

    Returns:
        One dict per scanned directory, in the order listed above, with keys
        ``directory`` (the relative path), ``file_count`` and ``total_size``
        (sum of file sizes in bytes). A directory that does not exist is
        reported with zero counts. A file that cannot be inspected, for
        example because it was removed while the scan was running, is
        skipped instead of aborting the scan.
    """
    project_root = Path(__file__).resolve().parent.parent.parent.parent
    stats = []
    for rel_dir in ("data/input", "data/output", "data/result"):
        dir_path = project_root / rel_dir
        file_count = 0
        total_size = 0
        if dir_path.exists():
            for entry in dir_path.rglob("*"):
                try:
                    if not entry.is_file():
                        continue
                    size = entry.stat().st_size
                except OSError:
                    # The entry vanished or became unreadable between being
                    # listed and being inspected; leave it out of the totals.
                    continue
                file_count += 1
                total_size += size
        stats.append({
            "directory": rel_dir,
            "file_count": file_count,
            "total_size": total_size,
        })
    return stats
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_json_field(value):
"""Parse a JSON string to a Python list; return empty list on failure."""
if not value:
return []
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return []