""" secsion.com 自动化下载模块 使用 Playwright 登录 secsion.com,导出销售报表 """ import asyncio import os import logging import argparse from datetime import datetime from playwright.async_api import async_playwright logger = logging.getLogger(__name__) class SecsionDownloader: """从 secsion.com 自动下载销售报表""" LOGIN_URL = "https://secsion.com:8000/login?redirect=%252Fhomepage" STATS_URL = "https://secsion.com:8000/commodityStatistics" def __init__(self, username, password, download_dir=None, shop_id=None): self.username = username self.password = password self.shop_id = shop_id or '' self.download_dir = download_dir or os.path.join(os.getcwd(), "downloads") os.makedirs(self.download_dir, exist_ok=True) async def download_report(self, start_date, end_date, retry_count=3): """ 下载指定日期范围的销售报表 Args: start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) retry_count: 重试次数(默认3次) Returns: str: 下载文件的本地路径,失败返回 None """ for attempt in range(retry_count): try: logger.info(f"开始下载报表: {start_date} ~ {end_date} (第 {attempt + 1}/{retry_count} 次)") async with async_playwright() as p: # Docker 优化:添加 --disable-dev-shm-usage 避免共享内存不足 # 注意:不能使用 --single-process,它会破坏 Chromium 的下载机制 browser = await p.chromium.launch( headless=True, downloads_path=self.download_dir, args=[ "--disable-dev-shm-usage", "--disable-gpu", "--no-sandbox" ] ) context = await browser.new_context( ignore_https_errors=True, viewport={'width': 1280, 'height': 800}, accept_downloads=True ) page = await context.new_page() try: await self._login(page) file_path = await self._export_report(page, start_date, end_date) logger.info(f"报表下载完成: {file_path}") return file_path finally: await browser.close() except Exception as e: logger.error(f"下载报表失败 (第 {attempt + 1}/{retry_count} 次): {e}") if attempt < retry_count - 1: wait_time = (attempt + 1) * 5 logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) continue logger.error(f"下载报表最终失败 (重试 {retry_count} 次均失败)") return None async def _login(self, page): """登录 secsion.com""" logger.info(f"打开登录页面: {self.LOGIN_URL}") await page.goto(self.LOGIN_URL, timeout=30000) # 选择角色 "店铺" logger.info("选择角色: 店铺") try: await page.get_by_text("店铺", exact=True).click(timeout=10000) except Exception: await page.click("text=店铺", timeout=10000) # 输入账号密码 logger.info(f"输入账号: {self.username}") await page.get_by_placeholder("请输入用户名").fill(self.username) await page.get_by_placeholder("请输入密码").fill(self.password) # 勾选记住密码 if await page.get_by_text("记住密码").is_visible(): await page.get_by_text("记住密码").click() # 点击登录 logger.info("点击登录按钮") try: await page.click("button:has-text('登录')", timeout=5000) except Exception: await page.click("button[type='submit']", timeout=5000) # 等待跳转(Docker 中需要更长时间) logger.info("等待登录跳转...") await page.wait_for_url("**/homePage", timeout=30000) logger.info("登录成功") async def _export_report(self, page, start_date, end_date): """访问统计页面并导出报表""" logger.info(f"访问统计页面: {self.STATS_URL}") await page.goto(self.STATS_URL, timeout=30000) await page.wait_for_load_state("networkidle", timeout=30000) export_btn = page.get_by_role("button", name="导出报表") await export_btn.wait_for(state="visible", timeout=30000) logger.info(f"设置查询日期范围: {start_date} ~ {end_date}") start_input = page.get_by_role("textbox", name="请选择日期").nth(0) end_input = page.get_by_role("textbox", name="请选择日期").nth(1) # 设置开始日期(内部已处理 Enter 确认 + Escape 关闭) await self._set_date(page, start_input, start_date) await page.wait_for_timeout(500) # 设置结束日期 await self._set_date(page, end_input, end_date) await page.wait_for_timeout(500) # 验证日期设置结果 start_val = await start_input.input_value() end_val = await end_input.input_value() logger.info(f"日期设置结果: 开始={start_val}, 结束={end_val}") # 等待数据请求完成 + 表格渲染(Docker 中增加等待时间) logger.info("等待数据请求完成...") await asyncio.sleep(3) # 检查数据是否加载完成(等待loading消失或有实际数据) try: # 等待加载指示符消失或数据表格出现 await page.wait_for_function( """() => { // 检查是否存在加载中的标志 const loading = document.querySelector('[class*="loading"]'); if (loading && loading.style.display !== 'none') return false; // 检查是否有数据行 const rows = document.querySelectorAll('table tbody tr'); return rows.length > 0; }""", timeout=30000 ) logger.info("数据表格已加载") except Exception as e: logger.warning(f"表格加载检查失败: {e},继续执行...") await asyncio.sleep(3) # 如果配置了 shop_id,拦截导出请求注入 shop_id,并捕获服务端响应 import json export_response = {'status': None, 'body': None, 'content_type': None} if self.shop_id: async def inject_shop_id(route): request = route.request body = json.loads(request.post_data) body['shop_id'] = self.shop_id logger.info(f"注入 shop_id: {self.shop_id}") await route.continue_(post_data=json.dumps(body)) await page.route('**/api/bill/export', inject_shop_id) logger.info(f"已设置 shop_id 拦截: {self.shop_id}") # 捕获导出接口的响应(用于调试) async def on_response(response): if '/api/bill/export' in response.url: export_response['status'] = response.status export_response['content_type'] = response.headers.get('content-type', '') try: body = await response.text() export_response['body'] = body[:2000] if body else '' except Exception: export_response['body'] = '(binary or empty)' logger.info(f"导出接口响应: status={response.status}, content-type={export_response['content_type']}, body长度={len(export_response['body'] or '')}") page.on("response", on_response) # 记录下载目录现有文件(用于兜底检测) existing_files = set(os.listdir(self.download_dir)) if os.path.exists(self.download_dir) else set() # 点击导出报表并捕获下载 logger.info("点击导出报表...") download_timeout = 120000 # 2 分钟,给 SSL 绕过留足时间 try: async with page.expect_download(timeout=download_timeout) as download_info: await export_btn.click() logger.info("等待文件下载中...") # 点击导出后,可能弹出 SSL 证书过期拦截页面 await self._bypass_ssl_interstitial(page) download = await download_info.value filename = download.suggested_filename save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"报表已保存至: {save_path}") return save_path except Exception as download_err: # Playwright download 事件未触发,尝试 SSL 绕过后再等 logger.warning(f"Playwright 下载事件捕获失败: {download_err}") # 二次尝试:可能 SSL 页面刚出现,再尝试绕过 bypassed = await self._bypass_ssl_interstitial(page) if bypassed: logger.info("SSL 拦截已绕过,等待下载...") try: async with page.expect_download(timeout=30000) as dl_info: pass download = await dl_info.value filename = download.suggested_filename save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"SSL 绕过后下载成功: {save_path}") return save_path except Exception: logger.warning("SSL 绕过后仍未触发下载事件") logger.info("尝试文件系统兜底检测...") # 等待一小段时间让可能的下载完成 await asyncio.sleep(5) new_files = self._find_new_files(existing_files) if new_files: # 按修改时间取最新的 latest = max(new_files, key=lambda f: os.path.getmtime(os.path.join(self.download_dir, f))) save_path = os.path.join(self.download_dir, latest) logger.info(f"文件系统兜底检测到新文件: {save_path}") return save_path logger.warning("文件系统兜底检测也未发现新文件") # 保存调试截图 try: screenshot_path = os.path.join(self.download_dir, f"debug_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png") await page.screenshot(path=screenshot_path, full_page=True) logger.info(f"调试截图已保存: {screenshot_path}") except Exception as ss_err: logger.warning(f"截图保存失败: {ss_err}") # 打印捕获到的响应信息 if export_response['status']: logger.info(f"服务端实际响应: status={export_response['status']}, content-type={export_response['content_type']}") if export_response['body']: logger.info(f"响应内容(前500字): {export_response['body'][:500]}") else: logger.warning("未捕获到 /api/bill/export 响应,可能是请求被拦截或未发出") # 检查页面是否有错误提示 try: error_text = await page.evaluate("""() => { const msgs = document.querySelectorAll('.t-message--error, .t-notification--error, [class*="error"], .el-message--error'); return Array.from(msgs).map(el => el.textContent.trim()).filter(Boolean).join(' | '); }""") if error_text: logger.error(f"页面错误提示: {error_text}") except Exception: pass # 检查是否有新打开的标签页(某些网站通过 window.open 下载) try: pages = page.context.pages if len(pages) > 1: logger.info(f"检测到 {len(pages)} 个标签页,检查新标签页...") for p in pages[1:]: url = p.url logger.info(f"新标签页 URL: {url}") if url.startswith('blob:') or 'download' in url.lower() or 'export' in url.lower(): # 尝试从新标签页下载 try: async with p.expect_download(timeout=30000) as dl_info: pass download = await dl_info.value filename = download.suggested_filename save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"从新标签页下载成功: {save_path}") return save_path except Exception: pass except Exception: pass raise def _find_new_files(self, existing_files): """检测下载目录中新增的文件""" if not os.path.exists(self.download_dir): return [] current_files = set(os.listdir(self.download_dir)) new_files = current_files - existing_files # 过滤掉临时文件和调试截图 return [f for f in new_files if not f.endswith(('.crdownload', '.tmp')) and not f.startswith('debug_')] async def _bypass_ssl_interstitial(self, page): """ 绕过 Chrome SSL 证书错误拦截页面 secsion.com 的导出下载链接 SSL 证书过期,Chrome 会弹 "您的连接不是私密连接" 警告页。点 "高级" → "继续前往"。 Returns: bool: 是否成功绕过(或无需绕过) """ try: await page.wait_for_timeout(2000) current_url = page.url logger.debug(f"SSL 绕过检查: 当前 URL={current_url}") # 检查是否在 SSL 错误页面 is_ssl_error_page = ( 'chrome-error' in current_url or 'security' in current_url.lower() or await page.evaluate( """() => { return document.querySelector('#details-button') !== null || document.querySelector('#proceed-link') !== null || document.body?.innerText?.includes('您的连接不是私密连接') || document.body?.innerText?.includes('NET::ERR_CERT'); }""" ) ) if not is_ssl_error_page: return False logger.info("检测到 SSL 证书错误拦截页面,尝试绕过...") # 点击 "高级" 按钮展开详情 details_btn = page.locator('#details-button') if await details_btn.count() > 0: await details_btn.click() await page.wait_for_timeout(500) logger.info("已点击「高级」") # 点击 "继续前往 xxx(不安全)" proceed_link = page.locator('#proceed-link') if await proceed_link.count() > 0: await proceed_link.click() await page.wait_for_timeout(2000) logger.info("已点击「继续前往(不安全)」,SSL 绕过成功") return True # 备选:中文按钮文字 unsafe_link = page.get_by_text('继续前往') if await unsafe_link.count() > 0: await unsafe_link.click() await page.wait_for_timeout(2000) logger.info("已点击「继续前往」,SSL 绕过成功") return True logger.warning("SSL 拦截页面检测到但未找到绕过按钮") return False except Exception as e: logger.warning(f"SSL 绕过检查异常: {e}") return False async def _set_date(self, page, input_box, date_str): """ 设置 TDesign 日期选择器的值 TDesign 的 needconfirm="true" 模式要求: 1. 点击输入框打开日历 2. 点击日期格子选择日期 3. 在输入框上按 Enter 确认(关键!不确认则关闭时回滚) 4. Escape 关闭日历 """ max_attempts = 5 for attempt in range(max_attempts): try: logger.info(f"设置日期: {date_str} (第 {attempt + 1}/{max_attempts} 次尝试)") # 1. 点击输入框打开日历 await input_box.click() await page.wait_for_timeout(800) # 2. 点击目标日期格子 target_day = str(int(date_str.split("-")[2])) day_cells = page.get_by_role("cell", name=target_day) cell_count = await day_cells.count() if cell_count > 0: logger.debug(f"找到 {cell_count} 个日期格子,点击第一个") await day_cells.first.click() await page.wait_for_timeout(800) else: logger.warning(f"未找到日期格子: {target_day},重试...") await page.keyboard.press("Escape") await page.wait_for_timeout(500) continue # 3. Enter 确认(needconfirm="true" 必须显式确认) await input_box.press("Enter") await page.wait_for_timeout(800) # 4. Escape 关闭日历 await page.keyboard.press("Escape") await page.wait_for_timeout(800) # 5. 验证 val = await input_box.input_value() if date_str in val: logger.info(f"日期设置成功: {val}") return logger.warning(f"日期设置验证失败: 期望包含 '{date_str}', 实际 '{val}',重试...") await page.wait_for_timeout(500) except Exception as e: logger.warning(f"日期设置异常 (第 {attempt + 1}/{max_attempts} 次): {e}") await page.keyboard.press("Escape") await page.wait_for_timeout(500) continue logger.error(f"日期设置失败({max_attempts}次尝试后): {date_str}") async def download_report(start_date, end_date, username=None, password=None, download_dir=None, shop_id=None): """ 便捷函数:下载指定日期范围的报表 Args: start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) username: secsion.com 用户名(可选,优先使用 config) password: secsion.com 密码(可选,优先使用 config) download_dir: 下载目录(可选) Returns: str: 下载文件路径,失败返回 None """ if not username or not password: from config import Config creds = Config.get_secsion_credentials() if not creds: logger.error("未配置 secsion.com 登录凭据") return None username, password = creds if not shop_id: try: from config import Config shop_id = Config.get_shop_id() except Exception: pass downloader = SecsionDownloader(username, password, download_dir, shop_id) return await downloader.download_report(start_date, end_date) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('automation.log', encoding='utf-8') ] ) parser = argparse.ArgumentParser(description='secsion.com 报表自动下载工具') parser.add_argument('--start', type=str, help='开始日期 (YYYY-MM-DD)', default=datetime.now().strftime('%Y-%m-%d')) parser.add_argument('--end', type=str, help='结束日期 (YYYY-MM-DD)') parser.add_argument('--username', type=str, help='secsion.com 用户名') parser.add_argument('--password', type=str, help='secsion.com 密码') args = parser.parse_args() end_date = args.end or args.start result = asyncio.run(download_report( start_date=args.start, end_date=end_date, username=args.username, password=args.password )) if result: print(f"下载成功: {result}") else: print("下载失败") exit(1)