import asyncio import os import logging import sys import argparse from datetime import datetime from playwright.async_api import async_playwright # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('automation.log', encoding='utf-8') ] ) logger = logging.getLogger(__name__) class ReportAutomation: def __init__(self, start_date=None, end_date=None): self.secsion_login_url = "https://secsion.com:8000/login?redirect=%252Fhomepage" self.secsion_stats_url = "https://secsion.com:8000/commodityStatistics" self.upload_url = "https://sale.94kan.cn/" self.username = "15682076681" self.password = "123456" # 使用传入的日期,如果没有则默认为今天 today = datetime.now().strftime('%Y-%m-%d') self.start_date = start_date or today self.end_date = end_date or start_date or today logger.info(f"任务初始化: 开始日期={self.start_date}, 结束日期={self.end_date}") self.download_dir = os.path.join(os.getcwd(), "downloads") if not os.path.exists(self.download_dir): os.makedirs(self.download_dir) async def run(self): async with async_playwright() as p: # 启动浏览器,忽略 HTTPS 错误 browser = await p.chromium.launch(headless=True) # 生产运行使用 Headless 模式 context = await browser.new_context( ignore_https_errors=True, viewport={'width': 1280, 'height': 800} ) page = await context.new_page() try: # --- 任务 1: 登录 secsion.com 并导出报表 --- await self.login_secsion(page) file_path = await self.export_report(page) if file_path: # --- 任务 2: 上传到 94kan.cn --- await self.upload_to_94kan(page, file_path) else: logger.error("未能获取下载文件路径,跳过上传任务。") except Exception as e: logger.error(f"发生错误: {str(e)}") finally: await browser.close() async def login_secsion(self, page): logger.info(f"正在打开登录页面: {self.secsion_login_url}") await page.goto(self.secsion_login_url) # 1. 选择角色 "店铺" # 假设角色选择是一个下拉框或者一组按钮,根据描述,点击包含“店铺”的元素 logger.info("选择角色: 店铺") try: # 尝试多种定位方式以确保成功 shop_role = page.get_by_text("店铺", exact=True) await shop_role.click() except Exception: logger.warning("直接点击'店铺'失败,尝试备用定位方案...") await page.click("text=店铺") # 2. 输入账号密码 logger.info(f"输入账号: {self.username}") # 根据截图,placeholder 是 "请输入用户名" await page.get_by_placeholder("请输入用户名").fill(self.username) await page.get_by_placeholder("请输入密码").fill(self.password) # 3. 勾选记住密码(可选) if await page.get_by_text("记住密码").is_visible(): await page.get_by_text("记住密码").click() # 4. 点击登录 logger.info("点击登录按钮") # 使用更稳健的 CSS 选择器 try: await page.click("button:has-text('登录')", timeout=5000) except Exception: logger.warning("通过文本点击失败,尝试通过 type='submit' 点击") await page.click("button[type='submit']") # 等待跳转,增加超时时间到 20s,因为后台可能慢 logger.info("等待登录跳转...") # 修正 URL 大小写: homePage await page.wait_for_url("**/homePage", timeout=20000) logger.info("登录成功") async def export_report(self, page): logger.info(f"访问统计页面: {self.secsion_stats_url}") await page.goto(self.secsion_stats_url) # 等待页面加载完成 logger.info("等待统计页面加载...") await page.wait_for_load_state("networkidle") export_btn = page.get_by_role("button", name="导出报表") await export_btn.wait_for(state="visible", timeout=20000) logger.info(f"设置查询日期范围: {self.start_date} 至 {self.end_date}") # 严格按照参考文件 secsion_export.py 的逻辑 try: # 两个日期输入框都叫“请选择日期”,用 nth 区分 start_input = page.get_by_role("textbox", name="请选择日期").nth(0) end_input = page.get_by_role("textbox", name="请选择日期").nth(1) async def set_date(input_box, date_str: str): logger.info(f"尝试设置日期: {date_str}") # 1. 点击输入框 await input_box.click() # 2. 先尝试直接填值 + Enter (参考文件逻辑) try: # 由于是 readonly,fill 可能会超时,这里设置较短超时 await input_box.fill(date_str, timeout=2000) await input_box.press("Enter") await page.wait_for_timeout(200) except Exception: logger.info("直接填值失败或超时,将尝试点击日历单元格") # 3. 若没有生效,则打开日历点击“日”单元格 (参考文件逻辑) val = await input_box.input_value() if val != date_str: logger.info(f"值未同步({val} != {date_str}),执行日历单元格点击") # 确保日历已弹出 await input_box.click() day = str(int(date_str.split("-")[2])) # 参考文件使用 cell 角色 # page.get_by_role("cell", name=day).click() # 考虑到可能有多个月份显示,取最后一个弹出的 await page.get_by_role("cell", name=day).last.click() await page.wait_for_timeout(500) await set_date(start_input, self.start_date) await set_date(end_input, self.end_date) # 等待数据请求完成 logger.info("等待数据请求完成...") await asyncio.sleep(2) except Exception as e: logger.error(f"日期选择逻辑执行失败: {str(e)}") raise e # 点击导出报表并捕获下载 logger.info("点击导出报表...") async with page.expect_download(timeout=60000) as download_info: await export_btn.click() download = await download_info.value # 使用服务器建议的文件名 filename = download.suggested_filename save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"报表已保存至: {save_path}") return save_path async def upload_to_94kan(self, page, file_path): logger.info(f"正在打开上传页面: {self.upload_url}") await page.goto(self.upload_url) # 1. 点击上传按钮触发文件选择(如果需要点击的话) # 或者直接找到隐藏的 input[type="file"] logger.info(f"准备上传文件: {file_path}") # 寻找上传 input # sale.94kan.cn 页面通常有一个显眼的上传区域 try: # 方案 A: 直接设置 input 文件(最可靠) # 很多前端框架会隐藏真正的 input await page.set_input_files("input[type='file']", file_path) except Exception: logger.info("未找到直接的 input[type='file'],尝试点击后上传...") # 方案 B: 点击后再处理 async with page.expect_file_chooser() as fc_info: await page.get_by_text("点击或拖拽文件至此").click() file_chooser = await fc_info.value await file_chooser.set_files(file_path) # 2. 等待上传完成 logger.info("等待上传处理完成...") # 根据截图,页面显示“正在上传并分析...” try: await page.wait_for_selector("text=上传并分析", state="visible", timeout=30000) logger.info("上传成功,正在处理数据...") except Exception: logger.warning("未检测到 '上传并分析' 文本,可能已完成或文本不同") if __name__ == "__main__": parser = argparse.ArgumentParser(description='报表自动化导出上传工具') parser.add_argument('--start', type=str, help='开始日期 (格式: YYYY-MM-DD)') parser.add_argument('--end', type=str, help='结束日期 (格式: YYYY-MM-DD)') args = parser.parse_args() automation = ReportAutomation(start_date=args.start, end_date=args.end) asyncio.run(automation.run())