Initial commit: Fengxiang order monitor with WeChat & Xiaomi speaker push

This commit is contained in:
houhuan
2026-04-30 14:25:34 +08:00
commit 1bb7bba970
10 changed files with 970 additions and 0 deletions
+114
View File
@@ -0,0 +1,114 @@
"""
丰享商家端数据抓取脚本
使用 Playwright 半自动登录 + requests 调用数据接口
"""
import json
import time
import requests
from playwright.sync_api import sync_playwright
# ============ 配置 ============
LOGIN_URL = "https://fspass.szfx.com/ucenter/userlogin?platform=saas&redirect_url=https%3A%2F%2Ffs.szfx.com%2FMMS%23%2F&return_url=https%3A%2F%2Ffs.szfx.com%2Fsaasmerchant%2Fsetstoken"
DATA_API = "https://fs.szfx.com/saasmerchant/pcweb/order/quickpayorder/list"
MMS_BASE = "https://fs.szfx.com/MMS"
SHOP_ID = "20434543575189"
# ==============================
def login_and_get_cookies():
"""用 Playwright 打开登录页,等待用户手动登录,然后提取 Cookie"""
print("[1/3] 启动浏览器,请手动完成登录(包括验证码)...")
with sync_playwright() as p:
browser = p.chromium.launch(headless=False, args=["--start-maximized"])
context = browser.new_context(no_viewport=True)
page = context.new_page()
page.goto(LOGIN_URL)
# 等待用户登录成功(检测页面跳转到 MMS 首页)
print("[2/3] 等待登录完成...(请在浏览器中输入账号密码并完成验证码)")
try:
page.wait_for_url("**/MMS**", timeout=300_000)
print(" 登录成功!正在提取 Cookie...")
except Exception:
print(" 等待超时(5分钟),请重新运行脚本。")
browser.close()
return None
# 确保访问过目标页面(触发必要的 Cookie 设置)
page.goto(MMS_BASE)
page.wait_for_load_state("networkidle")
time.sleep(2)
# 提取所有 Cookie
playwright_cookies = context.cookies()
cookies = {c["name"]: c["value"] for c in playwright_cookies}
print(f" 获取到 {len(cookies)} 个 Cookie")
browser.close()
return cookies
def fetch_order_list(cookies, page=1, page_size=20):
"""调用快捷买单订单列表接口"""
headers = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": "https://fs.szfx.com/MMS",
"Origin": "https://fs.szfx.com",
}
payload = {
"shopId": SHOP_ID,
"page": page,
"pageSize": page_size,
}
print(f"[3/3] 正在请求订单数据(第{page}页)...")
resp = requests.post(DATA_API, json=payload, headers=headers, cookies=cookies, timeout=30)
data = resp.json()
if data.get("errno") == 0:
print(" 请求成功!")
return data.get("data")
else:
print(f" 请求失败: errno={data.get('errno')}, errmsg={data.get('errmsg')}")
return None
def save_data(data, filename="order_data.json"):
"""保存数据到文件"""
with open(filename, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f" 数据已保存到 {filename}")
def main():
# 第一步:登录获取 Cookie
cookies = login_and_get_cookies()
if not cookies:
return
# 保存 Cookie 以便下次复用
with open("cookies.json", "w", encoding="utf-8") as f:
json.dump(cookies, f, ensure_ascii=False, indent=2)
print(" Cookie 已保存到 cookies.json(下次可直接使用)\n")
# 第二步:获取订单数据
result = fetch_order_list(cookies)
if result:
save_data(result)
# 打印摘要
if isinstance(result, dict):
print(f"\n 数据摘要:")
for k, v in result.items():
if not isinstance(v, (list, dict)):
print(f" {k}: {v}")
elif isinstance(v, list):
print(f" {k}: 共 {len(v)} 条记录")
if __name__ == "__main__":
main()