diff --git a/web/backend/routers/processing.py b/web/backend/routers/processing.py index dcb3988..5bbf390 100644 --- a/web/backend/routers/processing.py +++ b/web/backend/routers/processing.py @@ -234,6 +234,7 @@ async def ocr_batch( """Run OCR on all images in input/.""" tm = _get_task_manager(request) task = tm.create_task("批量OCR识别") + task.metadata = {"endpoint": "/api/processing/ocr-batch", "body": {}} image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'} files = _list_input_files(filter_ext=list(image_exts)) @@ -296,6 +297,7 @@ async def process_excel( """Convert OCR output Excel files to standardized purchase orders.""" tm = _get_task_manager(request) task = tm.create_task("Excel标准化处理") + task.metadata = {"endpoint": "/api/processing/excel", "body": body.dict()} excel_exts = {'.xls', '.xlsx'} if body.files: @@ -354,6 +356,7 @@ async def merge_orders( """Merge selected purchase order files into one PosPal template.""" tm = _get_task_manager(request) task = tm.create_task("合并采购单") + task.metadata = {"endpoint": "/api/processing/merge", "body": body.dict()} # If specific files provided, use them; otherwise merge all if body.filenames: @@ -399,6 +402,7 @@ async def full_pipeline( """Run the full pipeline: OCR -> Excel -> Result (NO merge).""" tm = _get_task_manager(request) task = tm.create_task("一键全流程处理") + task.metadata = {"endpoint": "/api/processing/pipeline", "body": body.dict()} async def _bg(): def do_work(): @@ -501,6 +505,7 @@ async def ocr_single( """OCR a single image file.""" tm = _get_task_manager(request) task = tm.create_task(f"OCR: {body.filename}") + task.metadata = {"endpoint": "/api/processing/ocr-single", "body": body.dict()} file_path = _input_dir / body.filename if not file_path.is_file(): @@ -544,6 +549,7 @@ async def excel_single( """Process a single Excel file to purchase order.""" tm = _get_task_manager(request) task = tm.create_task(f"Excel处理: {body.filename}") + task.metadata = {"endpoint": "/api/processing/excel-single", "body": body.dict()} file_path = _output_dir / body.filename if not file_path.is_file(): @@ -582,6 +588,7 @@ async def pipeline_single( """Full pipeline for a single image: OCR -> Excel -> Result (no merge).""" tm = _get_task_manager(request) task = tm.create_task(f"全流程: {body.filename}") + task.metadata = {"endpoint": "/api/processing/pipeline-single", "body": body.dict()} file_path = _input_dir / body.filename if not file_path.is_file(): @@ -659,6 +666,7 @@ async def merge_batch( """Merge selected purchase order files into one PosPal template.""" tm = _get_task_manager(request) task = tm.create_task("批量合并采购单") + task.metadata = {"endpoint": "/api/processing/merge-batch", "body": body.dict()} file_paths = [_result_dir / f for f in body.filenames if (_result_dir / f).is_file()] if not file_paths: diff --git a/web/backend/routers/tasks.py b/web/backend/routers/tasks.py index 19b151a..2a0e732 100644 --- a/web/backend/routers/tasks.py +++ b/web/backend/routers/tasks.py @@ -121,7 +121,34 @@ async def retry_task( """Retry a failed task by re-invoking its processing endpoint. Only tasks with status ``failed`` may be retried. + For in-memory tasks with metadata, the original endpoint and request body + are used to faithfully reproduce the original call. For historical DB-only + tasks, the endpoint is looked up from ``_RETRY_ROUTE_MAP`` by task name. """ + tm = request.state.task_manager + + # --- Strategy 1: in-memory task with metadata --- + new_task = tm.retry_task(task_id) + if new_task is not None: + meta = new_task.metadata or {} + endpoint = meta.get("endpoint") + body = meta.get("body", {}) + if endpoint: + base_url = f"http://{request.url.hostname}:{request.url.port}" + url = f"{base_url}{endpoint}" + auth_header = request.headers.get("authorization") + headers: dict[str, str] = {} + if auth_header: + headers["authorization"] = auth_header + async with httpx.AsyncClient() as client: + resp = await client.post(url, json=body, headers=headers) + return {"task_id": new_task.id, "status": "retried", "original_response": resp.json()} + + # Metadata present but no endpoint — fall through to DB strategy + # (the new task was already created; caller can track it) + return {"task_id": new_task.id, "status": "retried"} + + # --- Strategy 2: DB-only historical task (no in-memory record) --- loop = asyncio.get_event_loop() task = await loop.run_in_executor( None, lambda: db_schema.query_task_by_id(task_id), @@ -142,18 +169,18 @@ async def retry_task( detail=f"未知的任务类型: {task_name}", ) - # Build the internal URL to the processing endpoint. + # Create a new in-memory task to track the retry. + new_task = tm.create_task(task_name) + base_url = f"http://{request.url.hostname}:{request.url.port}" url = f"{base_url}{endpoint}" - # Forward the Authorization header so the processing endpoint can - # authenticate the request. auth_header = request.headers.get("authorization") - headers: dict[str, str] = {} + headers = {} if auth_header: headers["authorization"] = auth_header async with httpx.AsyncClient() as client: resp = await client.post(url, headers=headers) - return resp.json() + return {"task_id": new_task.id, "status": "retried", "original_response": resp.json()} diff --git a/web/backend/services/task_manager.py b/web/backend/services/task_manager.py index 80187e0..58007fa 100644 --- a/web/backend/services/task_manager.py +++ b/web/backend/services/task_manager.py @@ -28,9 +28,10 @@ class Task: result_files: List[str] = field(default_factory=list) error: Optional[str] = None log_lines: List[str] = field(default_factory=list) + metadata: Optional[dict] = None def to_dict(self) -> dict: - return { + d = { "task_id": self.id, "name": self.name, "status": self.status.value, @@ -40,6 +41,9 @@ class Task: "error": self.error, "log_lines": self.log_lines[-100:], } + if self.metadata: + d["metadata"] = self.metadata + return d class TaskManager: @@ -135,6 +139,21 @@ class TaskManager: ) self._schedule(self._broadcast(task_id)) + def retry_task(self, task_id: str) -> Optional[Task]: + """Create a new task to retry a failed task with its original parameters. + + Returns the new task if the original was failed and retryable, else None. + The caller is responsible for dispatching the actual work based on + ``new_task.metadata``. + """ + original = self._tasks.get(task_id) + if not original or original.status != TaskStatus.FAILED: + return None + new_task = self.create_task(original.name) + if original.metadata: + new_task.metadata = dict(original.metadata) + return new_task + def set_failed(self, task_id: str, error: str): task = self._tasks.get(task_id) if not task: diff --git a/web/frontend/src/stores/processing.ts b/web/frontend/src/stores/processing.ts index 5fc8dd5..da75443 100644 --- a/web/frontend/src/stores/processing.ts +++ b/web/frontend/src/stores/processing.ts @@ -1,5 +1,5 @@ import { defineStore } from 'pinia' -import { ref } from 'vue' +import { ref, computed } from 'vue' import api from '../api' export interface TaskInfo { @@ -13,44 +13,64 @@ export interface TaskInfo { log_lines: string[] } +interface TaskConnection { + ws: WebSocket | null + reconnectAttempts: number + reconnectTimer: ReturnType | null +} + export const useProcessingStore = defineStore('processing', () => { - const currentTask = ref(null) + // --- Multi-task tracking --- + const activeTasks = ref(new Map()) + + const activeTaskList = computed(() => + Array.from(activeTasks.value.values()) + ) + + const currentTask = computed(() => + activeTaskList.value[0] ?? null + ) + + // --- Legacy compatibility --- const tasks = ref([]) const logs = ref([]) const taskSource = ref('') - let ws: WebSocket | null = null - let reconnectAttempts = 0 - let reconnectTimer: ReturnType | null = null - let currentTaskId: string | null = null + // --- Per-task WebSocket management --- + const taskConnections = new Map() const MAX_RECONNECT = 5 function connectWebSocket(taskId: string) { - disconnectWebSocket() - currentTaskId = taskId - reconnectAttempts = 0 + disconnectTaskWS(taskId) + taskConnections.set(taskId, { ws: null, reconnectAttempts: 0, reconnectTimer: null }) doConnect(taskId) } function doConnect(taskId: string) { + const conn = taskConnections.get(taskId) + if (!conn) return + const token = localStorage.getItem('token') const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' const host = window.location.host const url = `${protocol}//${host}/ws/task/${taskId}?token=${token}` - ws = new WebSocket(url) + const socket = new WebSocket(url) + conn.ws = socket - ws.onopen = () => { - reconnectAttempts = 0 + socket.onopen = () => { + conn.reconnectAttempts = 0 } - ws.onmessage = (event) => { + socket.onmessage = (event) => { try { const data = JSON.parse(event.data) - if (data.error) return // ignore error messages from ws - currentTask.value = data - logs.value = data.log_lines || [] + if (data.error) return + // Update activeTasks map + activeTasks.value.set(data.task_id, data) + + // Legacy: update tasks list const idx = tasks.value.findIndex(t => t.task_id === data.task_id) if (idx >= 0) { tasks.value[idx] = data @@ -58,30 +78,33 @@ export const useProcessingStore = defineStore('processing', () => { tasks.value.unshift(data) } + // Legacy: update logs for the current (most recent) task + if (currentTask.value?.task_id === data.task_id) { + logs.value = data.log_lines || [] + } + if (data.status === 'completed' || data.status === 'failed') { - setTimeout(() => disconnectWebSocket(), 2000) + setTimeout(() => disconnectTaskWS(data.task_id), 2000) } } catch {} } - ws.onerror = () => { + socket.onerror = () => { // Error will be followed by onclose, which handles reconnection } - ws.onclose = (event) => { - ws = null - // Auto-reconnect if task is still running and not manually disconnected - const task = currentTask.value + socket.onclose = () => { + conn.ws = null + const task = activeTasks.value.get(taskId) if ( - currentTaskId === taskId && task && (task.status === 'pending' || task.status === 'running') && - reconnectAttempts < MAX_RECONNECT + conn.reconnectAttempts < MAX_RECONNECT ) { - const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 10000) - reconnectAttempts++ - reconnectTimer = setTimeout(() => { - if (currentTaskId === taskId) { + const delay = Math.min(1000 * Math.pow(2, conn.reconnectAttempts), 10000) + conn.reconnectAttempts++ + conn.reconnectTimer = setTimeout(() => { + if (taskConnections.has(taskId)) { doConnect(taskId) } }, delay) @@ -89,24 +112,58 @@ export const useProcessingStore = defineStore('processing', () => { } } + function disconnectTaskWS(taskId: string) { + const conn = taskConnections.get(taskId) + if (!conn) return + conn.reconnectAttempts = MAX_RECONNECT // prevent reconnect + if (conn.reconnectTimer) { + clearTimeout(conn.reconnectTimer) + conn.reconnectTimer = null + } + if (conn.ws) { + conn.ws.close() + conn.ws = null + } + taskConnections.delete(taskId) + } + + /** Disconnect all task WebSockets (backward compat) */ function disconnectWebSocket() { - currentTaskId = null - reconnectAttempts = MAX_RECONNECT // prevent reconnect - if (reconnectTimer) { - clearTimeout(reconnectTimer) - reconnectTimer = null + for (const taskId of Array.from(taskConnections.keys())) { + disconnectTaskWS(taskId) } - if (ws) { - ws.close() - ws = null + } + + function removeTask(taskId: string) { + disconnectTaskWS(taskId) + activeTasks.value.delete(taskId) + const idx = tasks.value.findIndex(t => t.task_id === taskId) + if (idx >= 0) tasks.value.splice(idx, 1) + } + + async function retryTask(taskId: string) { + const res = await api.post(`/api/tasks/${taskId}/retry`) + const newTaskId: string = res.data.task_id + const taskInfo: TaskInfo = { + task_id: newTaskId, + name: res.data.message || '', + status: 'pending', + progress: 0, + message: '', + result_files: [], + error: null, + log_lines: [], } + activeTasks.value.set(newTaskId, taskInfo) + connectWebSocket(newTaskId) + return newTaskId } async function startTask(endpoint: string, body?: any, source: string = 'processing') { const res = await api.post(endpoint, body || {}) const taskId = res.data.task_id taskSource.value = source - currentTask.value = { + const taskInfo: TaskInfo = { task_id: taskId, name: res.data.message || '', status: 'pending', @@ -116,10 +173,23 @@ export const useProcessingStore = defineStore('processing', () => { error: null, log_lines: [], } + activeTasks.value.set(taskId, taskInfo) logs.value = [] connectWebSocket(taskId) return taskId } - return { currentTask, tasks, logs, taskSource, connectWebSocket, disconnectWebSocket, startTask } + return { + activeTasks, + activeTaskList, + currentTask, + tasks, + logs, + taskSource, + connectWebSocket, + disconnectWebSocket, + startTask, + removeTask, + retryTask, + } }) diff --git a/web/frontend/src/views/Dashboard.vue b/web/frontend/src/views/Dashboard.vue index 959c6ff..d608948 100644 --- a/web/frontend/src/views/Dashboard.vue +++ b/web/frontend/src/views/Dashboard.vue @@ -35,27 +35,34 @@
- +

处理进度

- - {{ statusText }} + + {{ visibleTasks.length }} 个任务
-
-
-
-
+
+
+
+ {{ task.name }} + {{ statusLabel(task.status) }} +
+ +
{{ task.message }}
+ + + +
+ 重试 + 关闭 +
+ +
+
{{ log }}
-
-
- {{ currentTask.progress }}% - {{ currentTask.message }}
@@ -71,7 +78,10 @@

处理日志

- 清空 +
+ 查看全部日志 + 清空 +
@@ -207,41 +217,10 @@ const detailedStats = ref({ total_processed: 0, }) -const currentTask = computed(() => { - if (ps.taskSource !== 'sync') return ps.currentTask - return null -}) -const logs = computed(() => ps.logs.slice(0, 10)) - -const statusType = computed(() => { - const m: Record = { - pending: 'info', - running: 'warning', - completed: 'success', - failed: 'danger', - } - return m[currentTask.value?.status || ''] || 'info' -}) - -const statusColor = computed(() => { - const m: Record = { - pending: '#a1a1aa', - running: '#f97316', - completed: '#22c55e', - failed: '#ef4444', - } - return m[currentTask.value?.status || ''] || '#a1a1aa' -}) - -const statusText = computed(() => { - const m: Record = { - pending: '等待中', - running: '运行中', - completed: '已完成', - failed: '已失败', - } - return m[currentTask.value?.status || ''] || '' -}) +const visibleTasks = computed(() => + ps.taskSource !== 'sync' ? ps.activeTaskList : [] +) +const logs = computed(() => ps.logs.slice(0, 50)) const stats = computed(() => [ { @@ -290,6 +269,29 @@ function clearLogs(): void { ps.logs.splice(0) } +function statusTagType(status: string): string { + const map: Record = { pending: 'info', running: '', completed: 'success', failed: 'danger' } + return map[status] || 'info' +} + +function statusLabel(status: string): string { + const map: Record = { pending: '等待中', running: '运行中', completed: '已完成', failed: '失败' } + return map[status] || status +} + +async function handleRetry(taskId: string): Promise { + try { + await ps.retryTask(taskId) + ElMessage.success('已重新提交任务') + } catch { + ElMessage.error('重试失败') + } +} + +function handleDismiss(taskId: string): void { + ps.removeTask(taskId) +} + async function refreshStats(): Promise { statsLoading.value = true try { @@ -400,11 +402,11 @@ const runPipeline = () => doAction('/processing/pipeline') const runOcr = () => doAction('/processing/ocr-batch') const runExcel = () => doAction('/processing/excel') -// Auto-refresh stats when task completes +// Auto-refresh stats when any task completes or fails watch( - () => currentTask.value?.status, - (status) => { - if (status === 'completed' || status === 'failed') { + () => visibleTasks.value.map(t => t.status), + (statuses) => { + if (statuses.some(s => s === 'completed' || s === 'failed')) { refreshStats() } } @@ -693,6 +695,75 @@ onMounted(() => { text-overflow: ellipsis; } +/* ── Task cards ── */ +.task-cards { + display: flex; + flex-direction: column; + gap: 10px; +} + +.task-card-item { + border: 1px solid var(--border-light); + border-radius: var(--radius-sm); + padding: 14px 16px; + background: #fafafa; + transition: border-color 0.15s ease; +} + +.task-card-item:hover { + border-color: #d4d4d8; +} + +.task-card-header { + display: flex; + align-items: center; + justify-content: space-between; + margin-bottom: 10px; +} + +.task-name { + font-size: 13px; + font-weight: 600; + color: var(--text-primary); + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.task-message { + font-size: 12px; + color: var(--text-muted); + margin-top: 8px; +} + +.task-error { + margin-top: 8px; +} + +.task-card-actions { + display: flex; + gap: 8px; + margin-top: 10px; +} + +.task-logs { + margin-top: 10px; + max-height: 200px; + overflow-y: auto; + background: #09090b; + border-radius: var(--radius-sm); + padding: 10px 12px; + font-family: var(--font-mono); + font-size: 11px; + line-height: 1.6; +} + +.task-logs .log-line { + color: #a1a1aa; + padding: 0; + word-break: break-all; +} + /* ── Progress area ── */ .progress-card { display: flex; diff --git a/web/frontend/src/views/Layout.vue b/web/frontend/src/views/Layout.vue index 42f0f09..5afff32 100644 --- a/web/frontend/src/views/Layout.vue +++ b/web/frontend/src/views/Layout.vue @@ -1,6 +1,6 @@