""" secsion.com 自动化下载模块 使用 Playwright 登录 secsion.com,导出销售报表 """ import asyncio import os import logging import argparse from datetime import datetime from playwright.async_api import async_playwright logger = logging.getLogger(__name__) class SecsionDownloader: """从 secsion.com 自动下载销售报表""" LOGIN_URL = "https://secsion.com:8000/login?redirect=%252Fhomepage" STATS_URL = "https://secsion.com:8000/commodityStatistics" def __init__(self, username, password, download_dir=None, shop_id=None): self.username = username self.password = password self.shop_id = shop_id or '' self.download_dir = download_dir or os.path.join(os.getcwd(), "downloads") os.makedirs(self.download_dir, exist_ok=True) async def download_report(self, start_date, end_date): """ 下载指定日期范围的销售报表 Args: start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) Returns: str: 下载文件的本地路径,失败返回 None """ logger.info(f"开始下载报表: {start_date} ~ {end_date}") async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( ignore_https_errors=True, viewport={'width': 1280, 'height': 800} ) page = await context.new_page() try: await self._login(page) file_path = await self._export_report(page, start_date, end_date) logger.info(f"报表下载完成: {file_path}") return file_path except Exception as e: logger.error(f"下载报表失败: {e}") return None finally: await browser.close() async def _login(self, page): """登录 secsion.com""" logger.info(f"打开登录页面: {self.LOGIN_URL}") await page.goto(self.LOGIN_URL) # 选择角色 "店铺" logger.info("选择角色: 店铺") try: await page.get_by_text("店铺", exact=True).click() except Exception: await page.click("text=店铺") # 输入账号密码 logger.info(f"输入账号: {self.username}") await page.get_by_placeholder("请输入用户名").fill(self.username) await page.get_by_placeholder("请输入密码").fill(self.password) # 勾选记住密码 if await page.get_by_text("记住密码").is_visible(): await page.get_by_text("记住密码").click() # 点击登录 logger.info("点击登录按钮") try: await page.click("button:has-text('登录')", timeout=5000) except Exception: await page.click("button[type='submit']") # 等待跳转 logger.info("等待登录跳转...") await page.wait_for_url("**/homePage", timeout=20000) logger.info("登录成功") async def _export_report(self, page, start_date, end_date): """访问统计页面并导出报表""" logger.info(f"访问统计页面: {self.STATS_URL}") await page.goto(self.STATS_URL) await page.wait_for_load_state("networkidle") export_btn = page.get_by_role("button", name="导出报表") await export_btn.wait_for(state="visible", timeout=20000) logger.info(f"设置查询日期范围: {start_date} ~ {end_date}") start_input = page.get_by_role("textbox", name="请选择日期").nth(0) end_input = page.get_by_role("textbox", name="请选择日期").nth(1) # 设置开始日期(内部已处理 Enter 确认 + Escape 关闭) await self._set_date(page, start_input, start_date) await page.wait_for_timeout(500) # 设置结束日期 await self._set_date(page, end_input, end_date) await page.wait_for_timeout(500) # 验证日期设置结果 start_val = await start_input.input_value() end_val = await end_input.input_value() logger.info(f"日期设置结果: 开始={start_val}, 结束={end_val}") # 等待数据请求完成 logger.info("等待数据请求完成...") await asyncio.sleep(3) # 如果配置了 shop_id,拦截导出请求注入 shop_id if self.shop_id: import json async def inject_shop_id(route): request = route.request body = json.loads(request.post_data) body['shop_id'] = self.shop_id logger.info(f"注入 shop_id: {self.shop_id}") await route.continue_(post_data=json.dumps(body)) await page.route('**/api/bill/export', inject_shop_id) logger.info(f"已设置 shop_id 拦截: {self.shop_id}") # 点击导出报表并捕获下载 logger.info("点击导出报表...") async with page.expect_download(timeout=60000) as download_info: await export_btn.click() download = await download_info.value filename = download.suggested_filename save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"报表已保存至: {save_path}") return save_path async def _set_date(self, page, input_box, date_str): """ 设置 TDesign 日期选择器的值 TDesign 的 needconfirm="true" 模式要求: 1. 点击输入框打开日历 2. 点击日期格子选择日期 3. 在输入框上按 Enter 确认(关键!不确认则关闭时回滚) 4. Escape 关闭日历 """ for attempt in range(3): logger.info(f"设置日期: {date_str} (第 {attempt + 1} 次尝试)") # 1. 点击输入框打开日历 await input_box.click() await page.wait_for_timeout(500) # 2. 点击目标日期格子 target_day = str(int(date_str.split("-")[2])) day_cells = page.get_by_role("cell", name=target_day) cell_count = await day_cells.count() if cell_count > 0: await day_cells.first.click() await page.wait_for_timeout(500) else: logger.warning(f"未找到日期格子: {target_day}") continue # 3. Enter 确认(needconfirm="true" 必须显式确认) await input_box.press("Enter") await page.wait_for_timeout(500) # 4. Escape 关闭日历 await page.keyboard.press("Escape") await page.wait_for_timeout(500) # 5. 验证 val = await input_box.input_value() if date_str in val: logger.info(f"日期设置成功: {val}") return logger.warning(f"日期设置验证失败: 期望包含 '{date_str}', 实际 '{val}'") logger.error(f"日期设置失败(3次尝试后): {date_str}") async def download_report(start_date, end_date, username=None, password=None, download_dir=None, shop_id=None): """ 便捷函数:下载指定日期范围的报表 Args: start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) username: secsion.com 用户名(可选,优先使用 config) password: secsion.com 密码(可选,优先使用 config) download_dir: 下载目录(可选) Returns: str: 下载文件路径,失败返回 None """ if not username or not password: from config import Config creds = Config.get_secsion_credentials() if not creds: logger.error("未配置 secsion.com 登录凭据") return None username, password = creds if not shop_id: try: from config import Config shop_id = Config.get_shop_id() except Exception: pass downloader = SecsionDownloader(username, password, download_dir, shop_id) return await downloader.download_report(start_date, end_date) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('automation.log', encoding='utf-8') ] ) parser = argparse.ArgumentParser(description='secsion.com 报表自动下载工具') parser.add_argument('--start', type=str, help='开始日期 (YYYY-MM-DD)', default=datetime.now().strftime('%Y-%m-%d')) parser.add_argument('--end', type=str, help='结束日期 (YYYY-MM-DD)') parser.add_argument('--username', type=str, help='secsion.com 用户名') parser.add_argument('--password', type=str, help='secsion.com 密码') args = parser.parse_args() end_date = args.end or args.start result = asyncio.run(download_report( start_date=args.start, end_date=end_date, username=args.username, password=args.password )) if result: print(f"下载成功: {result}") else: print("下载失败") exit(1)