115 lines
3.8 KiB
Python
115 lines
3.8 KiB
Python
"""
|
|
丰享商家端数据抓取脚本
|
|
使用 Playwright 半自动登录 + requests 调用数据接口
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import requests
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# ============ 配置 ============
|
|
LOGIN_URL = "https://fspass.szfx.com/ucenter/userlogin?platform=saas&redirect_url=https%3A%2F%2Ffs.szfx.com%2FMMS%23%2F&return_url=https%3A%2F%2Ffs.szfx.com%2Fsaasmerchant%2Fsetstoken"
|
|
DATA_API = "https://fs.szfx.com/saasmerchant/pcweb/order/quickpayorder/list"
|
|
MMS_BASE = "https://fs.szfx.com/MMS"
|
|
SHOP_ID = "20434543575189"
|
|
# ==============================
|
|
|
|
|
|
def login_and_get_cookies():
|
|
"""用 Playwright 打开登录页,等待用户手动登录,然后提取 Cookie"""
|
|
print("[1/3] 启动浏览器,请手动完成登录(包括验证码)...")
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(headless=False, args=["--start-maximized"])
|
|
context = browser.new_context(no_viewport=True)
|
|
page = context.new_page()
|
|
|
|
page.goto(LOGIN_URL)
|
|
|
|
# 等待用户登录成功(检测页面跳转到 MMS 首页)
|
|
print("[2/3] 等待登录完成...(请在浏览器中输入账号密码并完成验证码)")
|
|
try:
|
|
page.wait_for_url("**/MMS**", timeout=300_000)
|
|
print(" 登录成功!正在提取 Cookie...")
|
|
except Exception:
|
|
print(" 等待超时(5分钟),请重新运行脚本。")
|
|
browser.close()
|
|
return None
|
|
|
|
# 确保访问过目标页面(触发必要的 Cookie 设置)
|
|
page.goto(MMS_BASE)
|
|
page.wait_for_load_state("networkidle")
|
|
time.sleep(2)
|
|
|
|
# 提取所有 Cookie
|
|
playwright_cookies = context.cookies()
|
|
cookies = {c["name"]: c["value"] for c in playwright_cookies}
|
|
|
|
print(f" 获取到 {len(cookies)} 个 Cookie")
|
|
browser.close()
|
|
return cookies
|
|
|
|
|
|
def fetch_order_list(cookies, page=1, page_size=20):
|
|
"""调用快捷买单订单列表接口"""
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
|
"Referer": "https://fs.szfx.com/MMS",
|
|
"Origin": "https://fs.szfx.com",
|
|
}
|
|
|
|
payload = {
|
|
"shopId": SHOP_ID,
|
|
"page": page,
|
|
"pageSize": page_size,
|
|
}
|
|
|
|
print(f"[3/3] 正在请求订单数据(第{page}页)...")
|
|
resp = requests.post(DATA_API, json=payload, headers=headers, cookies=cookies, timeout=30)
|
|
data = resp.json()
|
|
|
|
if data.get("errno") == 0:
|
|
print(" 请求成功!")
|
|
return data.get("data")
|
|
else:
|
|
print(f" 请求失败: errno={data.get('errno')}, errmsg={data.get('errmsg')}")
|
|
return None
|
|
|
|
|
|
def save_data(data, filename="order_data.json"):
|
|
"""保存数据到文件"""
|
|
with open(filename, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
print(f" 数据已保存到 {filename}")
|
|
|
|
|
|
def main():
|
|
# 第一步:登录获取 Cookie
|
|
cookies = login_and_get_cookies()
|
|
if not cookies:
|
|
return
|
|
|
|
# 保存 Cookie 以便下次复用
|
|
with open("cookies.json", "w", encoding="utf-8") as f:
|
|
json.dump(cookies, f, ensure_ascii=False, indent=2)
|
|
print(" Cookie 已保存到 cookies.json(下次可直接使用)\n")
|
|
|
|
# 第二步:获取订单数据
|
|
result = fetch_order_list(cookies)
|
|
if result:
|
|
save_data(result)
|
|
# 打印摘要
|
|
if isinstance(result, dict):
|
|
print(f"\n 数据摘要:")
|
|
for k, v in result.items():
|
|
if not isinstance(v, (list, dict)):
|
|
print(f" {k}: {v}")
|
|
elif isinstance(v, list):
|
|
print(f" {k}: 共 {len(v)} 条记录")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|