217 lines
9.1 KiB
Python
217 lines
9.1 KiB
Python
import asyncio
|
|
import os
|
|
import logging
|
|
import sys
|
|
import argparse
|
|
from datetime import datetime
|
|
from playwright.async_api import async_playwright
|
|
|
|
# 配置日志
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(),
|
|
logging.FileHandler('automation.log', encoding='utf-8')
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ReportAutomation:
|
|
def __init__(self, start_date=None, end_date=None):
|
|
self.secsion_login_url = "https://secsion.com:8000/login?redirect=%252Fhomepage"
|
|
self.secsion_stats_url = "https://secsion.com:8000/commodityStatistics"
|
|
self.upload_url = "https://sale.94kan.cn/"
|
|
self.username = "15682076681"
|
|
self.password = "123456"
|
|
|
|
# 使用传入的日期,如果没有则默认为今天
|
|
today = datetime.now().strftime('%Y-%m-%d')
|
|
self.start_date = start_date or today
|
|
self.end_date = end_date or start_date or today
|
|
|
|
logger.info(f"任务初始化: 开始日期={self.start_date}, 结束日期={self.end_date}")
|
|
|
|
self.download_dir = os.path.join(os.getcwd(), "downloads")
|
|
|
|
if not os.path.exists(self.download_dir):
|
|
os.makedirs(self.download_dir)
|
|
|
|
async def run(self):
|
|
async with async_playwright() as p:
|
|
# 启动浏览器,忽略 HTTPS 错误
|
|
browser = await p.chromium.launch(headless=True) # 生产运行使用 Headless 模式
|
|
context = await browser.new_context(
|
|
ignore_https_errors=True,
|
|
viewport={'width': 1280, 'height': 800}
|
|
)
|
|
page = await context.new_page()
|
|
|
|
try:
|
|
# --- 任务 1: 登录 secsion.com 并导出报表 ---
|
|
await self.login_secsion(page)
|
|
file_path = await self.export_report(page)
|
|
|
|
if file_path:
|
|
# --- 任务 2: 上传到 94kan.cn ---
|
|
await self.upload_to_94kan(page, file_path)
|
|
else:
|
|
logger.error("未能获取下载文件路径,跳过上传任务。")
|
|
|
|
except Exception as e:
|
|
logger.error(f"发生错误: {str(e)}")
|
|
finally:
|
|
await browser.close()
|
|
|
|
async def login_secsion(self, page):
|
|
logger.info(f"正在打开登录页面: {self.secsion_login_url}")
|
|
await page.goto(self.secsion_login_url)
|
|
|
|
# 1. 选择角色 "店铺"
|
|
# 假设角色选择是一个下拉框或者一组按钮,根据描述,点击包含“店铺”的元素
|
|
logger.info("选择角色: 店铺")
|
|
try:
|
|
# 尝试多种定位方式以确保成功
|
|
shop_role = page.get_by_text("店铺", exact=True)
|
|
await shop_role.click()
|
|
except Exception:
|
|
logger.warning("直接点击'店铺'失败,尝试备用定位方案...")
|
|
await page.click("text=店铺")
|
|
|
|
# 2. 输入账号密码
|
|
logger.info(f"输入账号: {self.username}")
|
|
# 根据截图,placeholder 是 "请输入用户名"
|
|
await page.get_by_placeholder("请输入用户名").fill(self.username)
|
|
await page.get_by_placeholder("请输入密码").fill(self.password)
|
|
|
|
# 3. 勾选记住密码(可选)
|
|
if await page.get_by_text("记住密码").is_visible():
|
|
await page.get_by_text("记住密码").click()
|
|
|
|
# 4. 点击登录
|
|
logger.info("点击登录按钮")
|
|
# 使用更稳健的 CSS 选择器
|
|
try:
|
|
await page.click("button:has-text('登录')", timeout=5000)
|
|
except Exception:
|
|
logger.warning("通过文本点击失败,尝试通过 type='submit' 点击")
|
|
await page.click("button[type='submit']")
|
|
|
|
# 等待跳转,增加超时时间到 20s,因为后台可能慢
|
|
logger.info("等待登录跳转...")
|
|
# 修正 URL 大小写: homePage
|
|
await page.wait_for_url("**/homePage", timeout=20000)
|
|
logger.info("登录成功")
|
|
|
|
async def export_report(self, page):
|
|
logger.info(f"访问统计页面: {self.secsion_stats_url}")
|
|
await page.goto(self.secsion_stats_url)
|
|
|
|
# 等待页面加载完成
|
|
logger.info("等待统计页面加载...")
|
|
await page.wait_for_load_state("networkidle")
|
|
export_btn = page.get_by_role("button", name="导出报表")
|
|
await export_btn.wait_for(state="visible", timeout=20000)
|
|
|
|
logger.info(f"设置查询日期范围: {self.start_date} 至 {self.end_date}")
|
|
|
|
# 严格按照参考文件 secsion_export.py 的逻辑
|
|
try:
|
|
# 两个日期输入框都叫“请选择日期”,用 nth 区分
|
|
start_input = page.get_by_role("textbox", name="请选择日期").nth(0)
|
|
end_input = page.get_by_role("textbox", name="请选择日期").nth(1)
|
|
|
|
async def set_date(input_box, date_str: str):
|
|
logger.info(f"尝试设置日期: {date_str}")
|
|
# 1. 点击输入框
|
|
await input_box.click()
|
|
|
|
# 2. 先尝试直接填值 + Enter (参考文件逻辑)
|
|
try:
|
|
# 由于是 readonly,fill 可能会超时,这里设置较短超时
|
|
await input_box.fill(date_str, timeout=2000)
|
|
await input_box.press("Enter")
|
|
await page.wait_for_timeout(200)
|
|
except Exception:
|
|
logger.info("直接填值失败或超时,将尝试点击日历单元格")
|
|
|
|
# 3. 若没有生效,则打开日历点击“日”单元格 (参考文件逻辑)
|
|
val = await input_box.input_value()
|
|
if val != date_str:
|
|
logger.info(f"值未同步({val} != {date_str}),执行日历单元格点击")
|
|
# 确保日历已弹出
|
|
await input_box.click()
|
|
day = str(int(date_str.split("-")[2]))
|
|
# 参考文件使用 cell 角色
|
|
# page.get_by_role("cell", name=day).click()
|
|
# 考虑到可能有多个月份显示,取最后一个弹出的
|
|
await page.get_by_role("cell", name=day).last.click()
|
|
await page.wait_for_timeout(500)
|
|
|
|
await set_date(start_input, self.start_date)
|
|
await set_date(end_input, self.end_date)
|
|
|
|
# 等待数据请求完成
|
|
logger.info("等待数据请求完成...")
|
|
await asyncio.sleep(2)
|
|
|
|
except Exception as e:
|
|
logger.error(f"日期选择逻辑执行失败: {str(e)}")
|
|
raise e
|
|
|
|
# 点击导出报表并捕获下载
|
|
logger.info("点击导出报表...")
|
|
async with page.expect_download(timeout=60000) as download_info:
|
|
await export_btn.click()
|
|
|
|
download = await download_info.value
|
|
# 使用服务器建议的文件名
|
|
filename = download.suggested_filename
|
|
save_path = os.path.join(self.download_dir, filename)
|
|
|
|
await download.save_as(save_path)
|
|
logger.info(f"报表已保存至: {save_path}")
|
|
return save_path
|
|
|
|
async def upload_to_94kan(self, page, file_path):
|
|
logger.info(f"正在打开上传页面: {self.upload_url}")
|
|
await page.goto(self.upload_url)
|
|
|
|
# 1. 点击上传按钮触发文件选择(如果需要点击的话)
|
|
# 或者直接找到隐藏的 input[type="file"]
|
|
logger.info(f"准备上传文件: {file_path}")
|
|
|
|
# 寻找上传 input
|
|
# sale.94kan.cn 页面通常有一个显眼的上传区域
|
|
try:
|
|
# 方案 A: 直接设置 input 文件(最可靠)
|
|
# 很多前端框架会隐藏真正的 input
|
|
await page.set_input_files("input[type='file']", file_path)
|
|
except Exception:
|
|
logger.info("未找到直接的 input[type='file'],尝试点击后上传...")
|
|
# 方案 B: 点击后再处理
|
|
async with page.expect_file_chooser() as fc_info:
|
|
await page.get_by_text("点击或拖拽文件至此").click()
|
|
file_chooser = await fc_info.value
|
|
await file_chooser.set_files(file_path)
|
|
|
|
# 2. 等待上传完成
|
|
logger.info("等待上传处理完成...")
|
|
# 根据截图,页面显示“正在上传并分析...”
|
|
try:
|
|
await page.wait_for_selector("text=上传并分析", state="visible", timeout=30000)
|
|
logger.info("上传成功,正在处理数据...")
|
|
except Exception:
|
|
logger.warning("未检测到 '上传并分析' 文本,可能已完成或文本不同")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description='报表自动化导出上传工具')
|
|
parser.add_argument('--start', type=str, help='开始日期 (格式: YYYY-MM-DD)')
|
|
parser.add_argument('--end', type=str, help='结束日期 (格式: YYYY-MM-DD)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
automation = ReportAutomation(start_date=args.start, end_date=args.end)
|
|
asyncio.run(automation.run())
|