Files
SaleShow/automation/secsion.py
T
houhuan b402612641 fix: 增强导出下载诊断能力,缩短超时时间
下载超时从 300s 减至 120s,失败时自动保存截图、打印服务端响应内容、
检查页面错误提示和新标签页,便于定位 download 事件未触发的根因。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 18:23:32 +08:00

388 lines
16 KiB
Python

"""
secsion.com 自动化下载模块
使用 Playwright 登录 secsion.com,导出销售报表
"""
import asyncio
import os
import logging
import argparse
from datetime import datetime
from playwright.async_api import async_playwright
logger = logging.getLogger(__name__)
class SecsionDownloader:
"""从 secsion.com 自动下载销售报表"""
LOGIN_URL = "https://secsion.com:8000/login?redirect=%252Fhomepage"
STATS_URL = "https://secsion.com:8000/commodityStatistics"
def __init__(self, username, password, download_dir=None, shop_id=None):
self.username = username
self.password = password
self.shop_id = shop_id or ''
self.download_dir = download_dir or os.path.join(os.getcwd(), "downloads")
os.makedirs(self.download_dir, exist_ok=True)
async def download_report(self, start_date, end_date, retry_count=3):
"""
下载指定日期范围的销售报表
Args:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
retry_count: 重试次数(默认3次)
Returns:
str: 下载文件的本地路径,失败返回 None
"""
for attempt in range(retry_count):
try:
logger.info(f"开始下载报表: {start_date} ~ {end_date} (第 {attempt + 1}/{retry_count} 次)")
async with async_playwright() as p:
# Docker 优化:添加 --disable-dev-shm-usage 避免共享内存不足
browser = await p.chromium.launch(
headless=True,
args=[
"--disable-dev-shm-usage",
"--disable-gpu",
"--single-process"
]
)
context = await browser.new_context(
ignore_https_errors=True,
viewport={'width': 1280, 'height': 800}
)
page = await context.new_page()
try:
await self._login(page)
file_path = await self._export_report(page, start_date, end_date)
logger.info(f"报表下载完成: {file_path}")
return file_path
finally:
await browser.close()
except Exception as e:
logger.error(f"下载报表失败 (第 {attempt + 1}/{retry_count} 次): {e}")
if attempt < retry_count - 1:
wait_time = (attempt + 1) * 5
logger.info(f"等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
continue
logger.error(f"下载报表最终失败 (重试 {retry_count} 次均失败)")
return None
async def _login(self, page):
"""登录 secsion.com"""
logger.info(f"打开登录页面: {self.LOGIN_URL}")
await page.goto(self.LOGIN_URL, timeout=30000)
# 选择角色 "店铺"
logger.info("选择角色: 店铺")
try:
await page.get_by_text("店铺", exact=True).click(timeout=10000)
except Exception:
await page.click("text=店铺", timeout=10000)
# 输入账号密码
logger.info(f"输入账号: {self.username}")
await page.get_by_placeholder("请输入用户名").fill(self.username)
await page.get_by_placeholder("请输入密码").fill(self.password)
# 勾选记住密码
if await page.get_by_text("记住密码").is_visible():
await page.get_by_text("记住密码").click()
# 点击登录
logger.info("点击登录按钮")
try:
await page.click("button:has-text('登录')", timeout=5000)
except Exception:
await page.click("button[type='submit']", timeout=5000)
# 等待跳转(Docker 中需要更长时间)
logger.info("等待登录跳转...")
await page.wait_for_url("**/homePage", timeout=30000)
logger.info("登录成功")
async def _export_report(self, page, start_date, end_date):
"""访问统计页面并导出报表"""
logger.info(f"访问统计页面: {self.STATS_URL}")
await page.goto(self.STATS_URL, timeout=30000)
await page.wait_for_load_state("networkidle", timeout=30000)
export_btn = page.get_by_role("button", name="导出报表")
await export_btn.wait_for(state="visible", timeout=30000)
logger.info(f"设置查询日期范围: {start_date} ~ {end_date}")
start_input = page.get_by_role("textbox", name="请选择日期").nth(0)
end_input = page.get_by_role("textbox", name="请选择日期").nth(1)
# 设置开始日期(内部已处理 Enter 确认 + Escape 关闭)
await self._set_date(page, start_input, start_date)
await page.wait_for_timeout(500)
# 设置结束日期
await self._set_date(page, end_input, end_date)
await page.wait_for_timeout(500)
# 验证日期设置结果
start_val = await start_input.input_value()
end_val = await end_input.input_value()
logger.info(f"日期设置结果: 开始={start_val}, 结束={end_val}")
# 等待数据请求完成 + 表格渲染(Docker 中增加等待时间)
logger.info("等待数据请求完成...")
await asyncio.sleep(3)
# 检查数据是否加载完成(等待loading消失或有实际数据)
try:
# 等待加载指示符消失或数据表格出现
await page.wait_for_function(
"""() => {
// 检查是否存在加载中的标志
const loading = document.querySelector('[class*="loading"]');
if (loading && loading.style.display !== 'none') return false;
// 检查是否有数据行
const rows = document.querySelectorAll('table tbody tr');
return rows.length > 0;
}""",
timeout=30000
)
logger.info("数据表格已加载")
except Exception as e:
logger.warning(f"表格加载检查失败: {e},继续执行...")
await asyncio.sleep(3)
# 如果配置了 shop_id,拦截导出请求注入 shop_id,并捕获服务端响应
import json
export_response = {'status': None, 'body': None, 'content_type': None}
if self.shop_id:
async def inject_shop_id(route):
request = route.request
body = json.loads(request.post_data)
body['shop_id'] = self.shop_id
logger.info(f"注入 shop_id: {self.shop_id}")
await route.continue_(post_data=json.dumps(body))
await page.route('**/api/bill/export', inject_shop_id)
logger.info(f"已设置 shop_id 拦截: {self.shop_id}")
# 捕获导出接口的响应(用于调试)
async def on_response(response):
if '/api/bill/export' in response.url:
export_response['status'] = response.status
export_response['content_type'] = response.headers.get('content-type', '')
try:
body = await response.text()
export_response['body'] = body[:2000] if body else ''
except Exception:
export_response['body'] = '(binary or empty)'
logger.info(f"导出接口响应: status={response.status}, content-type={export_response['content_type']}, body长度={len(export_response['body'] or '')}")
page.on("response", on_response)
# 点击导出报表并捕获下载
logger.info("点击导出报表...")
download_timeout = 120000 # 2 分钟
try:
async with page.expect_download(timeout=download_timeout) as download_info:
await export_btn.click()
logger.info("等待文件下载中...")
download = await download_info.value
filename = download.suggested_filename
save_path = os.path.join(self.download_dir, filename)
await download.save_as(save_path)
logger.info(f"报表已保存至: {save_path}")
return save_path
except Exception as download_err:
# 下载事件未触发,进行诊断
logger.warning(f"下载事件捕获失败: {download_err}")
# 保存调试截图
try:
screenshot_path = os.path.join(self.download_dir, f"debug_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
await page.screenshot(path=screenshot_path, full_page=True)
logger.info(f"调试截图已保存: {screenshot_path}")
except Exception as ss_err:
logger.warning(f"截图保存失败: {ss_err}")
# 打印捕获到的响应信息
if export_response['status']:
logger.info(f"服务端实际响应: status={export_response['status']}, content-type={export_response['content_type']}")
if export_response['body']:
logger.info(f"响应内容(前500字): {export_response['body'][:500]}")
else:
logger.warning("未捕获到 /api/bill/export 响应,可能是请求被拦截或未发出")
# 检查页面是否有错误提示
try:
error_text = await page.evaluate("""() => {
const msgs = document.querySelectorAll('.t-message--error, .t-notification--error, [class*="error"], .el-message--error');
return Array.from(msgs).map(el => el.textContent.trim()).filter(Boolean).join(' | ');
}""")
if error_text:
logger.error(f"页面错误提示: {error_text}")
except Exception:
pass
# 检查是否有新打开的标签页(某些网站通过 window.open 下载)
try:
pages = page.context.pages
if len(pages) > 1:
logger.info(f"检测到 {len(pages)} 个标签页,检查新标签页...")
for p in pages[1:]:
url = p.url
logger.info(f"新标签页 URL: {url}")
if url.startswith('blob:') or 'download' in url.lower() or 'export' in url.lower():
# 尝试从新标签页下载
try:
async with p.expect_download(timeout=30000) as dl_info:
pass
download = await dl_info.value
filename = download.suggested_filename
save_path = os.path.join(self.download_dir, filename)
await download.save_as(save_path)
logger.info(f"从新标签页下载成功: {save_path}")
return save_path
except Exception:
pass
except Exception:
pass
raise
async def _set_date(self, page, input_box, date_str):
"""
设置 TDesign 日期选择器的值
TDesign 的 needconfirm="true" 模式要求:
1. 点击输入框打开日历
2. 点击日期格子选择日期
3. 在输入框上按 Enter 确认(关键!不确认则关闭时回滚)
4. Escape 关闭日历
"""
max_attempts = 5
for attempt in range(max_attempts):
try:
logger.info(f"设置日期: {date_str} (第 {attempt + 1}/{max_attempts} 次尝试)")
# 1. 点击输入框打开日历
await input_box.click()
await page.wait_for_timeout(800)
# 2. 点击目标日期格子
target_day = str(int(date_str.split("-")[2]))
day_cells = page.get_by_role("cell", name=target_day)
cell_count = await day_cells.count()
if cell_count > 0:
logger.debug(f"找到 {cell_count} 个日期格子,点击第一个")
await day_cells.first.click()
await page.wait_for_timeout(800)
else:
logger.warning(f"未找到日期格子: {target_day},重试...")
await page.keyboard.press("Escape")
await page.wait_for_timeout(500)
continue
# 3. Enter 确认(needconfirm="true" 必须显式确认)
await input_box.press("Enter")
await page.wait_for_timeout(800)
# 4. Escape 关闭日历
await page.keyboard.press("Escape")
await page.wait_for_timeout(800)
# 5. 验证
val = await input_box.input_value()
if date_str in val:
logger.info(f"日期设置成功: {val}")
return
logger.warning(f"日期设置验证失败: 期望包含 '{date_str}', 实际 '{val}',重试...")
await page.wait_for_timeout(500)
except Exception as e:
logger.warning(f"日期设置异常 (第 {attempt + 1}/{max_attempts} 次): {e}")
await page.keyboard.press("Escape")
await page.wait_for_timeout(500)
continue
logger.error(f"日期设置失败({max_attempts}次尝试后): {date_str}")
async def download_report(start_date, end_date, username=None, password=None, download_dir=None, shop_id=None):
"""
便捷函数:下载指定日期范围的报表
Args:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
username: secsion.com 用户名(可选,优先使用 config)
password: secsion.com 密码(可选,优先使用 config)
download_dir: 下载目录(可选)
Returns:
str: 下载文件路径,失败返回 None
"""
if not username or not password:
from config import Config
creds = Config.get_secsion_credentials()
if not creds:
logger.error("未配置 secsion.com 登录凭据")
return None
username, password = creds
if not shop_id:
try:
from config import Config
shop_id = Config.get_shop_id()
except Exception:
pass
downloader = SecsionDownloader(username, password, download_dir, shop_id)
return await downloader.download_report(start_date, end_date)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('automation.log', encoding='utf-8')
]
)
parser = argparse.ArgumentParser(description='secsion.com 报表自动下载工具')
parser.add_argument('--start', type=str, help='开始日期 (YYYY-MM-DD)', default=datetime.now().strftime('%Y-%m-%d'))
parser.add_argument('--end', type=str, help='结束日期 (YYYY-MM-DD)')
parser.add_argument('--username', type=str, help='secsion.com 用户名')
parser.add_argument('--password', type=str, help='secsion.com 密码')
args = parser.parse_args()
end_date = args.end or args.start
result = asyncio.run(download_report(
start_date=args.start,
end_date=end_date,
username=args.username,
password=args.password
))
if result:
print(f"下载成功: {result}")
else:
print("下载失败")
exit(1)