"use strict"; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); exports.XiaoYiWebSocketManager = void 0; const ws_1 = __importDefault(require("ws")); const events_1 = require("events"); const url_1 = require("url"); const auth_1 = require("./auth"); const types_1 = require("./types"); class XiaoYiWebSocketManager extends events_1.EventEmitter { constructor(config) { super(); // ==================== Dual WebSocket Connections ==================== this.ws1 = null; this.ws2 = null; // ==================== Dual Server States ==================== this.state1 = { connected: false, ready: false, lastHeartbeat: 0, reconnectAttempts: 0 }; this.state2 = { connected: false, ready: false, lastHeartbeat: 0, reconnectAttempts: 0 }; // ==================== Session → Server Mapping ==================== this.sessionServerMap = new Map(); // ==================== Session Cleanup State ==================== // Track sessions that are pending cleanup (user cleared context but task still running) this.sessionCleanupStateMap = new Map(); // ==================== Active Tasks ==================== this.activeTasks = new Map(); // Resolve configuration with defaults and backward compatibility this.config = this.resolveConfig(config); this.auth = new auth_1.XiaoYiAuth(this.config.ak, this.config.sk, this.config.agentId); console.log(`[WS Manager] Initialized with dual server:`); console.log(` Server 1: ${this.config.wsUrl1}`); console.log(` Server 2: ${this.config.wsUrl2}`); } /** * Check if URL is wss + IP format (skip certificate verification) */ isWssWithIp(urlString) { try { const url = new url_1.URL(urlString); // Check if protocol is wss if (url.protocol !== 'wss:') { return false; } const hostname = url.hostname; // Check for IPv4 address (e.g., 192.168.1.1) const ipv4Regex = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/; if (ipv4Regex.test(hostname)) { // Validate each octet is 0-255 const octets = hostname.split('.'); return octets.every(octet => { const num = parseInt(octet, 10); return num >= 0 && num <= 255; }); } // Check for IPv6 address (e.g., [::1] or 2001:db8::1) // IPv6 in URL might be wrapped in brackets const ipv6Regex = /^[\[::0-9a-fA-F]+$/; const ipv6WithoutBrackets = hostname.replace(/[\[\]]/g, ''); // Simple check for IPv6: contains colons and valid hex characters if (hostname.includes('[') && hostname.includes(']')) { return ipv6Regex.test(hostname); } // Check for plain IPv6 format if (hostname.includes(':')) { const ipv6RegexPlain = /^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$/; return ipv6RegexPlain.test(ipv6WithoutBrackets); } return false; } catch (error) { console.warn(`[WS Manager] Invalid URL format: ${urlString}`); return false; } } /** * Resolve configuration with defaults and backward compatibility */ resolveConfig(userConfig) { // Backward compatibility: if wsUrl is provided but wsUrl1/wsUrl2 are not, // use wsUrl for server1 and default for server2 let wsUrl1 = userConfig.wsUrl1; let wsUrl2 = userConfig.wsUrl2; if (!wsUrl1 && userConfig.wsUrl) { wsUrl1 = userConfig.wsUrl; } // Apply defaults if not provided if (!wsUrl1) { console.warn(`[WS Manager] wsUrl1 not provided, using default: ${types_1.DEFAULT_WS_URL_1}`); wsUrl1 = types_1.DEFAULT_WS_URL_1; } if (!wsUrl2) { console.warn(`[WS Manager] wsUrl2 not provided, using default: ${types_1.DEFAULT_WS_URL_2}`); wsUrl2 = types_1.DEFAULT_WS_URL_2; } return { wsUrl1, wsUrl2, agentId: userConfig.agentId, ak: userConfig.ak, sk: userConfig.sk, enableStreaming: userConfig.enableStreaming ?? true, sessionCleanupTimeoutMs: userConfig.sessionCleanupTimeoutMs ?? XiaoYiWebSocketManager.DEFAULT_CLEANUP_TIMEOUT_MS, }; } /** * Connect to both WebSocket servers */ async connect() { console.log("[WS Manager] Connecting to both servers..."); const results = await Promise.allSettled([ this.connectToServer1(), this.connectToServer2(), ]); // Check if at least one connection succeeded const server1Success = results[0].status === 'fulfilled'; const server2Success = results[1].status === 'fulfilled'; if (!server1Success && !server2Success) { console.error("[WS Manager] Failed to connect to both servers"); throw new Error("Failed to connect to both servers"); } console.log(`[WS Manager] Connection results: Server1=${server1Success}, Server2=${server2Success}`); // Start application-level heartbeat (only if at least one connection is ready) if (this.state1.connected || this.state2.connected) { this.startAppHeartbeat(); } } /** * Connect to server 1 */ async connectToServer1() { console.log(`[Server1] Connecting to ${this.config.wsUrl1}...`); try { const authHeaders = this.auth.generateAuthHeaders(); // Check if URL is wss + IP format, skip certificate verification const skipCertVerify = this.isWssWithIp(this.config.wsUrl1); if (skipCertVerify) { console.log(`[Server1] WSS + IP detected, skipping certificate verification`); } this.ws1 = new ws_1.default(this.config.wsUrl1, { headers: authHeaders, rejectUnauthorized: !skipCertVerify, }); this.setupWebSocketHandlers(this.ws1, 'server1'); await new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error("Connection timeout")), 30000); this.ws1.once("open", () => { clearTimeout(timeout); resolve(); }); this.ws1.once("error", (error) => { clearTimeout(timeout); reject(error); }); }); this.state1.connected = true; this.state1.ready = true; console.log(`[Server1] Connected successfully`); this.emit("connected", "server1"); // Schedule connection stability check before resetting reconnect counter this.scheduleStableConnectionCheck('server1'); // Send init message this.sendInitMessage(this.ws1, 'server1'); // Start protocol heartbeat this.startProtocolHeartbeat('server1'); } catch (error) { console.error(`[Server1] Connection failed:`, error); this.state1.connected = false; this.state1.ready = false; this.emit("error", { serverId: 'server1', error }); throw error; } } /** * Connect to server 2 */ async connectToServer2() { console.log(`[Server2] Connecting to ${this.config.wsUrl2}...`); try { const authHeaders = this.auth.generateAuthHeaders(); // Check if URL is wss + IP format, skip certificate verification const skipCertVerify = this.isWssWithIp(this.config.wsUrl2); if (skipCertVerify) { console.log(`[Server2] WSS + IP detected, skipping certificate verification`); } this.ws2 = new ws_1.default(this.config.wsUrl2, { headers: authHeaders, rejectUnauthorized: !skipCertVerify, }); this.setupWebSocketHandlers(this.ws2, 'server2'); await new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error("Connection timeout")), 30000); this.ws2.once("open", () => { clearTimeout(timeout); resolve(); }); this.ws2.once("error", (error) => { clearTimeout(timeout); reject(error); }); }); this.state2.connected = true; this.state2.ready = true; console.log(`[Server2] Connected successfully`); this.emit("connected", "server2"); // Schedule connection stability check before resetting reconnect counter this.scheduleStableConnectionCheck('server2'); // Send init message this.sendInitMessage(this.ws2, 'server2'); // Start protocol heartbeat this.startProtocolHeartbeat('server2'); } catch (error) { console.error(`[Server2] Connection failed:`, error); this.state2.connected = false; this.state2.ready = false; this.emit("error", { serverId: 'server2', error }); throw error; } } /** * Disconnect from all servers */ disconnect() { console.log("[WS Manager] Disconnecting from all servers..."); this.clearTimers(); if (this.ws1) { this.ws1.close(); this.ws1 = null; } if (this.ws2) { this.ws2.close(); this.ws2 = null; } this.state1.connected = false; this.state1.ready = false; this.state2.connected = false; this.state2.ready = false; this.sessionServerMap.clear(); this.activeTasks.clear(); // Cleanup session cleanup state map for (const [sessionId, state] of this.sessionCleanupStateMap.entries()) { if (state.cleanupTimeoutId) { clearTimeout(state.cleanupTimeoutId); } } this.sessionCleanupStateMap.clear(); this.emit("disconnected"); } /** * Send init message to specific server */ sendInitMessage(ws, serverId) { const initMessage = { msgType: "clawd_bot_init", agentId: this.config.agentId, }; try { ws.send(JSON.stringify(initMessage)); console.log(`[${serverId}] Sent clawd_bot_init message`); } catch (error) { console.error(`[${serverId}] Failed to send init message:`, error); } } /** * Setup WebSocket event handlers for specific server */ setupWebSocketHandlers(ws, serverId) { ws.on("open", () => { console.log(`[${serverId}] WebSocket opened`); }); ws.on("message", (data) => { this.handleIncomingMessage(data, serverId); }); ws.on("close", (code, reason) => { console.log(`[${serverId}] WebSocket closed: ${code} ${reason.toString()}`); // Clear stable connection timer - connection was not stable this.clearStableConnectionCheck(serverId); if (serverId === 'server1') { this.state1.connected = false; this.state1.ready = false; this.clearProtocolHeartbeat('server1'); } else { this.state2.connected = false; this.state2.ready = false; this.clearProtocolHeartbeat('server2'); } this.emit("disconnected", serverId); this.scheduleReconnect(serverId); }); ws.on("error", (error) => { console.error(`[${serverId}] WebSocket error:`, error); this.emit("error", { serverId, error }); }); ws.on("pong", () => { if (serverId === 'server1') { this.state1.lastHeartbeat = Date.now(); } else { this.state2.lastHeartbeat = Date.now(); } }); } /** * Extract sessionId from message based on method type * Different methods have sessionId in different locations: * - message/stream: sessionId in params, fallback to top-level sessionId * - tasks/cancel: sessionId at top level * - clearContext: sessionId at top level */ extractSessionId(message) { // For message/stream, prioritize params.sessionId, fallback to top-level sessionId if (message.method === "message/stream") { return message.params?.sessionId || message.sessionId; } // For tasks/cancel and clearContext, sessionId is at top level if (message.method === "tasks/cancel" || message.method === "clearContext" || message.action === "clear") { return message.sessionId; } return undefined; } /** * Handle incoming message from specific server */ handleIncomingMessage(data, sourceServer) { try { const message = JSON.parse(data.toString()); // Log received message console.log("\n" + "=".repeat(80)); console.log(`[${sourceServer}] Received message:`); console.log(JSON.stringify(message, null, 2)); console.log("=".repeat(80) + "\n"); // Validate agentId if (message.agentId && message.agentId !== this.config.agentId) { console.warn(`[${sourceServer}] Mismatched agentId: ${message.agentId}, expected: ${this.config.agentId}. Discarding.`); return; } // Extract sessionId based on method type const sessionId = this.extractSessionId(message); // Record session → server mapping if (sessionId) { this.sessionServerMap.set(sessionId, sourceServer); console.log(`[MAP] Session ${sessionId} -> ${sourceServer}`); } // Handle special messages (clearContext, tasks/cancel) if (message.method === "clearContext") { this.handleClearContext(message, sourceServer); return; } if (message.action === "clear") { this.handleClearMessage(message, sourceServer); return; } if (message.method === "tasks/cancel" || message.action === "tasks/cancel") { this.handleTasksCancelMessage(message, sourceServer); return; } // Handle regular A2A request if (this.isA2ARequestMessage(message)) { // Store task for potential cancellation (support params.sessionId or top-level sessionId) const sessionId = message.params?.sessionId || message.sessionId; this.activeTasks.set(message.id, { sessionId: sessionId, timestamp: Date.now(), }); // Emit with server info this.emit("message", message); } else { console.warn(`[${sourceServer}] Unknown message format`); } } catch (error) { console.error(`[${sourceServer}] Failed to parse message:`, error); this.emit("error", { serverId: sourceServer, error }); } } /** * Send A2A response message with automatic routing */ async sendResponse(response, taskId, sessionId, isFinal = true, append = true) { // Check if session is pending cleanup const cleanupState = this.sessionCleanupStateMap.get(sessionId); if (cleanupState) { // Session is pending cleanup, silently discard response console.log(`[RESPONSE] Discarding response for pending cleanup session ${sessionId}`); return; } // Find which server this session belongs to const targetServer = this.sessionServerMap.get(sessionId); if (!targetServer) { console.error(`[ROUTE] Unknown server for session ${sessionId}`); throw new Error(`Cannot route response: unknown session ${sessionId}`); } // Get the corresponding WebSocket connection const ws = targetServer === 'server1' ? this.ws1 : this.ws2; const state = targetServer === 'server1' ? this.state1 : this.state2; if (!ws || ws.readyState !== ws_1.default.OPEN) { console.error(`[ROUTE] ${targetServer} not connected for session ${sessionId}`); throw new Error(`${targetServer} is not available`); } // Convert to JSON-RPC format const jsonRpcResponse = this.convertToJsonRpcFormat(response, taskId, isFinal, append); const message = { msgType: "agent_response", agentId: this.config.agentId, sessionId: sessionId, taskId: taskId, msgDetail: JSON.stringify(jsonRpcResponse), }; try { ws.send(JSON.stringify(message)); console.log(`[ROUTE] Response sent to ${targetServer} for session ${sessionId} (isFinal=${isFinal}, append=${append})`); } catch (error) { console.error(`[ROUTE] Failed to send to ${targetServer}:`, error); throw error; } } /** * Send clear context response to specific server */ async sendClearContextResponse(requestId, sessionId, success = true, targetServer) { const serverId = targetServer || this.sessionServerMap.get(sessionId); if (!serverId) { console.error(`[CLEAR] Unknown server for session ${sessionId}`); throw new Error(`Cannot send clear response: unknown session ${sessionId}`); } const ws = serverId === 'server1' ? this.ws1 : this.ws2; if (!ws || ws.readyState !== ws_1.default.OPEN) { console.error(`[CLEAR] ${serverId} not connected`); throw new Error(`${serverId} is not available`); } const jsonRpcResponse = { jsonrpc: "2.0", id: requestId, result: { status: { state: success ? "cleared" : "failed" } }, }; const message = { msgType: "agent_response", agentId: this.config.agentId, sessionId: sessionId, taskId: requestId, msgDetail: JSON.stringify(jsonRpcResponse), }; console.log(`\n[CLEAR] Sending clearContext response to ${serverId}:`); console.log(` sessionId: ${sessionId}`); console.log(` requestId: ${requestId}`); console.log(` success: ${success}\n`); try { ws.send(JSON.stringify(message)); } catch (error) { console.error(`[CLEAR] Failed to send to ${serverId}:`, error); throw error; } } /** * Send status update (for intermediate status messages, e.g., timeout warnings) * This uses "status-update" event type which keeps the conversation active */ async sendStatusUpdate(taskId, sessionId, message, targetServer) { // Check if session is pending cleanup const cleanupState = this.sessionCleanupStateMap.get(sessionId); if (cleanupState) { // Session is pending cleanup, silently discard status updates console.log(`[STATUS] Discarding status update for pending cleanup session ${sessionId}`); return; } const serverId = targetServer || this.sessionServerMap.get(sessionId); if (!serverId) { console.error(`[STATUS] Unknown server for session ${sessionId}`); throw new Error(`Cannot send status update: unknown session ${sessionId}`); } const ws = serverId === 'server1' ? this.ws1 : this.ws2; if (!ws || ws.readyState !== ws_1.default.OPEN) { console.error(`[STATUS] ${serverId} not connected`); throw new Error(`${serverId} is not available`); } // Create unique ID for this status update const messageId = `status_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; const jsonRpcResponse = { jsonrpc: "2.0", id: messageId, result: { taskId: taskId, kind: "status-update", final: false, // IMPORTANT: Not final, keeps conversation active status: { message: { role: "agent", parts: [ { kind: "text", text: message, }, ], }, state: "working", // Indicates task is still being processed }, }, }; const outboundMessage = { msgType: "agent_response", agentId: this.config.agentId, sessionId: sessionId, taskId: taskId, msgDetail: JSON.stringify(jsonRpcResponse), }; console.log(`[STATUS] Sending status update to ${serverId}:`); console.log(` sessionId: ${sessionId}`); console.log(` taskId: ${taskId}`); console.log(` message: ${message}`); console.log(` final: false, state: working\n`); try { ws.send(JSON.stringify(outboundMessage)); } catch (error) { console.error(`[STATUS] Failed to send to ${serverId}:`, error); throw error; } } /** * Send PUSH message (主动推送) via HTTP API * * This is used when SubAgent completes execution and needs to push results to user * independently of the original A2A request-response flow. * * Unlike sendResponse (which responds to a specific request via WebSocket), push messages are * sent through HTTP API asynchronously. * * @param sessionId - User's session ID * @param message - Message content to push * * Reference: 华为小艺推送消息 API * TODO: 实现实际的推送消息发送逻辑 */ async sendPushMessage(sessionId, message) { console.log(`[PUSH] Would send push message to session ${sessionId}, length: ${message.length} chars`); console.log(`[PUSH] Content: ${message.substring(0, 50)}${message.length > 50 ? "..." : ""}`); // TODO: Implement actual push message sending via HTTP API // Need to confirm correct push message format with XiaoYi API documentation } /** * Send tasks cancel response to specific server */ async sendTasksCancelResponse(requestId, sessionId, success = true, targetServer) { const serverId = targetServer || this.sessionServerMap.get(sessionId); if (!serverId) { console.error(`[CANCEL] Unknown server for session ${sessionId}`); throw new Error(`Cannot send cancel response: unknown session ${sessionId}`); } const ws = serverId === 'server1' ? this.ws1 : this.ws2; if (!ws || ws.readyState !== ws_1.default.OPEN) { console.error(`[CANCEL] ${serverId} not connected`); throw new Error(`${serverId} is not available`); } const jsonRpcResponse = { jsonrpc: "2.0", id: requestId, result: { id: requestId, status: { state: success ? "canceled" : "failed" } }, }; const message = { msgType: "agent_response", agentId: this.config.agentId, sessionId: sessionId, taskId: requestId, msgDetail: JSON.stringify(jsonRpcResponse), }; try { ws.send(JSON.stringify(message)); } catch (error) { console.error(`[CANCEL] Failed to send to ${serverId}:`, error); throw error; } } /** * Handle clearContext method */ handleClearContext(message, sourceServer) { const sessionId = this.extractSessionId(message); if (!sessionId) { console.error(`[${sourceServer}] Failed to extract sessionId from clearContext message`); return; } console.log(`[${sourceServer}] Received clearContext for session: ${sessionId}`); this.sendClearContextResponse(message.id, sessionId, true, sourceServer) .catch(error => console.error(`[${sourceServer}] Failed to send clearContext response:`, error)); this.emit("clear", { sessionId: sessionId, id: message.id, serverId: sourceServer, }); // Mark session for cleanup instead of immediate deletion this.markSessionForCleanup(sessionId, sourceServer, this.config.sessionCleanupTimeoutMs ?? XiaoYiWebSocketManager.DEFAULT_CLEANUP_TIMEOUT_MS); } /** * Handle clear message (legacy format) */ handleClearMessage(message, sourceServer) { console.log(`[${sourceServer}] Received clear message for session: ${message.sessionId}`); this.sendClearContextResponse(message.id, message.sessionId, true, sourceServer) .catch(error => console.error(`[${sourceServer}] Failed to send clear response:`, error)); this.emit("clear", { sessionId: message.sessionId, id: message.id, serverId: sourceServer, }); // Mark session for cleanup instead of immediate deletion this.markSessionForCleanup(message.sessionId, sourceServer, this.config.sessionCleanupTimeoutMs ?? XiaoYiWebSocketManager.DEFAULT_CLEANUP_TIMEOUT_MS); } /** * Handle tasks/cancel message */ handleTasksCancelMessage(message, sourceServer) { const sessionId = this.extractSessionId(message); if (!sessionId) { console.error(`[${sourceServer}] Failed to extract sessionId from tasks/cancel message`); return; } const effectiveTaskId = message.taskId || message.id; console.log("\n" + "=".repeat(60)); console.log(`[${sourceServer}] Received cancel request`); console.log(` Session: ${sessionId}`); console.log(` Task ID: ${effectiveTaskId}`); console.log("=".repeat(60) + "\n"); this.sendTasksCancelResponse(message.id, sessionId, true, sourceServer) .catch(error => console.error(`[${sourceServer}] Failed to send cancel response:`, error)); this.emit("cancel", { sessionId: sessionId, taskId: effectiveTaskId, id: message.id, serverId: sourceServer, }); this.activeTasks.delete(effectiveTaskId); } /** * Convert A2AResponseMessage to JSON-RPC 2.0 format */ convertToJsonRpcFormat(response, taskId, isFinal = true, append = true) { const artifactId = `artifact_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; if (response.status === "error" && response.error) { return { jsonrpc: "2.0", id: response.messageId, error: { code: response.error.code, message: response.error.message, }, }; } const parts = []; if (response.content.type === "text" && response.content.text) { // When isFinal=true, use empty string for text (no content needed for final chunk) const textContent = isFinal ? "" : response.content.text; parts.push({ kind: "text", text: textContent, }); } else if (response.content.type === "file") { parts.push({ kind: "file", file: { name: response.content.fileName || "file", mimeType: response.content.mimeType || "application/octet-stream", uri: response.content.mediaUrl, }, }); } // When isFinal=true, append should be true and text should be empty const artifactEvent = { taskId: taskId, kind: "artifact-update", append: isFinal ? true : append, lastChunk: isFinal, final: isFinal, artifact: { artifactId: artifactId, parts: parts, }, }; return { jsonrpc: "2.0", id: response.messageId, result: artifactEvent, }; } /** * Check if at least one server is ready */ isReady() { return (this.state1.ready && this.ws1?.readyState === ws_1.default.OPEN) || (this.state2.ready && this.ws2?.readyState === ws_1.default.OPEN); } /** * Get combined connection state */ getState() { const connected = this.state1.connected || this.state2.connected; const authenticated = connected; // Auth via headers return { connected, authenticated, lastHeartbeat: Math.max(this.state1.lastHeartbeat, this.state2.lastHeartbeat), lastAppHeartbeat: 0, reconnectAttempts: Math.max(this.state1.reconnectAttempts, this.state2.reconnectAttempts), maxReconnectAttempts: 50, }; } /** * Get individual server states */ getServerStates() { return { server1: { ...this.state1 }, server2: { ...this.state2 }, }; } /** * Start protocol-level heartbeat for specific server */ startProtocolHeartbeat(serverId) { const interval = setInterval(() => { const ws = serverId === 'server1' ? this.ws1 : this.ws2; const state = serverId === 'server1' ? this.state1 : this.state2; if (ws && ws.readyState === ws_1.default.OPEN) { ws.ping(); const now = Date.now(); if (state.lastHeartbeat > 0 && now - state.lastHeartbeat > 90000) { console.warn(`[${serverId}] Heartbeat timeout, reconnecting...`); ws.close(); } } }, 30000); if (serverId === 'server1') { this.heartbeatTimeout1 = interval; } else { this.heartbeatTimeout2 = interval; } } /** * Clear protocol heartbeat for specific server */ clearProtocolHeartbeat(serverId) { const interval = serverId === 'server1' ? this.heartbeatTimeout1 : this.heartbeatTimeout2; if (interval) { clearInterval(interval); if (serverId === 'server1') { this.heartbeatTimeout1 = undefined; } else { this.heartbeatTimeout2 = undefined; } } } /** * Start application-level heartbeat (shared across both servers) */ startAppHeartbeat() { this.appHeartbeatInterval = setInterval(() => { const heartbeatMessage = { msgType: "heartbeat", agentId: this.config.agentId, }; // Send to all connected servers if (this.ws1?.readyState === ws_1.default.OPEN) { try { this.ws1.send(JSON.stringify(heartbeatMessage)); } catch (error) { console.error('[Server1] Failed to send app heartbeat:', error); } } if (this.ws2?.readyState === ws_1.default.OPEN) { try { this.ws2.send(JSON.stringify(heartbeatMessage)); } catch (error) { console.error('[Server2] Failed to send app heartbeat:', error); } } }, 20000); } /** * Schedule reconnection for specific server */ scheduleReconnect(serverId) { const state = serverId === 'server1' ? this.state1 : this.state2; if (state.reconnectAttempts >= 50) { console.error(`[${serverId}] Max reconnection attempts reached`); this.emit("maxReconnectAttemptsReached", serverId); return; } const delay = Math.min(2000 * Math.pow(2, state.reconnectAttempts), 60000); state.reconnectAttempts++; console.log(`[${serverId}] Scheduling reconnect attempt ${state.reconnectAttempts}/50 in ${delay}ms`); const timeout = setTimeout(async () => { try { if (serverId === 'server1') { await this.connectToServer1(); } else { await this.connectToServer2(); } console.log(`[${serverId}] Reconnected successfully`); } catch (error) { console.error(`[${serverId}] Reconnection failed:`, error); this.scheduleReconnect(serverId); } }, delay); if (serverId === 'server1') { this.reconnectTimeout1 = timeout; } else { this.reconnectTimeout2 = timeout; } } /** * Clear all timers */ clearTimers() { if (this.heartbeatTimeout1) { clearInterval(this.heartbeatTimeout1); this.heartbeatTimeout1 = undefined; } if (this.heartbeatTimeout2) { clearInterval(this.heartbeatTimeout2); this.heartbeatTimeout2 = undefined; } if (this.appHeartbeatInterval) { clearInterval(this.appHeartbeatInterval); this.appHeartbeatInterval = undefined; } if (this.reconnectTimeout1) { clearTimeout(this.reconnectTimeout1); this.reconnectTimeout1 = undefined; } if (this.reconnectTimeout2) { clearTimeout(this.reconnectTimeout2); this.reconnectTimeout2 = undefined; } // Clear stable connection timers this.clearStableConnectionCheck('server1'); this.clearStableConnectionCheck('server2'); } /** * Schedule a connection stability check * Only reset reconnect counter after connection has been stable for threshold time */ scheduleStableConnectionCheck(serverId) { const timer = setTimeout(() => { const state = serverId === 'server1' ? this.state1 : this.state2; if (state.connected) { console.log(`[${serverId}] Connection stable for ${XiaoYiWebSocketManager.STABLE_CONNECTION_THRESHOLD}ms, resetting reconnect counter`); state.reconnectAttempts = 0; } }, XiaoYiWebSocketManager.STABLE_CONNECTION_THRESHOLD); if (serverId === 'server1') { this.stableConnectionTimer1 = timer; } else { this.stableConnectionTimer2 = timer; } } /** * Clear the connection stability check timer */ clearStableConnectionCheck(serverId) { const timer = serverId === 'server1' ? this.stableConnectionTimer1 : this.stableConnectionTimer2; if (timer) { clearTimeout(timer); if (serverId === 'server1') { this.stableConnectionTimer1 = undefined; } else { this.stableConnectionTimer2 = undefined; } } } /** * Type guard for A2A request messages * sessionId can be in params OR at top level (fallback) */ isA2ARequestMessage(data) { return data && typeof data.agentId === "string" && data.jsonrpc === "2.0" && typeof data.id === "string" && data.method === "message/stream" && data.params && typeof data.params.id === "string" && // sessionId can be in params OR at top level (typeof data.params.sessionId === "string" || typeof data.sessionId === "string") && data.params.message && typeof data.params.message.role === "string" && Array.isArray(data.params.message.parts); } /** * Get active tasks */ getActiveTasks() { return new Map(this.activeTasks); } /** * Remove task from active tasks */ removeActiveTask(taskId) { this.activeTasks.delete(taskId); } /** * Get server for a specific session */ getServerForSession(sessionId) { return this.sessionServerMap.get(sessionId); } /** * Remove session mapping */ removeSession(sessionId) { this.sessionServerMap.delete(sessionId); } /** * Mark a session for delayed cleanup * @param sessionId The session ID to mark for cleanup * @param serverId The server ID associated with this session * @param timeoutMs Timeout in milliseconds before forcing cleanup */ markSessionForCleanup(sessionId, serverId, timeoutMs) { // Check if already marked const existingState = this.sessionCleanupStateMap.get(sessionId); if (existingState) { // Already pending cleanup, reset timeout if (existingState.cleanupTimeoutId) { clearTimeout(existingState.cleanupTimeoutId); } console.log(`[CLEANUP] Session ${sessionId} already pending cleanup, resetting timeout`); } // Create new cleanup state const newState = { sessionId, serverId, markedForCleanupAt: Date.now(), reason: 'user_cleared', }; // Start cleanup timeout const timeoutId = setTimeout(() => { console.log(`[CLEANUP] Timeout reached for session ${sessionId}, forcing cleanup`); this.forceCleanupSession(sessionId); }, timeoutMs); newState.cleanupTimeoutId = timeoutId; this.sessionCleanupStateMap.set(sessionId, newState); console.log(`[CLEANUP] Session ${sessionId} marked for cleanup (timeout: ${timeoutMs}ms)`); } /** * Force cleanup a session immediately * @param sessionId The session ID to cleanup */ forceCleanupSession(sessionId) { // Check if already cleaned const state = this.sessionCleanupStateMap.get(sessionId); if (!state) { console.log(`[CLEANUP] Session ${sessionId} already cleaned up, skipping`); return; } // Clear timeout if (state.cleanupTimeoutId) { clearTimeout(state.cleanupTimeoutId); } // Remove from both maps this.sessionServerMap.delete(sessionId); this.sessionCleanupStateMap.delete(sessionId); console.log(`[CLEANUP] Session ${sessionId} cleanup completed`); } /** * Check if a session is pending cleanup * @param sessionId The session ID to check * @returns True if session is pending cleanup */ isSessionPendingCleanup(sessionId) { return this.sessionCleanupStateMap.has(sessionId); } /** * Get cleanup state for a session * @param sessionId The session ID to check * @returns Cleanup state if exists, undefined otherwise */ getSessionCleanupState(sessionId) { return this.sessionCleanupStateMap.get(sessionId); } /** * Update accumulated text for a pending cleanup session * @param sessionId The session ID * @param text The accumulated text */ updateAccumulatedTextForCleanup(sessionId, text) { const state = this.sessionCleanupStateMap.get(sessionId); if (state) { state.accumulatedText = text; } } } exports.XiaoYiWebSocketManager = XiaoYiWebSocketManager; XiaoYiWebSocketManager.DEFAULT_CLEANUP_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour XiaoYiWebSocketManager.STABLE_CONNECTION_THRESHOLD = 10000; // 10 seconds