Files
wework_xiaoai_bot/app/services/tts.py
T
houhuan c7b8b01fe2 Add WeWork XiaoAi TTS bot - WeChat Work long connection bridge
Receives messages from WeChat Work bot via WebSocket long connection
and speaks them through XiaoAi smart speaker TTS.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-03 13:52:04 +08:00

102 lines
3.0 KiB
Python

import asyncio
import json
import logging
import threading
from pathlib import Path
from typing import Tuple, Any, Dict
from aiohttp import ClientSession
from miservice import MiAccount, MiNAService, MiTokenStore
import config
logger = logging.getLogger(__name__)
class SafeTokenStore(MiTokenStore):
"""Wraps MiTokenStore to never lose passToken on auth failure."""
def __init__(self, token_path):
super().__init__(token_path)
self._saved_pass_token = ""
self._load_backup()
def _load_backup(self):
path = Path(self.token_path)
backup = Path(str(path) + ".backup")
if backup.exists():
try:
data = json.loads(backup.read_text("utf-8"))
self._saved_pass_token = data.get("passToken", "")
except Exception:
pass
def _save_backup(self, token):
path = Path(self.token_path)
backup = Path(str(path) + ".backup")
try:
backup.write_text(json.dumps(token, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception:
pass
def save_token(self, token=None):
if token and token.get("passToken"):
self._saved_pass_token = token["passToken"]
self._save_backup(token)
elif token is None and self._saved_pass_token:
# miservice is trying to delete token after auth failure
# Don't let it — restore from backup
logger.warning("miservice tried to wipe token, restoring passToken...")
return
super().save_token(token)
def _run_async_in_thread(coro, timeout: float = 15.0):
result = None
error = None
def _target():
nonlocal result, error
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(coro)
except Exception as e:
error = e
finally:
loop.close()
t = threading.Thread(target=_target)
t.start()
t.join(timeout=timeout)
if error:
raise error
return result
def speak(text: str) -> Tuple[bool, Dict[str, Any]]:
if not config.TTS_ENABLED:
logger.info("TTS disabled, skipping: %s", text)
return True, {"skipped": True}
text = text[: config.TTS_MAX_TEXT_LENGTH].strip()
if not text:
return False, {"error": "empty text after truncation"}
async def _tts():
token_store = SafeTokenStore(config.XIAOMI_TOKEN_PATH)
async with ClientSession() as session:
account = MiAccount(
session, config.XIAOMI_USER_ID, None, token_store
)
mina = MiNAService(account)
return await mina.text_to_speech(config.XIAOMI_SPEAKER_DID, text)
try:
result = _run_async_in_thread(_tts(), timeout=config.TTS_TIMEOUT_SECONDS)
ok = isinstance(result, dict) and result.get("code") == 0
return ok, result or {}
except Exception as e:
logger.exception("TTS call failed")
return False, {"error": str(e)}