import
os
import
json
import
sqlite3
import
csv
import
shutil
import
re
from
datetime
import
datetime
from
glob
import
glob
province_code
=
input
(
"\n\033[32m请输入需要处理 JSON文件的省市区代码(默认值为50,代表重庆): \033[0m"
)
or
"50"
main_dir
=
'download/schoolspecialplan'
sub_dir
=
'download/schoolspecialscore'
db_dir
=
'db'
db_path
=
os.path.join(db_dir, f
'{province_code}_merge_data.db'
)
backup_db_path
=
os.path.join(db_dir, f
'{province_code}_merge_data.db.bak'
)
school_csv_path
=
'src/school_id.csv'
school_info_path
=
'src/school_info_data.json'
if
not
os.path.exists(db_dir):
os.makedirs(db_dir)
if
os.path.exists(db_path):
if
os.path.exists(backup_db_path):
os.remove(backup_db_path)
shutil.copy(db_path, backup_db_path)
print
(f
"\n\033[33m数据库 {province_code}_merge_data.db文件已备份为 {backup_db_path}\033[0m\n"
)
if
os.path.exists(db_path):
os.remove(db_path)
conn
=
sqlite3.connect(db_path)
cursor
=
conn.cursor()
province_id_name_mapping
=
{
"11"
:
"北京"
,
"12"
:
"天津"
,
"13"
:
"河北"
,
"14"
:
"山西"
,
"15"
:
"内蒙古"
,
"21"
:
"辽宁"
,
"22"
:
"吉林"
,
"23"
:
"黑龙江"
,
"31"
:
"上海"
,
"32"
:
"江苏"
,
"33"
:
"浙江"
,
"34"
:
"安徽"
,
"35"
:
"福建"
,
"36"
:
"江西"
,
"37"
:
"山东"
,
"41"
:
"河南"
,
"42"
:
"湖北"
,
"43"
:
"湖南"
,
"44"
:
"广东"
,
"45"
:
"广西"
,
"46"
:
"海南"
,
"50"
:
"重庆"
,
"51"
:
"四川"
,
"52"
:
"贵州"
,
"53"
:
"云南"
,
"54"
:
"西藏"
,
"61"
:
"陕西"
,
"62"
:
"甘肃"
,
"63"
:
"青海"
,
"64"
:
"宁夏"
,
"65"
:
"新疆"
}
def
get_json_files(province_name):
province_dir
=
os.path.join(main_dir, province_name)
if
not
os.path.exists(province_dir):
print
(f
"错误:找不到 {province_name} 省的文件夹路径。"
)
return
[]
json_files_pattern
=
os.path.join(province_dir,
'*'
,
'*_schoolspecialplan.json'
)
return
glob(json_files_pattern)
def
get_corresponding_sub_file(json_file):
sub_file
=
json_file.replace(main_dir, sub_dir).replace(
'schoolspecialplan.json'
,
'schoolspecialscore.json'
)
return
sub_file
def
load_school_data():
school_dict
=
{}
with
open
(school_csv_path, mode
=
'r'
, encoding
=
'utf-8'
) as f:
reader
=
csv.reader(f)
for
row
in
reader:
if
len
(row) >
=
2
:
school_dict[row[
1
]]
=
row[
0
]
return
school_dict
def
load_school_info():
with
open
(school_info_path,
'r'
, encoding
=
'utf-8'
) as f:
data
=
json.load(f)
return
data.get(
"data"
, {})
def
get_school_level(school_name, school_info_data):
for
school
in
school_info_data.values():
if
school.get(
"name"
)
=
=
school_name:
if
school.get(
"dual_class"
)
=
=
"1"
and
school.get(
"f211"
)
=
=
"1"
and
school.get(
"f985"
)
=
=
"1"
:
return
"985"
elif
school.get(
"dual_class"
)
=
=
"1"
and
school.get(
"f211"
)
=
=
"1"
and
school.get(
"f985"
)
=
=
"2"
:
return
"211"
elif
school.get(
"dual_class"
)
=
=
"1"
and
school.get(
"f211"
)
=
=
"2"
and
school.get(
"f985"
)
=
=
"2"
:
return
"双一流"
else
:
return
""
return
""
school_dict
=
load_school_data()
school_info_data
=
load_school_info()
def
create_table(province_name):
table_name
=
f
"score_and_plan"
cursor.execute(f
)
conn.commit()
def
insert_data(data, province_name):
table_name
=
f
"score_and_plan"
for
item
in
data:
item.setdefault(
'最低分'
,
'-'
)
item.setdefault(
'最低位次'
,
'-'
)
item.setdefault(
'选科要求'
, '')
item.setdefault(
'专业详解'
, '')
item.setdefault(
'学制'
, '')
item.setdefault(
'招生人数'
, '')
item.setdefault(
'每年学费'
, '')
cursor.executemany(f
, data)
conn.commit()
def
get_subject_type(type_code):
if
type_code
=
=
"1"
:
return
"理科"
elif
type_code
=
=
"2"
:
return
"文科"
elif
type_code
=
=
"2073"
:
return
"物理类"
elif
type_code
=
=
"2074"
:
return
"历史类"
return
type_code
def
get_length_type(type_code):
if
type_code
=
=
"1"
:
return
"一年"
elif
type_code
=
=
"3"
:
return
"三年"
elif
type_code
=
=
"4"
:
return
"四年"
elif
type_code
=
=
"5"
:
return
"五年"
return
type_code
def
parse_data(data):
main_name
=
data.split(
'('
)[
0
].strip()
brackets
=
re.findall(r
'(([^)]*))'
, data)
return
main_name, brackets
def
process_json_file(json_file, json_file_sub):
data_items
=
[]
filename
=
os.path.basename(json_file)
school_code, year, province_code
=
filename.split(
'_'
)[:
3
]
school_name
=
school_dict.get(school_code, f
"未知学校_{school_code}"
)
province_name
=
province_id_name_mapping.get(province_code, f
"未知省份_{province_code}"
)
school_level
=
get_school_level(school_name, school_info_data)
with
open
(json_file, encoding
=
'utf-8'
) as f1:
plan_data
=
json.load(f1)
with
open
(json_file_sub, encoding
=
'utf-8'
) as f2:
score_data
=
json.load(f2)
for
plan_key
in
plan_data[
'data'
]:
plan_items
=
plan_data[
'data'
][plan_key][
'item'
]
score_items
=
score_data[
'data'
].get(plan_key, {}).get(
'item'
, [])
for
plan_item
in
plan_items[:]:
sp_name
=
plan_item.get(
'sp_name'
,
'未知专业'
)
spname
=
plan_item.get(
'spname'
, '')
length
=
get_length_type(plan_item.get(
'length'
, ''))
local_batch_name
=
plan_item.get(
'local_batch_name'
, '')
num
=
plan_item.get(
'num'
, '')
tuition
=
plan_item.get(
'tuition'
, '')
sp_info
=
plan_item.get(
'sp_info'
, '')
xk_type
=
get_subject_type(plan_item.get(
'type'
, ''))
matched_score_item
=
None
for
score_item
in
score_items:
main1, brackets1
=
parse_data(score_item.get(
'spname'
, ''))
main2, brackets2
=
parse_data(spname)
if
main1 !
=
main2:
continue
if
not
all
(b
in
brackets2
for
b
in
brackets1):
continue
min_score
=
score_item.get(
'min'
,
'-'
)
min_section
=
score_item.get(
'min_section'
,
'-'
)
score_spname
=
score_item.get(
'spname'
,
'-'
)
matched_score_item
=
{
'等级'
: school_level,
'学校名称'
: school_name,
'省市区'
: province_name,
'招生年份'
: year,
'类型'
: xk_type,
'录取批次'
: local_batch_name,
'专业名称'
: score_spname,
'专业详解'
: spname,
'招生人数'
: num,
'学制'
: length,
'每年学费'
: tuition,
'选科要求'
: sp_info,
'最低分'
: min_score,
'最低位次'
: min_section
}
score_items.remove(score_item)
break
if
matched_score_item:
data_items.append(matched_score_item)
plan_items.remove(plan_item)
return
data_items
def
process_all_files(province_code):
province_name
=
province_id_name_mapping.get(province_code,
"重庆"
)
json_files
=
get_json_files(province_name)
if
not
json_files:
print
(
"没有找到相关的文件。"
)
return
create_table(province_name)
total_files
=
len
(json_files)
for
index, json_file
in
enumerate
(json_files, start
=
1
):
json_file_sub
=
get_corresponding_sub_file(json_file)
data_items
=
process_json_file(json_file, json_file_sub)
insert_data(data_items, province_name)
print
(f
"处理进度: {index}/{total_files} 文件已处理"
, end
=
'\r'
)
print
()
print
(f
"\n\033[32m数据已成功保存到 {db_path} 数据库中\033[0m"
)
if
province_code
not
in
province_id_name_mapping:
print
(f
"无效的省市区代码:{province_code}"
)
else
:
process_all_files(province_code)
conn.close()
input
(f
"\033[31m按 Enter 键退出...\033[0m"
)