Initial commit: Report automation script using Playwright

This commit is contained in:
2026-04-18 14:42:46 +08:00
commit 556fec602a
9 changed files with 411 additions and 0 deletions
+183
View File
@@ -0,0 +1,183 @@
import asyncio
import os
import logging
from datetime import datetime
from playwright.async_api import async_playwright
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('automation.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
class ReportAutomation:
def __init__(self):
self.secsion_login_url = "https://secsion.com:8000/login?redirect=%252Fhomepage"
self.secsion_stats_url = "https://secsion.com:8000/commodityStatistics"
self.upload_url = "https://sale.94kan.cn/"
self.username = "15682076681"
self.password = "123456"
self.target_date = "2026-04-18"
self.download_dir = os.path.join(os.getcwd(), "downloads")
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir)
async def run(self):
async with async_playwright() as p:
# 启动浏览器,忽略 HTTPS 错误
browser = await p.chromium.launch(headless=True) # 生产运行使用 Headless 模式
context = await browser.new_context(
ignore_https_errors=True,
viewport={'width': 1280, 'height': 800}
)
page = await context.new_page()
try:
# --- 任务 1: 登录 secsion.com 并导出报表 ---
await self.login_secsion(page)
file_path = await self.export_report(page)
if file_path:
# --- 任务 2: 上传到 94kan.cn ---
await self.upload_to_94kan(page, file_path)
else:
logger.error("未能获取下载文件路径,跳过上传任务。")
except Exception as e:
logger.error(f"发生错误: {str(e)}")
# 截图以供调试
await page.screenshot(path="error_screenshot.png")
finally:
await asyncio.sleep(5) # 留出观察时间
await browser.close()
async def login_secsion(self, page):
logger.info(f"正在打开登录页面: {self.secsion_login_url}")
await page.goto(self.secsion_login_url)
# 1. 选择角色 "店铺"
# 假设角色选择是一个下拉框或者一组按钮,根据描述,点击包含“店铺”的元素
logger.info("选择角色: 店铺")
try:
# 尝试多种定位方式以确保成功
shop_role = page.get_by_text("店铺", exact=True)
await shop_role.click()
except Exception:
logger.warning("直接点击'店铺'失败,尝试备用定位方案...")
await page.click("text=店铺")
# 2. 输入账号密码
logger.info(f"输入账号: {self.username}")
# 根据截图,placeholder 是 "请输入用户名"
await page.get_by_placeholder("请输入用户名").fill(self.username)
await page.get_by_placeholder("请输入密码").fill(self.password)
# 3. 勾选记住密码(可选)
if await page.get_by_text("记住密码").is_visible():
await page.get_by_text("记住密码").click()
# 4. 点击登录
logger.info("点击登录按钮")
# 使用更稳健的 CSS 选择器
try:
await page.click("button:has-text('登录')", timeout=5000)
except Exception:
logger.warning("通过文本点击失败,尝试通过 type='submit' 点击")
await page.click("button[type='submit']")
# 等待跳转,增加超时时间到 20s,因为后台可能慢
logger.info("等待登录跳转...")
# 修正 URL 大小写: homePage
await page.wait_for_url("**/homePage", timeout=20000)
logger.info("登录成功")
async def export_report(self, page):
logger.info(f"访问统计页面: {self.secsion_stats_url}")
await page.goto(self.secsion_stats_url)
# 等待页面加载完成,以“导出报表”按钮为准
logger.info("等待统计页面加载...")
export_btn = page.get_by_role("button", name="导出报表")
await export_btn.wait_for(state="visible", timeout=20000)
logger.info(f"设置查询日期: {self.target_date}")
# 尝试通过日期值定位输入框,或者直接点击导出(如果日期已正确)
# 根据截图,日期可能已经默认是今天
try:
# 尝试定位输入框并填入日期
# 如果 placeholder 不对,我们尝试寻找所有 input 并根据格式填充
inputs = page.locator("input")
count = await inputs.count()
logger.info(f"页面共有 {count} 个输入框")
# 寻找包含日期的输入框并填入
for i in range(count):
try:
val = await inputs.nth(i).input_value()
if "-" in val and len(val) >= 10: # 类似 2026-04-18
# 降低 fill 的超时,因为如果是 readonly 会很快报错
await inputs.nth(i).fill(self.target_date, timeout=2000)
await inputs.nth(i).press("Enter")
except Exception:
continue
except Exception as e:
logger.warning(f"填充日期时遇到问题 (可能已默认正确): {str(e)}")
# 点击导出报表并捕获下载
logger.info("点击导出报表...")
async with page.expect_download(timeout=60000) as download_info:
await export_btn.click()
download = await download_info.value
filename = f"commodity_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
save_path = os.path.join(self.download_dir, filename)
await download.save_as(save_path)
logger.info(f"报表已保存至: {save_path}")
return save_path
async def upload_to_94kan(self, page, file_path):
logger.info(f"正在打开上传页面: {self.upload_url}")
await page.goto(self.upload_url)
# 1. 点击上传按钮触发文件选择(如果需要点击的话)
# 或者直接找到隐藏的 input[type="file"]
logger.info(f"准备上传文件: {file_path}")
# 寻找上传 input
# sale.94kan.cn 页面通常有一个显眼的上传区域
try:
# 方案 A: 直接设置 input 文件(最可靠)
# 很多前端框架会隐藏真正的 input
await page.set_input_files("input[type='file']", file_path)
except Exception:
logger.info("未找到直接的 input[type='file'],尝试点击后上传...")
# 方案 B: 点击后再处理
async with page.expect_file_chooser() as fc_info:
await page.get_by_text("点击或拖拽文件至此").click()
file_chooser = await fc_info.value
await file_chooser.set_files(file_path)
# 2. 等待上传完成
logger.info("等待上传处理完成...")
# 根据截图,页面显示“正在上传并分析...”
try:
await page.wait_for_selector("text=上传并分析", state="visible", timeout=30000)
logger.info("上传成功,正在处理数据...")
except Exception:
logger.warning("未检测到 '上传并分析' 文本,可能已完成或文本不同")
# 截图保存结果
await page.screenshot(path="upload_result.png")
if __name__ == "__main__":
automation = ReportAutomation()
asyncio.run(automation.run())