# Forum code-viewer header ("[Python] view as plain text / copy code"); not part of the program.
import multiprocessing
import os
import random
import sys
import time
import requests
import json
import urllib3
# Silence urllib3 warnings; the requests issued by this script pass verify=False.
urllib3.disable_warnings()
from faker import Faker
# Shared Faker instance (zh_CN locale); get_headers() uses it to generate a User-Agent.
fake = Faker(locale='zh_CN')
def get_headers():
    """Build the HTTP headers used for the JSON API calls.

    Returns:
        dict: A JSON ``Content-Type`` entry plus a ``User-Agent`` value that
        is generated anew by the module-level ``fake`` instance on each call.
    """
    user_agent = fake.user_agent()
    return {'Content-Type': 'application/json', 'User-Agent': user_agent}
def get_category(page_num):
    """Request one page of the resource list for the hard-coded category.

    Args:
        page_num: Page number sent as ``pageNumber`` (12 entries per page).

    Returns:
        Whatever ``request_method`` returns for a non-streaming POST of the
        JSON query to the ``getK12ResourceList`` endpoint.
    """
    url = "https://www.rymusic.art/dbk12front/Service/PavilionsService/getK12ResourceList"
    query = {
        "pageSize": 12,
        "pageNumber": page_num,
        "resTypeId": ["HTML", "Disk"],
        "resCategoryIds": ["90de89cf205311ecaf66fa163e5473a9"],
        "publishStatus": 1,
        "orderBy": "a.publish_date desc",
    }
    body = json.dumps(query)
    print(f'获取合辑:{url}\n页数:{page_num}')
    return request_method("POST", url, headers=get_headers(), payload=body, is_stream=False)
def get_res_relations(single_res_id, page_num):
    """Request one page of the ``Unit`` resources related to a resource.

    Args:
        single_res_id: Resource id sent as ``resId``.
        page_num: Page number sent as ``pageNumber`` (12 entries per page).

    Returns:
        Whatever ``request_method`` returns for a non-streaming POST of the
        JSON query to the ``getRYK12ResRelations`` endpoint.
    """
    url = "https://www.rymusic.art/dbk12front/Service/PavilionsService/getRYK12ResRelations"
    query = {
        "pageSize": 12,
        "pageNumber": page_num,
        "resId": single_res_id,
        "resTypeId": ["Unit"],
        "publishStatus": 1,
        "subResList": 1,
    }
    body = json.dumps(query)
    print(f'获取id:{single_res_id}\n课本:{url}\n页数:{page_num}\n')
    return request_method("POST", url, headers=get_headers(), payload=body, is_stream=False)
def get_preview(ref_code, user_id):
    """Ask the ``awtSupportResPreview`` endpoint for a resource preview.

    Args:
        ref_code: Value sent as ``refCode``.
        user_id: Value sent as ``userId``.

    Returns:
        Whatever ``request_method`` returns for a non-streaming POST of the
        JSON query.
    """
    url = "https://www.rymusic.art/dbk12front/Service/PavilionsService/awtSupportResPreview"
    query = {
        "refCode": ref_code,
        "action": "preview",
        "userId": user_id,
        "watermarkText": "人音教材",
        "awtPlayerPreviewUrl": "pc",
        "fileKind": "",
    }
    body = json.dumps(query)
    return request_method("POST", url, headers=get_headers(), payload=body, is_stream=False)
def request_method(request_type, request_url, headers, payload, is_stream, max_retries=None):
    """Send an HTTP request and retry until the server answers with 200.

    Each attempt uses a 5 second timeout and ``verify=False``. Between two
    attempts the function sleeps a random 1-3 seconds.

    Args:
        request_type: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        request_url: Target URL.
        headers: Headers passed through to ``Session.request``.
        payload: Request body passed as ``data``.
        is_stream: When true the request is made with ``stream=True`` and the
            response object itself is returned; otherwise the decoded JSON
            body is returned.
        max_retries: Optional upper bound on the number of retries after the
            first attempt. ``None`` (the default) retries indefinitely.

    Returns:
        The ``requests.Response`` when ``is_stream`` is true, else the result
        of ``Response.json()``.

    Raises:
        RuntimeError: If ``max_retries`` is set and every attempt failed.
    """
    attempts = 0
    with requests.Session() as session:
        while True:
            if attempts:
                # Single pause between attempts, whether the previous one
                # failed with an exception or with a non-200 status.
                time.sleep(random.randint(1, 3))
            attempts += 1
            try:
                resp = session.request(
                    request_type,
                    request_url,
                    headers=headers,
                    data=payload,
                    timeout=5,
                    stream=is_stream,
                    verify=False,
                )
            except requests.RequestException as e:
                # Only network-level failures are retried; programming errors
                # propagate instead of looping.
                print(f'网络异常{e}')
            else:
                if resp.status_code == 200:
                    break
                # Release the connection of a rejected response; with
                # stream=True an unread body would otherwise keep it open.
                resp.close()
            if max_retries is not None and attempts > max_retries:
                raise RuntimeError(
                    f'{request_type} {request_url} failed after {attempts} attempts'
                )
        if is_stream:
            return resp
        return resp.json()
def download_file(download_url, dir_path, file_name):
    """Download ``download_url`` and store the body as ``dir_path/file_name``.

    Args:
        download_url: URL fetched with a streaming GET via ``request_method``.
        dir_path: Directory that receives the file.
        file_name: Name of the file to write inside ``dir_path``.
    """
    response = request_method('GET', download_url, headers='', payload='', is_stream=True)
    # The whole body is read before the target file is opened for writing.
    data = response.content
    target = os.path.join(dir_path, file_name)
    with open(target, 'wb') as out:
        out.write(data)
def get_music_info_list(my_info):
    """Collect download information for every sub-resource of one resource.

    Pages through ``get_res_relations`` until a page comes back empty and
    asks ``get_preview`` for the download URL of each sub-resource.

    Args:
        my_info: Sequence whose first item is the resource id and whose second
            item is the user id (one packed argument, as used with
            ``Pool.map``).

    Returns:
        list: One ``[res_name, download_url, file_name, file_suffix]`` list
        per sub-resource.
    """
    res_id = my_info[0]
    user_id = my_info[1]
    collected = []
    page = 1
    while True:
        res_list_json = get_res_relations(res_id, page)
        # A missing or null 'result' is treated like an empty page instead of
        # raising TypeError on len(None).
        music_list = res_list_json.get('result') or []
        if not music_list:
            break
        for music_info in music_list:
            res_name = music_info.get('resName')
            # Entries without a 'subResList' simply contribute nothing.
            for sub_music in music_info.get('subResList') or []:
                preview_info = get_preview(sub_music.get('refCode'), user_id)
                collected.append([
                    res_name,
                    preview_info.get('url'),
                    sub_music.get('resName'),
                    sub_music.get('fileType'),
                ])
        page += 1
    return collected
def to_download_music(music_info):
    """Download one file into ``<script dir>/下载/<directory>`` unless present.

    Args:
        music_info: Sequence of ``[directory, download_url, file_name,
            file_suffix]``; the file is stored as ``file_name.file_suffix``.
    """
    directory, download_url, file_name, file_suffix = music_info[:4]
    file_full_name = f'{file_name}.{file_suffix}'
    program_path = os.path.dirname(os.path.realpath(sys.argv[0]))
    res_dir_path = os.path.join(program_path, '下载', directory)
    # exist_ok avoids FileExistsError when several pool workers try to create
    # the same directory at the same time.
    os.makedirs(res_dir_path, exist_ok=True)
    if os.path.isfile(os.path.join(res_dir_path, file_full_name)):
        print('文件已下载,跳过')
        return
    print(f'开始下载【{file_full_name}】:{download_url}')
    download_file(download_url, res_dir_path, file_full_name)
def get_res_ids():
    """Collect the ``resId`` of every entry in the category listing.

    Pages through ``get_category`` starting at page 1 until a page comes back
    empty, printing each non-empty page as it is received.

    Returns:
        list: The ``resId`` values in the order they were returned.
    """
    res_ids_result = []
    page = 1
    while True:
        category_rsp = get_category(page)
        # A missing or null 'result' is treated like an empty page instead of
        # raising TypeError on len(None).
        category_list = category_rsp.get('result') or []
        if not category_list:
            break
        print(category_list)
        res_ids_result.extend(item.get('resId') for item in category_list)
        page += 1
    return res_ids_result
if __name__ == '__main__':
    # Script entry point: list all resources, resolve their download URLs in
    # parallel, then download every file in parallel.
    my_user_id = "22bae34df412343a82de780597a5154a"
    res_ids_result = get_res_ids()
    res_ids = [[res_id, my_user_id] for res_id in res_ids_result]
    # Half of the CPU cores, but never fewer than one worker: Pool rejects
    # processes=0, which int(cpu_count() * 0.5) produced on a single core.
    worker_count = max(1, multiprocessing.cpu_count() // 2)
    # The context manager shuts the pool down once both map calls are done.
    with multiprocessing.Pool(processes=worker_count) as pool:
        music_info_result = pool.map(get_music_info_list, res_ids)
        # Flatten the per-resource lists into one list of download entries.
        all_music_urls = [info for item in music_info_result for info in item]
        pool.map(to_download_music, all_music_urls)
    print('下载完成')