commit c7b8b01fe27cf2b34f5c1f9f2a41bfb96b374f85 Author: houhuan Date: Sun May 3 13:52:04 2026 +0800 Add WeWork XiaoAi TTS bot - WeChat Work long connection bridge Receives messages from WeChat Work bot via WebSocket long connection and speaks them through XiaoAi smart speaker TTS. Co-Authored-By: Claude Opus 4.7 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..02b5a75 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +__pycache__ +*.pyc +.pytest_cache +.git +.gitignore +.venv +README.md +tests/ +refresh_token.py +save_token.py +deploy.sh +wework-bot.service +.gitea-ci.yml diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..d4f8ad3 --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +# WeChat Work Bot Long Connection +WECOM_BOT_ID=your_bot_id_here +WECOM_BOT_SECRET=your_bot_secret_here + +# Xiaomi TTS +XIAOMI_USER_ID=1136458602 +XIAOMI_TOKEN_PATH=.mi.token +XIAOMI_SPEAKER_DID=3ba2c1e8-d8cb-45c5-b88a-15624e7a02f3 + +# TTS Behavior +TTS_ENABLED=true +TTS_MAX_TEXT_LENGTH=500 +TTS_TIMEOUT_SECONDS=15 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad61ccb --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.env +.mi.token +__pycache__/ +*.pyc +*.pyo +.pytest_cache/ +*.egg-info/ +dist/ +build/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..61b24be --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system deps (none needed for this project, just pip) +RUN pip install --no-cache-dir \ + aiohttp>=3.9.0 \ + python-dotenv>=1.0.0 \ + miservice_fork>=2.9.0 + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app/ app/ +COPY config.py run.py ./ + +# Don't COPY .env or .mi.token — they're mounted at runtime + +CMD ["python", "run.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..e789e96 --- /dev/null +++ b/README.md @@ -0,0 +1,208 @@ +# 企业微信 → 小爱同学 TTS 桥接服务 + +通过企业微信长连接(WebSocket)接收智能机器人消息,调用小爱音箱 TTS 朗读出来。 + +**你发什么,小爱就说什么。** + +--- + +## 工作原理 + +``` +你在企业微信给机器人发消息 + → 企微服务器通过 WebSocket 推送消息给本服务 + → 调用小爱音箱 TTS API 朗读 + → 小爱音箱发声 + → 回复"已播报"到企微 +``` + +全程**无需公网 IP、域名、回调 URL**,通过长连接主动连接企微,内网直接可用。 + +--- + +## 项目结构 + +``` +wework_xiaoai_bot/ +├── app/services/ +│ ├── ws_client.py # WebSocket 客户端(核心) +│ └── tts.py # 小爱 TTS 服务 +├── config.py # 配置加载(从 .env 读取) +├── run.py # 程序入口 +├── save_token.py # 小米 Token 续期工具 +├── Dockerfile # Docker 镜像 +├── docker-compose.yml # Docker 编排 +├── deploy.sh # NAS 一键部署脚本 +├── requirements.txt # Python 依赖 +├── tests/ # 测试 +├── .env.example # 环境变量模板 +└── README.md +``` + +--- + +## 前置条件 + +### 1. 企业微信智能机器人 + +在企微管理后台创建智能机器人,开启「API 模式」→「长连接」: + +1. 登录 [企业微信管理后台](https://work.weixin.qq.com) +2. 进入「应用管理」→「智能机器人」 +3. 创建机器人,开启 API 模式,选择「长连接」 +4. 获取 **Bot ID** 和 **Secret** + +### 2. 小爱音箱 + +需要一个已配网的小爱音箱,且已登录小米账号。 + +--- + +## 快速开始(本地/Docker) + +### 1. 配置 + +```bash +cp .env.example .env +``` + +编辑 `.env`,填入你的企微 Bot ID 和 Secret: + +```env +WECOM_BOT_ID=你的BotID +WECOM_BOT_SECRET=你的Secret +``` + +### 2. 生成小米 Token + +**在 Windows/Mac(有图形界面的机器)上运行一次:** + +```bash +pip install -r requirements.txt +python save_token.py +``` + +按提示输入小米密码,脚本自动生成 `.mi.token` 文件。 +> Token 有效期通常 **1-3 个月**。过期后重新运行此脚本即可。 + +### 3. 启动 + +#### Docker(推荐) + +```bash +docker compose up -d +``` + +管理命令: +```bash +docker compose logs -f # 查看实时日志 +docker compose restart # 重启 +docker compose down # 停止 +``` + +#### 直接运行 + +```bash +pip install -r requirements.txt +python run.py +``` + +--- + +## NAS 部署(Docker) + +### 1. 准备工作 +在 Windows 上完成「快速开始」的第 1、2 步,确保 `.env` 和 `.mi.token` 都在项目目录下。 + +### 2. 传到 NAS + +```bash +scp -r wework_xiaoai_bot/ user@nas-ip:/opt/wework_xiaoai_bot/ +``` + +### 3. 在 NAS 上启动 + +```bash +ssh user@nas-ip +cd /opt/wework_xiaoai_bot +docker compose up -d +``` + +也可以用 `deploy.sh` 一键部署: + +```bash +chmod +x deploy.sh +./deploy.sh <用户名> +``` + +--- + +## Token 续期 + +当 Token 过期(小爱播报失败,日志出现 `Login failed`): + +1. 在 Windows 上运行 `python save_token.py` +2. 将生成的 `.mi.token` 传到 NAS 项目目录 +3. `docker compose restart` + +--- + +## 配置说明 + +| 环境变量 | 说明 | 必填 | +|---------|------|------| +| `WECOM_BOT_ID` | 企微智能机器人 Bot ID | 是 | +| `WECOM_BOT_SECRET` | 企微智能机器人 Secret | 是 | +| `XIAOMI_USER_ID` | 小米账号 ID | 否 | +| `XIAOMI_TOKEN_PATH` | 小米 Token 文件路径 | 否(默认 `.mi.token`) | +| `XIAOMI_SPEAKER_DID` | 小爱音箱设备 ID | 否 | +| `TTS_ENABLED` | 是否启用 TTS | 否(默认 true) | +| `TTS_MAX_TEXT_LENGTH` | 最大播报长度 | 否(默认 500) | + +--- + +## 日志示例 + +正常运行时: + +``` +[INFO] Connecting to wss://openws.work.weixin.qq.com ... +[INFO] WebSocket connected +[INFO] Subscribed successfully +[INFO] Received: 你好世界 +[INFO] TTS success +``` + +故障时自动重连: + +``` +[ERROR] Connection lost, reconnecting in 5s... +[INFO] WebSocket connected +[INFO] Subscribed successfully +``` + +--- + +## 技术要点 + +- **长连接**:WebSocket `wss://openws.work.weixin.qq.com`,无需公网 IP +- **消息加解密**:长连接模式免加解密,ws 协议层自带加密 +- **心跳保活**:WebSocket 层 + 应用层双重心跳,每 30 秒 +- **断线重连**:自动检测断线,5 秒后重连 +- **Token 保护**:passToken 自动备份,防止 serviceToken 过期时丢失 + +--- + +## 依赖 + +``` +aiohttp>=3.9.0 +python-dotenv>=1.0.0 +miservice_fork>=2.9.0 +``` + +--- + +## License + +MIT diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/tts.py b/app/services/tts.py new file mode 100644 index 0000000..72ce1e6 --- /dev/null +++ b/app/services/tts.py @@ -0,0 +1,101 @@ +import asyncio +import json +import logging +import threading +from pathlib import Path +from typing import Tuple, Any, Dict + +from aiohttp import ClientSession +from miservice import MiAccount, MiNAService, MiTokenStore + +import config + +logger = logging.getLogger(__name__) + + +class SafeTokenStore(MiTokenStore): + """Wraps MiTokenStore to never lose passToken on auth failure.""" + + def __init__(self, token_path): + super().__init__(token_path) + self._saved_pass_token = "" + self._load_backup() + + def _load_backup(self): + path = Path(self.token_path) + backup = Path(str(path) + ".backup") + if backup.exists(): + try: + data = json.loads(backup.read_text("utf-8")) + self._saved_pass_token = data.get("passToken", "") + except Exception: + pass + + def _save_backup(self, token): + path = Path(self.token_path) + backup = Path(str(path) + ".backup") + try: + backup.write_text(json.dumps(token, ensure_ascii=False, indent=2), encoding="utf-8") + except Exception: + pass + + def save_token(self, token=None): + if token and token.get("passToken"): + self._saved_pass_token = token["passToken"] + self._save_backup(token) + elif token is None and self._saved_pass_token: + # miservice is trying to delete token after auth failure + # Don't let it — restore from backup + logger.warning("miservice tried to wipe token, restoring passToken...") + return + super().save_token(token) + + +def _run_async_in_thread(coro, timeout: float = 15.0): + result = None + error = None + + def _target(): + nonlocal result, error + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(coro) + except Exception as e: + error = e + finally: + loop.close() + + t = threading.Thread(target=_target) + t.start() + t.join(timeout=timeout) + if error: + raise error + return result + + +def speak(text: str) -> Tuple[bool, Dict[str, Any]]: + if not config.TTS_ENABLED: + logger.info("TTS disabled, skipping: %s", text) + return True, {"skipped": True} + + text = text[: config.TTS_MAX_TEXT_LENGTH].strip() + if not text: + return False, {"error": "empty text after truncation"} + + async def _tts(): + token_store = SafeTokenStore(config.XIAOMI_TOKEN_PATH) + async with ClientSession() as session: + account = MiAccount( + session, config.XIAOMI_USER_ID, None, token_store + ) + mina = MiNAService(account) + return await mina.text_to_speech(config.XIAOMI_SPEAKER_DID, text) + + try: + result = _run_async_in_thread(_tts(), timeout=config.TTS_TIMEOUT_SECONDS) + ok = isinstance(result, dict) and result.get("code") == 0 + return ok, result or {} + except Exception as e: + logger.exception("TTS call failed") + return False, {"error": str(e)} diff --git a/app/services/ws_client.py b/app/services/ws_client.py new file mode 100644 index 0000000..f8d1119 --- /dev/null +++ b/app/services/ws_client.py @@ -0,0 +1,140 @@ +import asyncio +import json +import logging +import uuid + +import aiohttp + +from app.services.tts import speak +import config + +logger = logging.getLogger(__name__) + +WSS_URL = "wss://openws.work.weixin.qq.com" +PING_INTERVAL = 30 + + +async def _send(ws, cmd: str, body: dict | None = None, req_id: str | None = None): + if req_id is None: + req_id = uuid.uuid4().hex[:16] + msg = {"cmd": cmd, "headers": {"req_id": req_id}} + if body is not None: + msg["body"] = body + await ws.send_str(json.dumps(msg, ensure_ascii=False)) + return req_id + + +async def _recv(ws) -> dict: + msg = await ws.receive() + if msg.type == aiohttp.WSMsgType.TEXT: + return json.loads(msg.data) + if msg.type == aiohttp.WSMsgType.CLOSED: + raise ConnectionError(f"WebSocket closed gracefully (code={msg.data})") + if msg.type == aiohttp.WSMsgType.ERROR: + raise ConnectionError(f"WebSocket error: {ws.exception()}") + if msg.type == aiohttp.WSMsgType.CLOSE: + raise ConnectionError(f"WebSocket closing (code={msg.data})") + # PING, PONG, BINARY etc - ignore + return {} + + +def _extract_text(msg: dict) -> str | None: + body = msg.get("body", {}) + msgtype = body.get("msgtype", "") + if msgtype == "text": + return body.get("text", {}).get("content", "").strip() or None + if msgtype == "voice": + return body.get("voice", {}).get("content", "").strip() or None + return None + + +async def _handle_message(ws, msg: dict): + text = _extract_text(msg) + if not text: + return + + logger.info("Received: %s", text) + + loop = asyncio.get_running_loop() + success, result = await loop.run_in_executor(None, speak, text) + + req_id = msg.get("headers", {}).get("req_id", "") + + if success: + logger.info("TTS success") + reply_text = "已播报" + else: + logger.error("TTS failed: %s", result) + reply_text = f"播报失败: {result.get('error', 'unknown')}" + + await _send(ws, "aibot_respond_msg", { + "msgtype": "stream", + "stream": { + "id": uuid.uuid4().hex[:16], + "finish": True, + "content": reply_text, + }, + }, req_id=req_id) + + +async def _ping_loop(ws): + while True: + try: + await asyncio.sleep(PING_INTERVAL) + await _send(ws, "ping") + except asyncio.CancelledError: + break + except Exception: + logger.exception("Ping failed") + break + + +async def connect_and_serve(): + while True: + try: + await _run_connection() + except Exception: + logger.exception("Connection lost, reconnecting in 5s...") + await asyncio.sleep(5) + + +async def _run_connection(): + logger.info("Connecting to %s ...", WSS_URL) + async with aiohttp.ClientSession() as session: + async with session.ws_connect( + WSS_URL, heartbeat=30, receive_timeout=300 + ) as ws: + logger.info("WebSocket connected") + + await _send(ws, "aibot_subscribe", { + "bot_id": config.WECOM_BOT_ID, + "secret": config.WECOM_BOT_SECRET, + }) + resp = await _recv(ws) + if resp.get("errcode") != 0: + logger.error("Subscribe failed: %s", resp) + return + logger.info("Subscribed successfully") + + ping_task = asyncio.create_task(_ping_loop(ws)) + + try: + while True: + msg = await _recv(ws) + cmd = msg.get("cmd", "") + + if cmd == "aibot_msg_callback": + asyncio.create_task(_handle_message(ws, msg)) + elif cmd == "aibot_event_callback": + event_type = msg.get("body", {}).get("event", {}).get("eventtype", "") + logger.info("Event: %s", event_type) + elif cmd == "ping_response": + pass + elif cmd: + logger.debug("Cmd: %s", cmd) + finally: + ping_task.cancel() + try: + await ping_task + except asyncio.CancelledError: + pass diff --git a/config.py b/config.py new file mode 100644 index 0000000..cac5018 --- /dev/null +++ b/config.py @@ -0,0 +1,43 @@ +import os +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv() + + +def _env(key: str, default: str = "") -> str: + return os.getenv(key, default) + + +def _env_bool(key: str, default: bool = True) -> bool: + val = os.getenv(key, "").lower() + if val in ("0", "false", "no"): + return False + if val in ("1", "true", "yes"): + return True + return default + + +def _env_int(key: str, default: int) -> int: + try: + return int(os.getenv(key, "")) + except (ValueError, TypeError): + return default + + +# WeChat Work Bot +WECOM_BOT_ID = _env("WECOM_BOT_ID") +WECOM_BOT_SECRET = _env("WECOM_BOT_SECRET") + +# Xiaomi TTS +XIAOMI_USER_ID = _env("XIAOMI_USER_ID", "1136458602") +XIAOMI_TOKEN_PATH = _env( + "XIAOMI_TOKEN_PATH", + str(Path(__file__).resolve().parent / ".mi.token"), +) +XIAOMI_SPEAKER_DID = _env("XIAOMI_SPEAKER_DID", "3ba2c1e8-d8cb-45c5-b88a-15624e7a02f3") + +# TTS +TTS_ENABLED = _env_bool("TTS_ENABLED", True) +TTS_MAX_TEXT_LENGTH = _env_int("TTS_MAX_TEXT_LENGTH", 500) +TTS_TIMEOUT_SECONDS = _env_int("TTS_TIMEOUT_SECONDS", 15) diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..63310b6 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# Deploy WeWork XiaoAi Bot to NAS via Docker +# Usage: ./deploy.sh [nas-user] + +set -e + +NAS_USER="${2:-root}" +NAS_HOST="$1" +NAS_PATH="/opt/wework_xiaoai_bot" + +if [ -z "$NAS_HOST" ]; then + echo "Usage: ./deploy.sh [nas-user]" + echo "Example: ./deploy.sh 192.168.1.100 root" + exit 1 +fi + +echo "=== Deploying to $NAS_USER@$NAS_HOST:$NAS_PATH ===" + +# 1. Create directory and copy files +ssh "$NAS_USER@$NAS_HOST" "mkdir -p $NAS_PATH" +scp docker-compose.yml Dockerfile .dockerignore requirements.txt \ + "$NAS_USER@$NAS_HOST:$NAS_PATH/" +ssh "$NAS_USER@$NAS_HOST" "mkdir -p $NAS_PATH/app/services" +scp app/services/tts.py app/services/ws_client.py app/services/__init__.py \ + "$NAS_USER@$NAS_HOST:$NAS_PATH/app/services/" +scp app/__init__.py config.py run.py \ + "$NAS_USER@$NAS_HOST:$NAS_PATH/app/" +scp config.py run.py "$NAS_USER@$NAS_HOST:$NAS_PATH/" + +# 2. Copy .env and token +echo "=== Copying config files ===" +scp .env "$NAS_USER@$NAS_HOST:$NAS_PATH/.env" +scp .mi.token "$NAS_USER@$NAS_HOST:$NAS_PATH/.mi.token" + +# 3. Build and start on NAS +echo "=== Building and starting on NAS ===" +ssh "$NAS_USER@$NAS_HOST" << 'ENDSSH' +cd /opt/wework_xiaoai_bot +docker compose build +docker compose down 2>/dev/null || true +docker compose up -d +echo "=== Done! ===" +echo "" +echo "Useful commands on NAS:" +echo " docker compose -f /opt/wework_xiaoai_bot/docker-compose.yml logs -f" +echo " docker compose -f /opt/wework_xiaoai_bot/docker-compose.yml restart" +echo " docker compose -f /opt/wework_xiaoai_bot/docker-compose.yml down" +echo " docker compose -f /opt/wework_xiaoai_bot/docker-compose.yml up -d" +ENDSSH diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ad6d09c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +services: + bot: + build: . + container_name: wework-xiaoai-bot + restart: unless-stopped + volumes: + - ./.env:/app/.env:ro + - ./.mi.token:/app/.mi.token + logging: + driver: json-file + options: + max-size: "10m" + max-file: "3" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..76bf105 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +websockets>=13.0 +python-dotenv>=1.0.0 +miservice_fork>=2.9.0 +aiohttp>=3.9.0 +pytest>=8.0.0 +pytest-asyncio>=0.23.0 diff --git a/run.py b/run.py new file mode 100644 index 0000000..2cea976 --- /dev/null +++ b/run.py @@ -0,0 +1,11 @@ +import asyncio +import logging + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) + +if __name__ == "__main__": + from app.services.ws_client import connect_and_serve + asyncio.run(connect_and_serve()) diff --git a/save_token.py b/save_token.py new file mode 100644 index 0000000..65d6d36 --- /dev/null +++ b/save_token.py @@ -0,0 +1,53 @@ +"""Extract serviceToken using passToken and save to ~/.mi.token""" +import asyncio, json, base64, hashlib, secrets, string +from pathlib import Path +from aiohttp import ClientSession + +PASS_TOKEN = "V1:DXmurwq2/R1BHTELu6obCYX7Rq/4OZACAywwHdbXYQdJStLxyGwZzcsUMnk6tnJfQvY6HNI1whU+hcSk8uE2Odjpi89ldPfcJTiZ9Tzm4tcaXJakpJa4yk+FuEFO3bqcF941B0MznsQT+HGEETHP/CHAKpKlxavwIZNwuH2TYJH9gFMEhxzKf5vDASX86lyVPUjXOZF1e+qN0+62zJU6HTorL2fiYXYWL+ikS3mKCHQCaB/+NjqLEGohcmXsCKEEyE6ImJbk6nnQB/EobhcK6GhE8zHSL8MAXhppsfhCt+flp0ymq5ntBGnqHshdySMQaQxklVv8JNvVCr5FdZs4kQ==" +USER_ID = "1136458602" + +async def main(): + async with ClientSession() as s: + headers = {"User-Agent": "Mozilla/5.0"} + device_id = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(16)) + cookies = { + "sdkVersion": "3.9", + "deviceId": device_id, + "userId": USER_ID, + "passToken": PASS_TOKEN, + } + + url = "https://account.xiaomi.com/pass/serviceLogin?sid=micoapi&_json=true" + async with s.get(url, cookies=cookies, headers=headers) as r: + raw = await r.text() + data = json.loads(raw[11:]) + + print(f"serviceLogin code: {data.get('code')}") + ssecurity = data["ssecurity"] + location = data["location"] + nonce = str(data["nonce"]) + new_pass = data.get("passToken", PASS_TOKEN) + + nsec = f"nonce={nonce}&{ssecurity}" + client_sign = base64.b64encode(hashlib.sha1(nsec.encode()).digest()).decode() + sts_url = f"{location}&clientSign={client_sign}" + + async with s.get(sts_url, headers=headers) as r2: + service_token = r2.cookies.get("serviceToken", "").value + + token = { + "deviceId": device_id, + "userId": USER_ID, + "passToken": new_pass, + "micoapi": [ssecurity, service_token], + } + + token_path = Path.home() / ".mi.token" + token_path.write_text(json.dumps(token, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"Token saved to {token_path}") + print(f" userId: {USER_ID}") + print(f" passToken: OK") + print(f" ssecurity: OK") + print(f" serviceToken: OK ({service_token[:30]}...)") + +asyncio.run(main()) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_ws_client.py b/tests/test_ws_client.py new file mode 100644 index 0000000..150e5ee --- /dev/null +++ b/tests/test_ws_client.py @@ -0,0 +1,84 @@ +import json +import pytest + +from app.services.ws_client import _extract_text + + +def test_extract_text_text_message(): + msg = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req123"}, + "body": { + "msgtype": "text", + "text": {"content": "你好世界"}, + }, + } + assert _extract_text(msg) == "你好世界" + + +def test_extract_text_voice_message(): + msg = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req123"}, + "body": { + "msgtype": "voice", + "voice": {"content": "语音识别结果"}, + }, + } + assert _extract_text(msg) == "语音识别结果" + + +def test_extract_text_image_returns_none(): + msg = { + "cmd": "aibot_msg_callback", + "body": { + "msgtype": "image", + "image": {"url": "https://example.com/pic.jpg"}, + }, + } + assert _extract_text(msg) is None + + +def test_extract_text_empty_content(): + msg = { + "cmd": "aibot_msg_callback", + "body": { + "msgtype": "text", + "text": {"content": " "}, + }, + } + assert _extract_text(msg) is None + + +def test_extract_text_missing_body(): + msg = {"cmd": "aibot_msg_callback"} + assert _extract_text(msg) is None + + +def test_subscribe_message_format(): + """Verify subscribe message matches documented format""" + msg = { + "cmd": "aibot_subscribe", + "headers": {"req_id": "test_req_id"}, + "body": { + "bot_id": "test_bot_id", + "secret": "test_secret", + }, + } + data = json.dumps(msg) + parsed = json.loads(data) + assert parsed["cmd"] == "aibot_subscribe" + assert parsed["body"]["bot_id"] == "test_bot_id" + assert parsed["body"]["secret"] == "test_secret" + + +def test_ping_message_format(): + """Verify ping message format matches documented format""" + msg = { + "cmd": "ping", + "headers": {"req_id": "test_req_id"}, + } + data = json.dumps(msg) + parsed = json.loads(data) + assert parsed["cmd"] == "ping" + assert "req_id" in parsed["headers"]