[Python] 纯文本查看 复制代码
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
import base64
import binascii
import random
import string
import requests
import hashlib
import time
import os
import re
import json
import ddddocr
os.chdir(os.path.dirname(os.path.abspath(__file__)))
class TongHuaShun:
def __init__(self):
self.cookies = {
'PHPSESSID': 'h8bshbcd7th1db57jv68fkgv5egmccbs',
'ta_random_userid': 'wn37oo0s6d',
'u_ukey': 'A10702B8689642C6BE607730E11E6E4A',
'u_uver': '1.0.0',
'u_dpass': '%2FAb67r3hBk7KrlRaNRP8Fo8nOcW4o2xHRZTKixnRsSR4tV%2Fe%2FNt%2FB5gJaUXHqdv0Hi80LrSsTFH9a%2B6rtRvqGg%3D%3D',
'u_did': '0958F6AFE0D343DBA6DB466F77465C3C',
'u_ttype': 'WEB',
'Hm_lvt_722143063e4892925903024537075d0d': '1752116124',
'Hm_lpvt_722143063e4892925903024537075d0d': '1752116124',
'HMACCOUNT': '9D394D4BCA744D57',
'Hm_lvt_929f8b362150b1f77b477230541dbbc2': '1752116124',
'Hm_lpvt_929f8b362150b1f77b477230541dbbc2': '1752116124',
'Hm_lvt_78c58f01938e4d85eaf619eae71b4ed1': '1752116124',
'Hm_lpvt_78c58f01938e4d85eaf619eae71b4ed1': '1752116124',
'ttype': 'WEB',
'v': 'Ayfk6qtnA0xHaYfA_cFMatawsFD0rPmWVYZ_jPmdQ3Pcb0kOAXyL3mVQD1QK',
}
self.headers = {
'accept': 'application/json, text/javascript, */*; q=0.01',
'accept-language': 'zh-CN,zh;q=0.9,az;q=0.8',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'hexin-v': 'Ayfk6qtnA0xHaYfA_cFMatawsFD0rPmWVYZ_jPmdQ3Pcb0kOAXyL3mVQD1QK',
'origin': 'https://upass.10jqka.com.cn',
'priority': 'u=1, i',
'referer': 'https://upass.10jqka.com.cn/login',
'sec-ch-ua': '"Google Chrome";v="137", "Chromium";v="137", "Not/A)Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36',
'x-requested-with': 'XMLHttpRequest',
# 'cookie': 'PHPSESSID=h8bshbcd7th1db57jv68fkgv5egmccbs; ta_random_userid=wn37oo0s6d; u_ukey=A10702B8689642C6BE607730E11E6E4A; u_uver=1.0.0; u_dpass=%2FAb67r3hBk7KrlRaNRP8Fo8nOcW4o2xHRZTKixnRsSR4tV%2Fe%2FNt%2FB5gJaUXHqdv0Hi80LrSsTFH9a%2B6rtRvqGg%3D%3D; u_did=0958F6AFE0D343DBA6DB466F77465C3C; u_ttype=WEB; Hm_lvt_722143063e4892925903024537075d0d=1752116124; Hm_lpvt_722143063e4892925903024537075d0d=1752116124; HMACCOUNT=9D394D4BCA744D57; Hm_lvt_929f8b362150b1f77b477230541dbbc2=1752116124; Hm_lpvt_929f8b362150b1f77b477230541dbbc2=1752116124; Hm_lvt_78c58f01938e4d85eaf619eae71b4ed1=1752116124; Hm_lpvt_78c58f01938e4d85eaf619eae71b4ed1=1752116124; ttype=WEB; v=Ayfk6qtnA0xHaYfA_cFMatawsFD0rPmWVYZ_jPmdQ3Pcb0kOAXyL3mVQD1QK',
}
self.uname = "xxx" # 用户名
self.passwd = "123456" # 密码
# 初始化时获取登录所需的参数
def req_getGS(self):
"""
获取登录所需的参数
:return:
"""
crnd = self.get_crnd()
self.crnd = crnd # 保存crnd以供后续使用
data = {
"uname": self.ras_encrypt_encode(self.uname),
"rsa_version": "default_5",
"crnd": crnd,
}
response = requests.post(
"https://upass.10jqka.com.cn/user/getGS",
cookies=self.cookies,
headers=self.headers,
data=data,
)
if response.status_code == 200:
json_data = response.json()
self.dsk = json_data.get("dsk", "")
self.dsv = json_data.get("dsv", "")
self.ssv = json_data.get("ssv", "")
# print(f"getGS请求成功: dsk={self.dsk}, dsv={self.dsv}, ssv={self.ssv}, crnd={self.crnd}")
else:
print(f"getGS请求Error: {response.status_code} - {response.text}")
# 执行登录请求
def req_login(self):
"""
登录请求
:return:
"""
data = {
"uname": self.ras_encrypt_encode(self.uname),
"passwd": self.ras_encrypt_encode(self.hex_md5(self.passwd)),
"saltLoginTimes": "1",
"longLogin": "on",
"rsa_version": "default_5",
"source": "pc_web",
"request_type": "login",
"captcha_type": "4",
"upwd_score": "30",
"ignore_upwd_score": "",
"passwdSalt": self.encode_data_salt_once(self.passwd, self.uname),
"dsk": self.dsk,
"crnd": self.crnd,
"ttype": "WEB",
"sdtis": "C22",
"timestamp": int(time.time() * 1000), # 当前时间戳(毫秒)
}
# print(data)
response = requests.post(
"https://upass.10jqka.com.cn/login/dologinreturnjson2",
cookies=self.cookies,
headers=self.headers,
data=data,
)
print("==============执行登录请求==============")
self.print_response("登录 response", response)
if response.status_code == 200:
try:
json_data = response.json()
except Exception:
print("返回内容不是JSON 可能是被重定向或未授权。请检查cookies和headers是否有效。")
return
if json_data.get("status") == "success":
print("登录成功")
elif json_data.get("errorcode") == -11400:
self.captcha_solve()
else:
print(f"登录失败: {json_data.get('msg', '未知错误')}")
else:
print(f"登录请求失败,状态码: {response.status_code} 请检查cookies和headers是否有效。")
def relogin(self):
data = {
"uname": self.ras_encrypt_encode(self.uname),
"passwd": self.ras_encrypt_encode(self.hex_md5(self.passwd)),
"saltLoginTimes": "1",
"longLogin": "on",
"rsa_version": "default_5",
"source": "pc_web",
"request_type": "login",
"captcha_type": "4",
'captcha_phrase': self.captcha_phrase,
'captcha_ticket': self.captcha_tiket,
'captcha_signature': self.captcha_sign,
"upwd_score": "30",
"ignore_upwd_score": "",
"passwdSalt": self.encode_data_salt_once(self.passwd, self.uname),
"dsk": self.dsk,
"crnd": self.crnd,
"ttype": "WEB",
"sdtis": "C22",
"timestamp": int(time.time() * 1000) # 当前时间戳(毫秒)
}
response = requests.post('https://upass.10jqka.com.cn/login/dologinreturnjson2', cookies=self.cookies, headers=self.headers, data=data)
print("==============重新登录请求==============")
self.print_response("重新登录 response", response)
# 获取验证码
def captcha_solve(self):
params = {
"captcha_type": "4",
"appid": "registernew",
"random": random.random() * int(time.time() * 1000),
"callback": "PreHandle",
}
print("==============需要滑块验证码 开始识别验证码图片==============")
response = requests.get(
"https://captcha.10jqka.com.cn/getPreHandle",
params=params,
cookies=self.cookies,
headers=self.headers,
)
# self.print_response("Captcha Response", response)
if response.status_code != 200:
print(f"获取验证码失败: {response.status_code} - {response.text}")
return
# 匹配 PreHandle(xxx) 并提取 xxx 内容
match = re.search(r"PreHandle\((.*)\)", response.text)
if match:
json_str = match.group(1)
try:
data = json.loads(json_str).get("data", {})
except Exception as e:
print(f"解析PreHandle内容失败: {e}\n内容为: {json_str}")
return
else:
print("未找到 PreHandle(xxx) 格式内容")
return
# 保存验证码参数到实例变量
self.captcha_src1 = data.get("src1")
self.captcha_src2 = data.get("src2")
self.captcha_sign = data.get("sign")
self.captcha_urlParams = data.get("urlParams")
# 解析urlParams为字典
self.captcha_urlParams_dict = (
dict(item.split("=", 1) for item in self.captcha_urlParams.split("&"))
if self.captcha_urlParams
else {}
)
self.captcha_imgs = data.get("imgs")
self.captcha_initx = data.get("initx")
self.captcha_inity = data.get("inity")
self.get_captcha_img_dis(data)
self.captcha_verfiy()
# 计算距离
def get_captcha_img_dis(self, data):
bg_url = "https://captcha.10jqka.com.cn/getImg?"+ data.get("urlParams")+ "&iuk="+ data.get("imgs", [])[0]
slice_url = "https://captcha.10jqka.com.cn/getImg?"+ data.get("urlParams")+ "&iuk="+ data.get("imgs", [])[1]
slide = ddddocr.DdddOcr(det=False, ocr=False, show_ad=False)
slice_image = requests.get(slice_url).content
bg_image = requests.get(bg_url).content
result = slide.slide_match(slice_image, bg_image, simple_target=True)
print(f"滑块验证码识别结果: {result}")
self.x = result['target'][0]
# 模拟验证码识别
def captcha_verfiy(self):
height = 177.22058823529412
inity = self.captcha_inity / 195 * height
self.captcha_phrase = str(self.x-15) + ";" + str(inity) + ";309;177.22058823529412"
params = {
"rand": self.captcha_urlParams_dict.get("rand", ""),
"time": self.captcha_urlParams_dict.get("time", ""),
"appid": "registernew",
"captcha_type": "4",
"signature": self.captcha_sign,
"phrase": self.captcha_phrase,
"callback": "verify",
}
# print(json.dumps({"滑块验证码参数": params}, ensure_ascii=False, indent=2))
response = requests.get(
"https://captcha.10jqka.com.cn/getTicket",
params=params,
cookies=self.cookies,
headers=self.headers,
)
print("==============开始模拟滑动验证==============")
self.print_response("滑块验证 Response", response)
match = re.search(r"verify\((.*)\)", response.text)
json_str = match.group(1)
data = json.loads(json_str)
if data.get("msg") == "ok":
print(f"滑块验证成功 ticket:{data.get('ticket')}")
# 保存验证码票据 并重新登录
self.captcha_tiket = data.get("ticket")
self.relogin()
else:
print(f"滑块验证失败: {data.get('msg', '未知错误')}")
def encode_data_salt_once(self, e, t):
# e: password, t: uname
if not (self.dsk and self.dsv and self.ssv):
return ""
crnd = self.crnd
sha256 = hashlib.sha256()
sha256.update((crnd + self.dsk).encode("utf-8"))
n = sha256.hexdigest()
ssv_decoded = base64.b64decode(self.ssv)
# getStrXOR: XOR two byte arrays
def get_str_xor(a, b):
if isinstance(a, str):
a = a.encode()
if isinstance(b, str):
b = b.encode()
return bytes([x ^ y for x, y in zip(a, b)])
n_xor = get_str_xor(ssv_decoded, n)
n_xor_str = n_xor.decode(errors="ignore")
n_split = n_xor_str.split("$")
if len(n_split) < 2:
return ""
n_eq = n_split[1].split("=")
if len(n_eq) < 2:
return ""
n_val = n_eq[1]
# r.hex_hmac(n_val, hex_md5(e))
hmac_sha256 = hashlib.pbkdf2_hmac(
"sha256", self.hex_md5(e).encode(), n_val.encode(), 1
)
dsv_hex = hashlib.sha256(self.dsv.encode()).hexdigest()
n2 = get_str_xor(hmac_sha256.hex(), dsv_hex)
n2_b64 = base64.b64encode(n2).decode()
# thsencrypt.encode: just return as is, unless you have a custom function
return n2_b64
def ras_encrypt_encode(self, b):
modulus_hex = "D90F4DD5BF444916913F7B434F192587C354387FA531F2964725B5188FB9D5B40FDDD2B34F61B5560468D1F5C568796EB15F7799F03E4A3301EAF8B79B655F1B2B7DC6FFE1084E4C14A05DD9C6D0C72C5ED75890DC5D11AB5990A3C7DEBA0D68EFFD8C619B2A21AEFA3E7FF902CDAE0502025901EE2D42B76A0D7AD389F0BF69"
exponent_hex = "10001"
if not modulus_hex or not exponent_hex:
raise ValueError("modulus and publicExponent could not be null")
modulus = int(modulus_hex, 16)
exponent = int(exponent_hex, 16)
rsa_key = RSA.construct((modulus, exponent))
cipher = PKCS1_v1_5.new(rsa_key)
encrypted = cipher.encrypt(b.encode("utf-8"))
if not encrypted:
raise ValueError("encrypt failed")
hex_encrypted = binascii.hexlify(encrypted).decode("utf-8")
b64_encrypted = base64.b64encode(binascii.unhexlify(hex_encrypted)).decode(
"utf-8"
)
return b64_encrypted
def hex_md5(self, s):
"""
计算字符串的MD5值并返回16进制字符串
"""
m = hashlib.md5()
m.update(s.encode("utf-8"))
return m.hexdigest()
def get_crnd(self):
crnd = "".join(
random.choices(string.ascii_lowercase + string.digits, k=8)
) + "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
return crnd
def print_response(self, prefix, response):
"""
标准输出response结果 兼容unicode解码
"""
try:
text = response.text.encode("utf-8").decode("unicode_escape")
except Exception:
text = response.text
print(f"{prefix}: {response.status_code} - {text}")
if __name__ == "__main__":
t = TongHuaShun()
t.req_getGS() # 获取登录所需的参数
t.req_login() # 执行登录请求
# 这里可以添加更多的逻辑,比如处理验证码等