import asyncio import os import logging import sys import argparse from datetime import datetime from playwright.async_api import async_playwright # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('automation.log', encoding='utf-8') ] ) logger = logging.getLogger(__name__) class ReportAutomation: def __init__(self, start_date=None, end_date=None): self.secsion_login_url = "https://secsion.com:8000/login?redirect=%252Fhomepage" self.secsion_stats_url = "https://secsion.com:8000/commodityStatistics" self.upload_url = "https://sale.94kan.cn/" self.username = "15682076681" self.password = "123456" # 使用传入的日期,如果没有则默认为今天 today = datetime.now().strftime('%Y-%m-%d') self.start_date = start_date or today self.end_date = end_date or start_date or today logger.info(f"任务初始化: 开始日期={self.start_date}, 结束日期={self.end_date}") self.download_dir = os.path.join(os.getcwd(), "downloads") if not os.path.exists(self.download_dir): os.makedirs(self.download_dir) async def run(self): async with async_playwright() as p: # 启动浏览器,忽略 HTTPS 错误 browser = await p.chromium.launch(headless=True) # 生产运行使用 Headless 模式 context = await browser.new_context( ignore_https_errors=True, viewport={'width': 1280, 'height': 800} ) page = await context.new_page() try: # --- 任务 1: 登录 secsion.com 并导出报表 --- await self.login_secsion(page) file_path = await self.export_report(page) if file_path: # --- 任务 2: 上传到 94kan.cn --- await self.upload_to_94kan(page, file_path) else: logger.error("未能获取下载文件路径,跳过上传任务。") except Exception as e: logger.error(f"发生错误: {str(e)}") # 截图以供调试 await page.screenshot(path="error_screenshot.png") finally: await asyncio.sleep(5) # 留出观察时间 await browser.close() async def login_secsion(self, page): logger.info(f"正在打开登录页面: {self.secsion_login_url}") await page.goto(self.secsion_login_url) # 1. 选择角色 "店铺" # 假设角色选择是一个下拉框或者一组按钮,根据描述,点击包含“店铺”的元素 logger.info("选择角色: 店铺") try: # 尝试多种定位方式以确保成功 shop_role = page.get_by_text("店铺", exact=True) await shop_role.click() except Exception: logger.warning("直接点击'店铺'失败,尝试备用定位方案...") await page.click("text=店铺") # 2. 输入账号密码 logger.info(f"输入账号: {self.username}") # 根据截图,placeholder 是 "请输入用户名" await page.get_by_placeholder("请输入用户名").fill(self.username) await page.get_by_placeholder("请输入密码").fill(self.password) # 3. 勾选记住密码(可选) if await page.get_by_text("记住密码").is_visible(): await page.get_by_text("记住密码").click() # 4. 点击登录 logger.info("点击登录按钮") # 使用更稳健的 CSS 选择器 try: await page.click("button:has-text('登录')", timeout=5000) except Exception: logger.warning("通过文本点击失败,尝试通过 type='submit' 点击") await page.click("button[type='submit']") # 等待跳转,增加超时时间到 20s,因为后台可能慢 logger.info("等待登录跳转...") # 修正 URL 大小写: homePage await page.wait_for_url("**/homePage", timeout=20000) logger.info("登录成功") async def export_report(self, page): logger.info(f"访问统计页面: {self.secsion_stats_url}") await page.goto(self.secsion_stats_url) # 等待页面加载完成,以“导出报表”按钮为准 logger.info("等待统计页面加载...") export_btn = page.get_by_role("button", name="导出报表") await export_btn.wait_for(state="visible", timeout=20000) logger.info(f"设置查询日期范围: {self.start_date} 至 {self.end_date}") try: # 找到所有的日期输入框 inputs = page.locator("input") count = await inputs.count() date_input_indices = [] for i in range(count): val = await inputs.nth(i).input_value() if "-" in val and len(val) >= 10: date_input_indices.append(i) if len(date_input_indices) >= 2: # 填入开始日期 await inputs.nth(date_input_indices[0]).click() await inputs.nth(date_input_indices[0]).fill(self.start_date) await inputs.nth(date_input_indices[0]).press("Enter") # 填入结束日期 await inputs.nth(date_input_indices[1]).click() await inputs.nth(date_input_indices[1]).fill(self.end_date) await inputs.nth(date_input_indices[1]).press("Enter") else: logger.warning("未找到足够的日期输入框,尝试使用默认日期导出") except Exception as e: logger.warning(f"填充日期时遇到问题: {str(e)}") # 点击导出报表并捕获下载 logger.info("点击导出报表...") async with page.expect_download(timeout=60000) as download_info: await export_btn.click() download = await download_info.value filename = f"commodity_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" save_path = os.path.join(self.download_dir, filename) await download.save_as(save_path) logger.info(f"报表已保存至: {save_path}") return save_path async def upload_to_94kan(self, page, file_path): logger.info(f"正在打开上传页面: {self.upload_url}") await page.goto(self.upload_url) # 1. 点击上传按钮触发文件选择(如果需要点击的话) # 或者直接找到隐藏的 input[type="file"] logger.info(f"准备上传文件: {file_path}") # 寻找上传 input # sale.94kan.cn 页面通常有一个显眼的上传区域 try: # 方案 A: 直接设置 input 文件(最可靠) # 很多前端框架会隐藏真正的 input await page.set_input_files("input[type='file']", file_path) except Exception: logger.info("未找到直接的 input[type='file'],尝试点击后上传...") # 方案 B: 点击后再处理 async with page.expect_file_chooser() as fc_info: await page.get_by_text("点击或拖拽文件至此").click() file_chooser = await fc_info.value await file_chooser.set_files(file_path) # 2. 等待上传完成 logger.info("等待上传处理完成...") # 根据截图,页面显示“正在上传并分析...” try: await page.wait_for_selector("text=上传并分析", state="visible", timeout=30000) logger.info("上传成功,正在处理数据...") except Exception: logger.warning("未检测到 '上传并分析' 文本,可能已完成或文本不同") # 截图保存结果 await page.screenshot(path="upload_result.png") if __name__ == "__main__": parser = argparse.ArgumentParser(description='报表自动化导出上传工具') parser.add_argument('--start', type=str, help='开始日期 (格式: YYYY-MM-DD)') parser.add_argument('--end', type=str, help='结束日期 (格式: YYYY-MM-DD)') args = parser.parse_args() automation = ReportAutomation(start_date=args.start, end_date=args.end) asyncio.run(automation.run())