Files
yixuan_sale/main.py
T

227 lines
9.6 KiB
Python

import asyncio
import os
import logging
import sys
import argparse
from datetime import datetime
from playwright.async_api import async_playwright
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('automation.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
class ReportAutomation:
def __init__(self, start_date=None, end_date=None):
self.secsion_login_url = "https://secsion.com:8000/login?redirect=%252Fhomepage"
self.secsion_stats_url = "https://secsion.com:8000/commodityStatistics"
self.upload_url = "https://sale.94kan.cn/"
self.username = "15682076681"
self.password = "123456"
# 使用传入的日期,如果没有则默认为今天
today = datetime.now().strftime('%Y-%m-%d')
self.start_date = start_date or today
self.end_date = end_date or start_date or today
logger.info(f"任务初始化: 开始日期={self.start_date}, 结束日期={self.end_date}")
self.download_dir = os.path.join(os.getcwd(), "downloads")
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir)
async def run(self):
async with async_playwright() as p:
# 启动浏览器,忽略 HTTPS 错误
browser = await p.chromium.launch(headless=True) # 生产运行使用 Headless 模式
context = await browser.new_context(
ignore_https_errors=True,
viewport={'width': 1280, 'height': 800}
)
page = await context.new_page()
try:
# --- 任务 1: 登录 secsion.com 并导出报表 ---
await self.login_secsion(page)
file_path = await self.export_report(page)
if file_path:
# --- 任务 2: 上传到 94kan.cn ---
await self.upload_to_94kan(page, file_path)
else:
logger.error("未能获取下载文件路径,跳过上传任务。")
except Exception as e:
logger.error(f"发生错误: {str(e)}")
# 截图以供调试
await page.screenshot(path="error_screenshot.png")
finally:
await asyncio.sleep(5) # 留出观察时间
await browser.close()
async def login_secsion(self, page):
logger.info(f"正在打开登录页面: {self.secsion_login_url}")
await page.goto(self.secsion_login_url)
# 1. 选择角色 "店铺"
# 假设角色选择是一个下拉框或者一组按钮,根据描述,点击包含“店铺”的元素
logger.info("选择角色: 店铺")
try:
# 尝试多种定位方式以确保成功
shop_role = page.get_by_text("店铺", exact=True)
await shop_role.click()
except Exception:
logger.warning("直接点击'店铺'失败,尝试备用定位方案...")
await page.click("text=店铺")
# 2. 输入账号密码
logger.info(f"输入账号: {self.username}")
# 根据截图,placeholder 是 "请输入用户名"
await page.get_by_placeholder("请输入用户名").fill(self.username)
await page.get_by_placeholder("请输入密码").fill(self.password)
# 3. 勾选记住密码(可选)
if await page.get_by_text("记住密码").is_visible():
await page.get_by_text("记住密码").click()
# 4. 点击登录
logger.info("点击登录按钮")
# 使用更稳健的 CSS 选择器
try:
await page.click("button:has-text('登录')", timeout=5000)
except Exception:
logger.warning("通过文本点击失败,尝试通过 type='submit' 点击")
await page.click("button[type='submit']")
# 等待跳转,增加超时时间到 20s,因为后台可能慢
logger.info("等待登录跳转...")
# 修正 URL 大小写: homePage
await page.wait_for_url("**/homePage", timeout=20000)
logger.info("登录成功")
async def export_report(self, page):
logger.info(f"访问统计页面: {self.secsion_stats_url}")
await page.goto(self.secsion_stats_url)
# 等待页面加载完成
logger.info("等待统计页面加载...")
await page.wait_for_load_state("networkidle")
export_btn = page.get_by_role("button", name="导出报表")
await export_btn.wait_for(state="visible", timeout=20000)
logger.info(f"设置查询日期范围: {self.start_date}{self.end_date}")
# 严格按照参考文件 secsion_export.py 的逻辑
try:
# 两个日期输入框都叫“请选择日期”,用 nth 区分
start_input = page.get_by_role("textbox", name="请选择日期").nth(0)
end_input = page.get_by_role("textbox", name="请选择日期").nth(1)
async def set_date(input_box, date_str: str):
logger.info(f"尝试设置日期: {date_str}")
# 1. 点击输入框
await input_box.click()
# 2. 先尝试直接填值 + Enter (参考文件逻辑)
try:
# 由于是 readonly,fill 可能会超时,这里设置较短超时
await input_box.fill(date_str, timeout=2000)
await input_box.press("Enter")
await page.wait_for_timeout(200)
except Exception:
logger.info("直接填值失败或超时,将尝试点击日历单元格")
# 3. 若没有生效,则打开日历点击“日”单元格 (参考文件逻辑)
val = await input_box.input_value()
if val != date_str:
logger.info(f"值未同步({val} != {date_str}),执行日历单元格点击")
# 确保日历已弹出
await input_box.click()
day = str(int(date_str.split("-")[2]))
# 参考文件使用 cell 角色
# page.get_by_role("cell", name=day).click()
# 考虑到可能有多个月份显示,取最后一个弹出的
await page.get_by_role("cell", name=day).last.click()
await page.wait_for_timeout(500)
await set_date(start_input, self.start_date)
await set_date(end_input, self.end_date)
# 等待数据请求完成
logger.info("等待数据请求完成...")
await asyncio.sleep(2)
# 截图确认日期设置后的状态
logger.info("保存日期设置确认截图: date_setting_check.png")
await page.screenshot(path="date_setting_check.png")
except Exception as e:
logger.error(f"日期选择逻辑执行失败: {str(e)}")
await page.screenshot(path="date_error.png")
raise e
# 点击导出报表并捕获下载
logger.info("点击导出报表...")
async with page.expect_download(timeout=60000) as download_info:
await export_btn.click()
download = await download_info.value
# 使用服务器建议的文件名
filename = download.suggested_filename
save_path = os.path.join(self.download_dir, filename)
await download.save_as(save_path)
logger.info(f"报表已保存至: {save_path}")
return save_path
async def upload_to_94kan(self, page, file_path):
logger.info(f"正在打开上传页面: {self.upload_url}")
await page.goto(self.upload_url)
# 1. 点击上传按钮触发文件选择(如果需要点击的话)
# 或者直接找到隐藏的 input[type="file"]
logger.info(f"准备上传文件: {file_path}")
# 寻找上传 input
# sale.94kan.cn 页面通常有一个显眼的上传区域
try:
# 方案 A: 直接设置 input 文件(最可靠)
# 很多前端框架会隐藏真正的 input
await page.set_input_files("input[type='file']", file_path)
except Exception:
logger.info("未找到直接的 input[type='file'],尝试点击后上传...")
# 方案 B: 点击后再处理
async with page.expect_file_chooser() as fc_info:
await page.get_by_text("点击或拖拽文件至此").click()
file_chooser = await fc_info.value
await file_chooser.set_files(file_path)
# 2. 等待上传完成
logger.info("等待上传处理完成...")
# 根据截图,页面显示“正在上传并分析...”
try:
await page.wait_for_selector("text=上传并分析", state="visible", timeout=30000)
logger.info("上传成功,正在处理数据...")
except Exception:
logger.warning("未检测到 '上传并分析' 文本,可能已完成或文本不同")
# 截图保存结果
await page.screenshot(path="upload_result.png")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='报表自动化导出上传工具')
parser.add_argument('--start', type=str, help='开始日期 (格式: YYYY-MM-DD)')
parser.add_argument('--end', type=str, help='结束日期 (格式: YYYY-MM-DD)')
args = parser.parse_args()
automation = ReportAutomation(start_date=args.start, end_date=args.end)
asyncio.run(automation.run())