Files
SaleShow/automation/secsion.py
T
houhuan 975f9e5887 优化: Docker 环境下的下载性能和网络稳定性
- Chromium 启动参数优化:禁用 dev-shm 和 GPU 加速,防止Docker内存不足
- 增加所有超时时间:login/navigate/export 超时 30s,下载超时 300s
- 改进网络延迟处理:增加数据加载等待时间,添加网络加载检测
- Docker Compose 资源配置:限制 2 CPU / 2GB 内存,DNS 配置国际公共 DNS
- Dockerfile 优化:添加 PYTHONHASHSEED 环境变量,跳过浏览器下载校验
- 新增 docker-debug.sh 脚本:便捷测试 Docker 容器中的下载功能
- 新增 .dockerignore:加速 Docker 构建,减少镜像大小

Docker 下载现在支持更长的网络延迟和更大的数据量
2026-05-17 16:09:55 +08:00

313 lines
12 KiB
Python

"""
secsion.com 自动化下载模块
使用 Playwright 登录 secsion.com,导出销售报表
"""
import asyncio
import os
import logging
import argparse
from datetime import datetime
from playwright.async_api import async_playwright
logger = logging.getLogger(__name__)
class SecsionDownloader:
"""从 secsion.com 自动下载销售报表"""
LOGIN_URL = "https://secsion.com:8000/login?redirect=%252Fhomepage"
STATS_URL = "https://secsion.com:8000/commodityStatistics"
def __init__(self, username, password, download_dir=None, shop_id=None):
self.username = username
self.password = password
self.shop_id = shop_id or ''
self.download_dir = download_dir or os.path.join(os.getcwd(), "downloads")
os.makedirs(self.download_dir, exist_ok=True)
async def download_report(self, start_date, end_date, retry_count=3):
"""
下载指定日期范围的销售报表
Args:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
retry_count: 重试次数(默认3次)
Returns:
str: 下载文件的本地路径,失败返回 None
"""
for attempt in range(retry_count):
try:
logger.info(f"开始下载报表: {start_date} ~ {end_date} (第 {attempt + 1}/{retry_count} 次)")
async with async_playwright() as p:
# Docker 优化:添加 --disable-dev-shm-usage 避免共享内存不足
browser = await p.chromium.launch(
headless=True,
args=[
"--disable-dev-shm-usage",
"--disable-gpu",
"--single-process"
]
)
context = await browser.new_context(
ignore_https_errors=True,
viewport={'width': 1280, 'height': 800}
)
page = await context.new_page()
try:
await self._login(page)
file_path = await self._export_report(page, start_date, end_date)
logger.info(f"报表下载完成: {file_path}")
return file_path
finally:
await browser.close()
except Exception as e:
logger.error(f"下载报表失败 (第 {attempt + 1}/{retry_count} 次): {e}")
if attempt < retry_count - 1:
wait_time = (attempt + 1) * 5
logger.info(f"等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
continue
logger.error(f"下载报表最终失败 (重试 {retry_count} 次均失败)")
return None
async def _login(self, page):
"""登录 secsion.com"""
logger.info(f"打开登录页面: {self.LOGIN_URL}")
await page.goto(self.LOGIN_URL, timeout=30000)
# 选择角色 "店铺"
logger.info("选择角色: 店铺")
try:
await page.get_by_text("店铺", exact=True).click(timeout=10000)
except Exception:
await page.click("text=店铺", timeout=10000)
# 输入账号密码
logger.info(f"输入账号: {self.username}")
await page.get_by_placeholder("请输入用户名").fill(self.username)
await page.get_by_placeholder("请输入密码").fill(self.password)
# 勾选记住密码
if await page.get_by_text("记住密码").is_visible():
await page.get_by_text("记住密码").click()
# 点击登录
logger.info("点击登录按钮")
try:
await page.click("button:has-text('登录')", timeout=5000)
except Exception:
await page.click("button[type='submit']", timeout=5000)
# 等待跳转(Docker 中需要更长时间)
logger.info("等待登录跳转...")
await page.wait_for_url("**/homePage", timeout=30000)
logger.info("登录成功")
async def _export_report(self, page, start_date, end_date):
"""访问统计页面并导出报表"""
logger.info(f"访问统计页面: {self.STATS_URL}")
await page.goto(self.STATS_URL, timeout=30000)
await page.wait_for_load_state("networkidle", timeout=30000)
export_btn = page.get_by_role("button", name="导出报表")
await export_btn.wait_for(state="visible", timeout=30000)
logger.info(f"设置查询日期范围: {start_date} ~ {end_date}")
start_input = page.get_by_role("textbox", name="请选择日期").nth(0)
end_input = page.get_by_role("textbox", name="请选择日期").nth(1)
# 设置开始日期(内部已处理 Enter 确认 + Escape 关闭)
await self._set_date(page, start_input, start_date)
await page.wait_for_timeout(500)
# 设置结束日期
await self._set_date(page, end_input, end_date)
await page.wait_for_timeout(500)
# 验证日期设置结果
start_val = await start_input.input_value()
end_val = await end_input.input_value()
logger.info(f"日期设置结果: 开始={start_val}, 结束={end_val}")
# 等待数据请求完成 + 表格渲染(Docker 中增加等待时间)
logger.info("等待数据请求完成...")
await asyncio.sleep(3)
# 检查数据是否加载完成(等待loading消失或有实际数据)
try:
# 等待加载指示符消失或数据表格出现
await page.wait_for_function(
"""() => {
// 检查是否存在加载中的标志
const loading = document.querySelector('[class*="loading"]');
if (loading && loading.style.display !== 'none') return false;
// 检查是否有数据行
const rows = document.querySelectorAll('table tbody tr');
return rows.length > 0;
}""",
timeout=30000
)
logger.info("数据表格已加载")
except Exception as e:
logger.warning(f"表格加载检查失败: {e},继续执行...")
await asyncio.sleep(3)
# 如果配置了 shop_id,拦截导出请求注入 shop_id
if self.shop_id:
import json
async def inject_shop_id(route):
request = route.request
body = json.loads(request.post_data)
body['shop_id'] = self.shop_id
logger.info(f"注入 shop_id: {self.shop_id}")
await route.continue_(post_data=json.dumps(body))
await page.route('**/api/bill/export', inject_shop_id)
logger.info(f"已设置 shop_id 拦截: {self.shop_id}")
# 点击导出报表并捕获下载(Docker 中增加超时到300秒)
logger.info("点击导出报表...")
async with page.expect_download(timeout=300000) as download_info:
await export_btn.click()
logger.info("等待文件下载中...")
download = await download_info.value
filename = download.suggested_filename
save_path = os.path.join(self.download_dir, filename)
await download.save_as(save_path)
logger.info(f"报表已保存至: {save_path}")
return save_path
async def _set_date(self, page, input_box, date_str):
"""
设置 TDesign 日期选择器的值
TDesign 的 needconfirm="true" 模式要求:
1. 点击输入框打开日历
2. 点击日期格子选择日期
3. 在输入框上按 Enter 确认(关键!不确认则关闭时回滚)
4. Escape 关闭日历
"""
max_attempts = 5
for attempt in range(max_attempts):
try:
logger.info(f"设置日期: {date_str} (第 {attempt + 1}/{max_attempts} 次尝试)")
# 1. 点击输入框打开日历
await input_box.click()
await page.wait_for_timeout(800)
# 2. 点击目标日期格子
target_day = str(int(date_str.split("-")[2]))
day_cells = page.get_by_role("cell", name=target_day)
cell_count = await day_cells.count()
if cell_count > 0:
logger.debug(f"找到 {cell_count} 个日期格子,点击第一个")
await day_cells.first.click()
await page.wait_for_timeout(800)
else:
logger.warning(f"未找到日期格子: {target_day},重试...")
await page.keyboard.press("Escape")
await page.wait_for_timeout(500)
continue
# 3. Enter 确认(needconfirm="true" 必须显式确认)
await input_box.press("Enter")
await page.wait_for_timeout(800)
# 4. Escape 关闭日历
await page.keyboard.press("Escape")
await page.wait_for_timeout(800)
# 5. 验证
val = await input_box.input_value()
if date_str in val:
logger.info(f"日期设置成功: {val}")
return
logger.warning(f"日期设置验证失败: 期望包含 '{date_str}', 实际 '{val}',重试...")
await page.wait_for_timeout(500)
except Exception as e:
logger.warning(f"日期设置异常 (第 {attempt + 1}/{max_attempts} 次): {e}")
await page.keyboard.press("Escape")
await page.wait_for_timeout(500)
continue
logger.error(f"日期设置失败({max_attempts}次尝试后): {date_str}")
async def download_report(start_date, end_date, username=None, password=None, download_dir=None, shop_id=None):
"""
便捷函数:下载指定日期范围的报表
Args:
start_date: 开始日期 (YYYY-MM-DD)
end_date: 结束日期 (YYYY-MM-DD)
username: secsion.com 用户名(可选,优先使用 config)
password: secsion.com 密码(可选,优先使用 config)
download_dir: 下载目录(可选)
Returns:
str: 下载文件路径,失败返回 None
"""
if not username or not password:
from config import Config
creds = Config.get_secsion_credentials()
if not creds:
logger.error("未配置 secsion.com 登录凭据")
return None
username, password = creds
if not shop_id:
try:
from config import Config
shop_id = Config.get_shop_id()
except Exception:
pass
downloader = SecsionDownloader(username, password, download_dir, shop_id)
return await downloader.download_report(start_date, end_date)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('automation.log', encoding='utf-8')
]
)
parser = argparse.ArgumentParser(description='secsion.com 报表自动下载工具')
parser.add_argument('--start', type=str, help='开始日期 (YYYY-MM-DD)', default=datetime.now().strftime('%Y-%m-%d'))
parser.add_argument('--end', type=str, help='结束日期 (YYYY-MM-DD)')
parser.add_argument('--username', type=str, help='secsion.com 用户名')
parser.add_argument('--password', type=str, help='secsion.com 密码')
args = parser.parse_args()
end_date = args.end or args.start
result = asyncio.run(download_report(
start_date=args.start,
end_date=end_date,
username=args.username,
password=args.password
))
if result:
print(f"下载成功: {result}")
else:
print("下载失败")
exit(1)