#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
File: jd_wxShopGift.py(店铺特效关注有礼)
Author: HarbourJ
Date: 2022/8/8 19:52
TG: https://t.me/HarbourToulu
TgChat: https://t.me/HarbourSailing
cron: 1 1 1 1 1 1
new Env('店铺特效关注有礼');
ActivityEntry: https://lzkj-isv.isvjcloud.com/wxShopGift/activity?activityId=971e85d5dfd445e1acfc63bafffb8ecc
变量 export jd_wxShopGiftId="971e85d5dfd445e1axxxxxxxxxxxx"
"""
# NOTE: the header docstring above is kept verbatim; the `cron:` and
# `new Env(...)` lines look like task-runner metadata, so they are not reworded.
#
# Summary: for every JD cookie returned by jdCookie.get_cookies(), obtain an
# ISV token (optionally cached in redis), walk through the lzkj-isv
# "wxShopGift" activity endpoints and try to draw the shop gift.
# Configuration (environment variables):
#   jd_wxShopGiftId  activity id (required)
#   redis_url        redis host, default 172.17.0.1
#   redis_pwd        redis password, default empty

import time
import requests
import sys
import re
import os
from datetime import datetime
import json
import random
from urllib.parse import quote_plus, unquote_plus
from functools import partial
print = partial(print, flush=True)
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
# NOTE(review): `s`, `sign` and `userAgent` used below are not defined in this
# file; presumably they come from this star import -- confirm against jd_sign.
from jd_sign import *
try:
    from jdCookie import get_cookies
    getCk = get_cookies()
except Exception:
    print("请先下载依赖脚本,\n下载链接: https://raw.githubusercontent.com/shufflewzc/faker2/main/jdCookie.py")
    sys.exit(3)

redis_url = os.environ.get("redis_url") or "172.17.0.1"
redis_pwd = os.environ.get("redis_pwd") or ""
activityId = os.environ.get("jd_wxShopGiftId") or ""

if not activityId:
    print("⚠️未发现有效活动变量,退出程序!")
    sys.exit()
activityUrl = f"https://lzkj-isv.isvjcloud.com/wxShopGift/activity?activityId={activityId}"

# Extracts the (url-encoded) pt_pin value from a JD cookie string.
_PT_PIN_RE = re.compile(r'pt_pin=(.*?);')


def redis_conn():
    """Return a connected redis client, or None when the server is unreachable.

    Exits the process when the ``redis`` package is not installed.
    """
    try:
        import redis
    except ImportError:
        print("⚠️缺少redis依赖,请运行pip3 install redis")
        sys.exit()
    try:
        pool = redis.ConnectionPool(host=redis_url, port=6379, decode_responses=True,
                                    socket_connect_timeout=5, password=redis_pwd)
        r = redis.Redis(connection_pool=pool)
        # Cheap round trip so connection/auth problems surface here.
        r.get('conn_test')
    except Exception:
        print("⚠️redis连接异常")
        return None
    print('✅redis连接成功')
    return r


def _request_token(ck, host):
    """Ask api.m.jd.com for an isvObfuscator token; return it or None on failure."""
    s.headers = {
        'Connection': 'keep-alive',
        'Accept-Encoding': 'gzip, deflate, br',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'User-Agent': '',
        'Cookie': ck,
        'Host': 'api.m.jd.com',
        'Referer': '',
        'Accept-Language': 'zh-Hans-CN;q=1 en-CN;q=0.9',
        'Accept': '*/*'
    }
    # The return value is not passed to the request below (same as before);
    # the call is kept in case jd_sign relies on it -- TODO confirm.
    sign({"url": f"{host}", "id": ""}, 'isvObfuscator')
    f = s.post('https://api.m.jd.com/client.action', verify=False, timeout=30)
    if f.status_code != 200:
        print(f.status_code)
        return None
    if "参数异常" in f.text:
        return None
    # A body without a "token" key is treated as a failed request instead of
    # raising KeyError and aborting the whole run.
    return f.json().get('token')


def getToken(ck, r=None):
    """Return the ISV token for cookie ``ck``, or None when it cannot be obtained.

    :param ck: JD cookie string.
    :param r: optional redis client; when given, the token is read from and
        written to the cache (30 minute expiry) under "<domain prefix>_<pt_pin>".
    """
    host = f'{activityUrl.split("com/")[0]}com'
    found = _PT_PIN_RE.search(ck)
    # Cache key suffix: the pt_pin, or the first 8 characters of the cookie
    # when no pt_pin is present.
    pt_pin = unquote_plus(found.group(1)) if found else ck[:8]

    if r is None:
        Token = _request_token(ck, host)
        if Token is not None:
            print(f"Token->: {Token}")
        return Token

    cache_key = f'{activityUrl.split("https://")[1].split("-")[0]}_{pt_pin}'
    Token = r.get(cache_key)
    if Token is not None:
        print(f"♻️获取缓存Token")
        return Token

    print("🈳去设置Token缓存")
    Token_new = _request_token(ck, host)
    if Token_new is None:
        return None
    if r.set(cache_key, Token_new, ex=1800):
        print("✅Token缓存设置成功")
    else:
        print("❌Token缓存设置失败")
    return Token_new


def getJdTime():
    """Return the current local time as integer milliseconds since the epoch."""
    return int(round(time.time() * 1000))


def randomString(e, flag=False):
    """Return ``e`` random hex digits; upper-case when ``flag`` is true."""
    alphabet = "0123456789abcdef"
    if flag:
        alphabet = alphabet.upper()
    return ''.join(random.choice(alphabet) for _ in range(e))


def refresh_cookies(res):
    """Merge cookies set by response ``res`` into the global ``activityCookie``.

    Cookies are keyed by name so a refreshed value replaces the old one. The
    result is the sorted "name=value;" entries joined together.
    """
    global activityCookie
    if not res.cookies:
        return
    merged = {}
    # Tolerate an unset cookie string (e.g. the activity page set no cookies).
    for pair in (activityCookie or '').split(';'):
        if pair:
            merged[pair.split('=')[0]] = pair
    # The previous implementation removed items from the list it was
    # iterating, which skipped entries and could leave stale values behind.
    for name, value in res.cookies.get_dict().items():
        merged[name] = name + "=" + value
    activityCookie = ''.join(sorted(pair + ";" for pair in merged.values()))


def _isv_headers(cookie):
    """Build the XHR header set shared by every lzkj-isv POST endpoint."""
    return {
        'Host': 'lzkj-isv.isvjcloud.com',
        'Accept': 'application/json',
        'X-Requested-With': 'XMLHttpRequest',
        'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Origin': 'https://lzkj-isv.isvjcloud.com',
        'User-Agent': ua,
        'Connection': 'keep-alive',
        'Referer': activityUrl,
        'Cookie': cookie
    }


def getActivity():
    """Load the activity page and return its cookies as a "k=v;" string.

    Returns "" when the page sets no cookies; exits the process on a non-200
    response.
    """
    headers = {
        'Host': 'lzkj-isv.isvjcloud.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'User-Agent': ua,
        'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    }
    response = requests.request("GET", activityUrl, headers=headers)
    if response.status_code != 200:
        print(response.status_code)
        print("⚠️疑似ip黑了")
        sys.exit()
    if not response.cookies:
        # Previously fell through and returned None, which later ended up in
        # the Cookie header as the literal text "None".
        return ''
    cookies = response.cookies.get_dict()
    return ''.join(sorted(name + "=" + value + ";" for name, value in cookies.items()))


def getSystemConfigForNew():
    """POST wxCommonInfo/getSystemConfigForNew; only used to refresh cookies."""
    url = "https://lzkj-isv.isvjcloud.com/wxCommonInfo/getSystemConfigForNew"
    payload = f'activityId={activityId}&activityType=99'
    response = requests.request("POST", url, headers=_isv_headers(activityCookie), data=payload)
    refresh_cookies(response)


def getSimpleActInfoVo():
    """Return the ``data`` object of customer/getSimpleActInfoVo, or None."""
    url = "https://lzkj-isv.isvjcloud.com/customer/getSimpleActInfoVo"
    payload = f"activityId={activityId}"
    response = requests.request("POST", url, headers=_isv_headers(activityCookie), data=payload)
    refresh_cookies(response)
    res = response.json()
    if res['result']:
        return res['data']
    return None


def getMyPing(venderId):
    """Return ``(nickname, secretPin)`` for the current token.

    On failure the server's error message is printed and None is returned.
    """
    url = "https://lzkj-isv.isvjcloud.com/customer/getMyPing"
    payload = f"userId={venderId}&token={token}&fromType=APP_shopGift"
    response = requests.request("POST", url, headers=_isv_headers(activityCookie), data=payload)
    refresh_cookies(response)
    res = response.json()
    if res['result']:
        return res['data']['nickname'], res['data']['secretPin']
    print(f"⚠️{res['errorMessage']}")
    return None


def accessLogWithAD(venderId, pin):
    """POST common/accessLogWithAD for this activity and refresh cookies."""
    url = "https://lzkj-isv.isvjcloud.com/common/accessLogWithAD"
    payload = f"venderId={venderId}&code=24&pin={quote_plus(pin)}&activityId={activityId}&pageUrl={quote_plus(activityUrl)}&subType=app&adSource="
    response = requests.request("POST", url, headers=_isv_headers(activityCookie), data=payload)
    refresh_cookies(response)


def activityContent(pin):
    """Fetch the activity content and describe the available gifts.

    Returns ``(reward, True)`` where ``reward`` is a text description of the
    gifts, or ``False`` when the gift list is empty. Exits the process when
    the activity has ended or the server reports an error.
    """
    url = "https://lzkj-isv.isvjcloud.com/wxShopGift/activityContent"
    payload = f"activityId={activityId}&buyerPin={quote_plus(pin)}"
    response = requests.request("POST", url, headers=_isv_headers(f'IsvToken={token};{activityCookie}'),
                                data=payload)
    refresh_cookies(response)
    res = response.json()
    if not res['result']:
        print(f"⛈{res['errorMessage']}")
        sys.exit()

    endTime = res['data']['endTime']
    gifts = res['data']['list']
    if getJdTime() > endTime:
        print("⛈活动已结束,下次早点来~")
        sys.exit()
    if len(gifts) == 0:
        print("礼品已领完")
        # Kept as a bare False (not a tuple): the caller tests `not actCon`.
        return False

    reward = ''.join(str(gift['takeNum']) + gift['type'] for gift in gifts)
    # Turn the type codes found in the response into display names.
    reward = reward.replace('jd', '京豆').replace('jf', '积分').replace('dq', '东券')
    return reward, True


def draw(pin, nickname, reward):
    """POST wxShopGift/draw and print whether ``nickname`` received ``reward``."""
    url = "https://lzkj-isv.isvjcloud.com/wxShopGift/draw"
    payload = f"activityId={activityId}&buyerPin={quote_plus(pin)}&hasFollow=false&accessType=app"
    response = requests.request("POST", url, headers=_isv_headers(f'IsvToken={token};{activityCookie}'),
                                data=payload)
    res = response.json()
    if res['result']:
        print(f"🎉🎉🎉{nickname} 成功领取 {reward}")
    else:
        print(f"⛈⛈⛈{nickname} {res['errorMessage']}")


def attendLog(venderId, pin):
    """POST common/attendLog for this activity; the response is ignored."""
    url = "https://lzkj-isv.isvjcloud.com/common/attendLog"
    payload = f"venderId={venderId}&activityType=24&activityId={activityId}&pin={quote_plus(pin)}&clientType=app"
    requests.request("POST", url, headers=_isv_headers(activityCookie), data=payload)


def main():
    """Run the gift flow once for every configured JD cookie."""
    # The endpoint helpers read these module-level names.
    global ua, activityCookie, token

    r = redis_conn()
    cks = getCk
    if not cks:
        print("未获取到有效COOKIE,退出程序!")
        sys.exit()

    for num, cookie in enumerate(cks, start=1):
        if num % 9 == 0:
            print("⏰等待8s,休息一下")
            time.sleep(8)
        ua = userAgent()
        try:
            pt_pin = unquote_plus(_PT_PIN_RE.findall(cookie)[0])
        except IndexError:
            pt_pin = f'用户{num}'
        print(f'\n******开始【京东账号{num}】{pt_pin} *********\n')
        print(datetime.now())

        token = getToken(cookie, r)
        if token is None:
            print(f"⚠️获取Token失败!⏰等待2s")
            time.sleep(2)
            continue
        time.sleep(0.5)
        activityCookie = getActivity()
        time.sleep(0.5)
        getSystemConfigForNew()
        time.sleep(0.3)
        getSimAct = getSimpleActInfoVo()
        if getSimAct is None:
            # Without the activity info there is no venderId to continue with;
            # previously this raised TypeError and aborted every account.
            print("⚠️获取活动信息失败,跳过该账号")
            continue
        venderId = getSimAct['venderId']
        time.sleep(0.2)
        getPin = getMyPing(venderId)
        if getPin is not None:
            nickname, secretPin = getPin[0], getPin[1]
            time.sleep(0.3)
            accessLogWithAD(venderId, secretPin)
            time.sleep(0.5)
            actCon = activityContent(secretPin)
            if not actCon:
                continue
            if not actCon[1]:
                continue
            reward = actCon[0]
            time.sleep(0.8)
            draw(secretPin, nickname, reward)

        # NOTE(review): assumed to be the pause between accounts (loop level).
        time.sleep(5)


if __name__ == '__main__':
    main()