Files

144 lines
4.3 KiB
Python

import asyncio
import json
import logging
import uuid
import aiohttp
from app.services.tts import speak
import config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# WebSocket endpoint that _run_connection() passes to session.ws_connect().
WSS_URL = "wss://openws.work.weixin.qq.com"
# Seconds _ping_loop() sleeps between application-level "ping" commands.
PING_INTERVAL = 30
async def _send(ws, cmd: str, body: dict | None = None, req_id: str | None = None):
if req_id is None:
req_id = uuid.uuid4().hex[:16]
msg = {"cmd": cmd, "headers": {"req_id": req_id}}
if body is not None:
msg["body"] = body
await ws.send_str(json.dumps(msg, ensure_ascii=False))
return req_id
async def _recv(ws) -> dict:
    """Receive one WebSocket frame and decode it into a command dict.

    Args:
        ws: The aiohttp client WebSocket to read from.

    Returns:
        The decoded JSON object of a TEXT frame. An empty dict is returned for
        frames that carry no command (PING, PONG, BINARY, ...) and for TEXT
        frames whose payload is not a JSON object, so callers can always use
        ``.get()`` on the result.

    Raises:
        ConnectionError: If the socket reports CLOSE, CLOSING, CLOSED or ERROR.
    """
    msg = await ws.receive()
    kind = msg.type

    if kind == aiohttp.WSMsgType.TEXT:
        try:
            payload = json.loads(msg.data)
        except json.JSONDecodeError:
            # One malformed frame should not tear down the whole connection.
            logger.warning("Ignoring non-JSON text frame: %r", msg.data[:200])
            return {}
        if not isinstance(payload, dict):
            # Valid JSON but not an object (e.g. a list or a bare string).
            logger.warning("Ignoring non-object JSON frame: %r", payload)
            return {}
        return payload

    if kind == aiohttp.WSMsgType.CLOSED:
        # A CLOSED message has no data; the close code lives on the socket.
        raise ConnectionError(
            f"WebSocket closed gracefully (code={ws.close_code})"
        )
    if kind == aiohttp.WSMsgType.ERROR:
        raise ConnectionError(f"WebSocket error: {ws.exception()}")
    if kind == aiohttp.WSMsgType.CLOSE:
        raise ConnectionError(f"WebSocket closing (code={msg.data})")
    if kind == aiohttp.WSMsgType.CLOSING:
        # Close already in progress on our side; nothing more will arrive.
        raise ConnectionError(f"WebSocket closing (code={ws.close_code})")

    # PING, PONG, BINARY etc - ignore
    return {}
def _extract_text(msg: dict) -> str | None:
body = msg.get("body", {})
msgtype = body.get("msgtype", "")
if msgtype == "text":
return body.get("text", {}).get("content", "").strip() or None
if msgtype == "voice":
return body.get("voice", {}).get("content", "").strip() or None
return None
async def _handle_message(ws, msg: dict):
    """Speak the text of an incoming message and reply with the outcome.

    Messages without usable text (see ``_extract_text``) are ignored. The
    blocking ``speak`` call runs in the event loop's default executor so the
    loop stays responsive. A single finished ``stream`` reply is then sent
    back, reusing the ``req_id`` of the incoming frame.

    Args:
        ws: The WebSocket to reply on.
        msg: The decoded callback frame.
    """
    text = _extract_text(msg)
    if not text:
        return
    logger.info("Received: %s", text)

    loop = asyncio.get_running_loop()
    try:
        success, result = await loop.run_in_executor(None, speak, text)
    except Exception as exc:
        # This coroutine runs as a detached task; without this handler an
        # exception from speak() would kill the task and the sender would
        # never be told that playback failed.
        logger.exception("TTS raised an exception")
        reply_text = f"播报失败: {str(exc) or type(exc).__name__}"
    else:
        if success:
            logger.info("TTS success")
            reply_text = "已播报"
        else:
            logger.error("TTS failed: %s", result)
            # NOTE(review): speak() appears to return a dict with an "error"
            # key on failure; fall back to the raw value if it does not.
            detail = (
                result.get("error", "unknown")
                if isinstance(result, dict)
                else result
            )
            reply_text = f"播报失败: {detail}"

    # Tolerate a missing or null "headers" entry; an empty req_id is sent then.
    req_id = (msg.get("headers") or {}).get("req_id", "")
    await _send(ws, "aibot_respond_msg", {
        "msgtype": "stream",
        "stream": {
            "id": uuid.uuid4().hex[:16],
            "finish": True,
            "content": reply_text,
        },
    }, req_id=req_id)
async def _ping_loop(ws):
    """Send a ``ping`` command every ``PING_INTERVAL`` seconds until stopped.

    The loop ends quietly when the task is cancelled, and ends after logging
    the traceback when sleeping or sending raises any other exception.

    Args:
        ws: The WebSocket the ping commands are written to.
    """
    try:
        while True:
            await asyncio.sleep(PING_INTERVAL)
            await _send(ws, "ping")
    except asyncio.CancelledError:
        # Cancellation is the normal shutdown path; finish without raising.
        return
    except Exception:
        logger.exception("Ping failed")
async def connect_and_serve():
    """Keep a connection alive forever, reconnecting after every exit.

    Runs ``_run_connection()`` in an endless loop. Whatever way a connection
    attempt ends (a ``ConnectionError``, any other exception, or a normal
    return) the loop waits 5 seconds before trying again. Task cancellation
    is not caught here and ends the loop.
    """
    while True:
        try:
            await _run_connection()
        except ConnectionError:
            logger.info("Connection rotated by server, reconnecting...")
        except Exception:
            logger.exception("Connection lost, reconnecting in 5s...")
        else:
            # _run_connection() returns normally when the subscribe request is
            # rejected. Without a delay on this path the loop would reconnect
            # immediately and hammer the server.
            logger.warning("Connection ended, reconnecting in 5s...")
        await asyncio.sleep(5)
async def _run_connection():
    """Open one WebSocket session, subscribe the bot and dispatch frames.

    Steps:
      1. Connect to ``WSS_URL`` and send ``aibot_subscribe`` with the bot
         credentials from ``config``.
      2. Read the response; if its ``errcode`` is not 0, log it and return.
      3. Start the ping task and loop over incoming frames, spawning one
         ``_handle_message`` task per ``aibot_msg_callback`` frame.

    The receive loop only ends through an exception raised by ``_recv`` (or
    by cancellation). On the way out the ping task and any handler tasks
    still running are cancelled and awaited.

    Raises:
        ConnectionError: Propagated from ``_recv`` when the socket closes.
    """
    logger.info("Connecting to %s ...", WSS_URL)
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            WSS_URL, heartbeat=30, receive_timeout=300
        ) as ws:
            logger.info("WebSocket connected")
            await _send(ws, "aibot_subscribe", {
                "bot_id": config.WECOM_BOT_ID,
                "secret": config.WECOM_BOT_SECRET,
            })
            resp = await _recv(ws)
            if resp.get("errcode") != 0:
                logger.error("Subscribe failed: %s", resp)
                return
            logger.info("Subscribed successfully")

            # The event loop keeps only weak references to tasks, so handler
            # tasks are held here until they finish.
            handlers: set[asyncio.Task] = set()

            def _on_handler_done(task):
                """Drop a finished handler and log its failure, if any."""
                handlers.discard(task)
                if task.cancelled():
                    return
                exc = task.exception()
                if exc is not None:
                    logger.error("Message handler failed", exc_info=exc)

            ping_task = asyncio.create_task(_ping_loop(ws))
            try:
                while True:
                    msg = await _recv(ws)
                    cmd = msg.get("cmd", "")
                    if cmd == "aibot_msg_callback":
                        handler = asyncio.create_task(_handle_message(ws, msg))
                        handlers.add(handler)
                        handler.add_done_callback(_on_handler_done)
                    elif cmd == "aibot_event_callback":
                        event_type = (
                            msg.get("body", {})
                            .get("event", {})
                            .get("eventtype", "")
                        )
                        logger.info("Event: %s", event_type)
                    elif cmd == "ping_response":
                        pass
                    elif cmd:
                        logger.debug("Cmd: %s", cmd)
            finally:
                # The socket is going away, so pending handlers can no longer
                # deliver their replies; stop them together with the pinger.
                pending = list(handlers)
                ping_task.cancel()
                for handler in pending:
                    handler.cancel()
                # return_exceptions=True collects each task's outcome without
                # raising, while an outer cancellation still propagates.
                await asyncio.gather(
                    ping_task, *pending, return_exceptions=True
                )