吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 586|回复: 21
收起左侧

[已解决] 关于python合并sqlite数据库文件的问题

[复制链接]
agooo 发表于 2025-2-11 09:29
本帖最后由 agooo 于 2025-2-15 14:06 编辑

各位大佬好。
我用python爬了一个json文件,已经用代码将json文件转成了需要的db文件,源代码地址:
https://wwiw.lanzouo.com/ijL9Z2nfk8pe
密码:3d6r

这两个db文件的表头结构是:

plan

plan

score

score

我想把这两个db文件合并成一个db文件,保留他们字段相同的部分,专业名称 和 专业详解 这两列的内容其实是互补的,如果专业详解中包含专业名称的关键字,就把这两个db中的数据合并成一行。(我不知道我表达清楚没有
我始终无法合并成一行,导致db中总是出现两个db数据中的各自数据,不能完全融合在一起。
我尝试在处理json文件时就直接生成融合的db文件,结果如下:

merge

merge

我发现我的代码也并没有完全处理好两个不同schoolspecialscore路径下json文件中的全部数据。

2024-02-12 19:40 PS:修改了精确筛选(三个条件:batch, sp_info, special_id)的条件,但是最低分,最低位次的返回数据是错误的。
2024-02-13 09:13 PS:修改了匹配关键key,解决了部分问题,还有剩余部分无法匹配返回score的数据。
2024-02-14 18:57 PS:更新了匹配函数,95%的数据都是正确的,但是个别spname虽然判匹配,但是返回的min和min_section的值是相同的。(返回错误数据,举例:重庆大学,建筑学)
2024-02-15 14:06 PS:问题已全部解决,感谢@知心 大佬的热心帮助!



[Python] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import os
import json
import sqlite3
import csv
import shutil
import re
from datetime import datetime
from glob import glob
  
province_code = input("\n\033[32m请输入需要处理 JSON文件的省市区代码(默认值为50,代表重庆): \033[0m") or "50"
  
main_dir = 'download/schoolspecialplan'
sub_dir = 'download/schoolspecialscore'
db_dir = 'db'
db_path = os.path.join(db_dir, f'{province_code}_merge_data.db')
backup_db_path = os.path.join(db_dir, f'{province_code}_merge_data.db.bak')
school_csv_path = 'src/school_id.csv'
school_info_path = 'src/school_info_data.json'
  
if not os.path.exists(db_dir):
    os.makedirs(db_dir)
  
if os.path.exists(db_path):
    if os.path.exists(backup_db_path):
        os.remove(backup_db_path)
    shutil.copy(db_path, backup_db_path)
    print(f"\n\033[33m数据库 {province_code}_merge_data.db文件已备份为 {backup_db_path}\033[0m\n")
  
if os.path.exists(db_path):
    os.remove(db_path)
  
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
  
province_id_name_mapping = {
    "11": "北京", "12": "天津", "13": "河北", "14": "山西", "15": "内蒙古",
    "21": "辽宁", "22": "吉林", "23": "黑龙江", "31": "上海", "32": "江苏",
    "33": "浙江", "34": "安徽", "35": "福建", "36": "江西", "37": "山东",
    "41": "河南", "42": "湖北", "43": "湖南", "44": "广东", "45": "广西",
    "46": "海南", "50": "重庆", "51": "四川", "52": "贵州", "53": "云南",
    "54": "西藏", "61": "陕西", "62": "甘肃", "63": "青海", "64": "宁夏", "65": "新疆"
}
  
def get_json_files(province_name):
    province_dir = os.path.join(main_dir, province_name)
    if not os.path.exists(province_dir):
        print(f"错误:找不到 {province_name} 省的文件夹路径。")
        return []
  
    json_files_pattern = os.path.join(province_dir, '*', '*_schoolspecialplan.json')
    return glob(json_files_pattern)
  
def get_corresponding_sub_file(json_file):
    sub_file = json_file.replace(main_dir, sub_dir).replace('schoolspecialplan.json', 'schoolspecialscore.json')
    return sub_file
  
def load_school_data():
    school_dict = {}
    with open(school_csv_path, mode='r', encoding='utf-8') as f:
        reader = csv.reader(f)
        for row in reader:
            if len(row) >= 2:
                school_dict[row[1]] = row[0]
    return school_dict
  
def load_school_info():
    with open(school_info_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data.get("data", {})
  
def get_school_level(school_name, school_info_data):
    for school in school_info_data.values():
        if school.get("name") == school_name:
            if school.get("dual_class") == "1" and school.get("f211") == "1" and school.get("f985") == "1":
                return "985"
            elif school.get("dual_class") == "1" and school.get("f211") == "1" and school.get("f985") == "2":
                return "211"
            elif school.get("dual_class") == "1" and school.get("f211") == "2" and school.get("f985") == "2":
                return "双一流"
            else:
                return ""
    return ""
  
school_dict = load_school_data()
school_info_data = load_school_info()
  
def create_table(province_name):
    table_name = f"score_and_plan"
    cursor.execute(f'''
        CREATE TABLE IF NOT EXISTS "{table_name}" (
            等级 TEXT,
            学校名称 TEXT,
            省市区 TEXT,
            招生年份 TEXT,
            类型 TEXT,
            录取批次 TEXT,
            专业名称 TEXT,
            专业详解 TEXT,      
            最低分 TEXT,
            最低位次 TEXT,
            招生人数 TEXT,
            学制 TEXT,
            每年学费 TEXT,
            选科要求 TEXT
        )
    ''')
    conn.commit()
  
def insert_data(data, province_name):
    table_name = f"score_and_plan"
  
    for item in data:
        item.setdefault('最低分', '-')
        item.setdefault('最低位次', '-')
        item.setdefault('选科要求', '')
        item.setdefault('专业详解', '')
        item.setdefault('学制', '')
        item.setdefault('招生人数', '')
        item.setdefault('每年学费', '')
  
    cursor.executemany(f'''
        INSERT OR IGNORE INTO "{table_name}" (
            等级, 学校名称, 省市区, 招生年份, 类型, 录取批次, 专业名称, 专业详解, 最低分, 最低位次, 招生人数, 学制, 每年学费, 选科要求
        ) VALUES (
            :等级, :学校名称, :省市区, :招生年份, :类型, :录取批次, :专业名称, :专业详解, :最低分, :最低位次, :招生人数, :学制, :每年学费, :选科要求
        )
    ''', data)
    conn.commit()
  
def get_subject_type(type_code):
    if type_code == "1":
        return "理科"
    elif type_code == "2":
        return "文科"
    elif type_code == "2073":
        return "物理类"
    elif type_code == "2074":
        return "历史类"
    return type_code
  
def get_length_type(type_code):
    if type_code == "1":
        return "一年"
    elif type_code == "3":
        return "三年"
    elif type_code == "4":
        return "四年"
    elif type_code == "5":
        return "五年"
    return type_code
 
def parse_data(data):
    """解析数据,提取主名称和括号内的内容列表"""
    main_name = data.split('(')[0].strip()
    brackets = re.findall(r'(([^)]*))', data)
    return main_name, brackets
 
def process_json_file(json_file, json_file_sub):
    data_items = []
    filename = os.path.basename(json_file)
    school_code, year, province_code = filename.split('_')[:3]
    school_name = school_dict.get(school_code, f"未知学校_{school_code}")
    province_name = province_id_name_mapping.get(province_code, f"未知省份_{province_code}")
    school_level = get_school_level(school_name, school_info_data)
 
    with open(json_file, encoding='utf-8') as f1:
        plan_data = json.load(f1)
 
    with open(json_file_sub, encoding='utf-8') as f2:
        score_data = json.load(f2)
 
    for plan_key in plan_data['data']:
        plan_items = plan_data['data'][plan_key]['item']
        score_items = score_data['data'].get(plan_key, {}).get('item', [])
 
        for plan_item in plan_items[:]:
            sp_name = plan_item.get('sp_name', '未知专业')
            spname = plan_item.get('spname', '')
            length = get_length_type(plan_item.get('length', ''))
            local_batch_name = plan_item.get('local_batch_name', '')
            num = plan_item.get('num', '')
            tuition = plan_item.get('tuition', '')
            sp_info = plan_item.get('sp_info', '')
            xk_type = get_subject_type(plan_item.get('type', ''))
 
            matched_score_item = None
            for score_item in score_items:
                main1, brackets1 = parse_data(score_item.get('spname', ''))
                main2, brackets2 = parse_data(spname)
 
                if main1 != main2:
                    continue
 
                if not all(b in brackets2 for b in brackets1):
                    continue
 
                min_score = score_item.get('min', '-')
                min_section = score_item.get('min_section', '-')
                score_spname = score_item.get('spname', '-')
 
                matched_score_item = {
                    '等级': school_level,
                    '学校名称': school_name,
                    '省市区': province_name,
                    '招生年份': year,
                    '类型': xk_type,
                    '录取批次': local_batch_name,
                    '专业名称': score_spname,
                    '专业详解': spname,
                    '招生人数': num,
                    '学制': length,
                    '每年学费': tuition,
                    '选科要求': sp_info,
                    '最低分': min_score,
                    '最低位次': min_section
                }
 
                score_items.remove(score_item)
                break
 
            if matched_score_item:
                data_items.append(matched_score_item)
                plan_items.remove(plan_item)
 
    return data_items
 
def process_all_files(province_code):
    province_name = province_id_name_mapping.get(province_code, "重庆")
    json_files = get_json_files(province_name)
  
    if not json_files:
        print("没有找到相关的文件。")
        return
  
    create_table(province_name)
  
    total_files = len(json_files)
    for index, json_file in enumerate(json_files, start=1):
        json_file_sub = get_corresponding_sub_file(json_file)
        data_items = process_json_file(json_file, json_file_sub)
        insert_data(data_items, province_name)
        print(f"处理进度: {index}/{total_files} 文件已处理", end='\r')
  
    print()
    print(f"\n\033[32m数据已成功保存到 {db_path} 数据库中\033[0m")
  
if province_code not in province_id_name_mapping:
    print(f"无效的省市区代码:{province_code}")
else:
    process_all_files(province_code)
  
conn.close()
  
input(f"\033[31m按 Enter 键退出...\033[0m")

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
知心 + 1 + 1 我很赞同!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

zunmx 发表于 2025-2-11 11:02
可以考虑使用 pandasql 然后连接查询 合并成一张表。
一般我用csv来存储一些数据,这里可以用read_sql 然后挂上sqlite的驱动
plan = pandas.read_csv() # 读取 schoolspecialplan
score =  pandas.read_csv() # 读取 schoolspecialscore
result = pandasql("SELECT column ... FROM plan p LEFT JOIN score s ON p.xx = s.xx") # 连接查询
resut.to_csv("./result.csv") # 导出结果

不知道我理解对你的需求了没有。
 楼主| agooo 发表于 2025-2-11 11:34
zunmx 发表于 2025-2-11 11:02
可以考虑使用 pandasql 然后连接查询 合并成一张表。
一般我用csv来存储一些数据,这里可以用read_sql 然 ...

谢谢,我思考一下你的方法。
 楼主| agooo 发表于 2025-2-11 13:13
本帖最后由 agooo 于 2025-2-11 13:18 编辑
zunmx 发表于 2025-2-11 11:02
可以考虑使用 pandasql 然后连接查询 合并成一张表。
一般我用csv来存储一些数据,这里可以用read_sql 然 ...

用了你的办法,还是搞不懂,合并的数据是混乱的,并没有一一对应起来。
有很多错误的组合,主要的问题是,专业名称和专业详解匹配的情况下,添加plan中没有的score的相关字段数据。
我做不到专业名称和专业详解之间的模糊关键匹配,从而大量重复添加了很多错误组合数据。
你能帮我写段代码试试合并csv吗?

这是csv的合并文件,专业详解中的内容完全失真了。
image.png image.png




知心 发表于 2025-2-11 13:19
本帖最后由 知心 于 2025-2-11 13:23 编辑

不符合要求的数据要保留吗
qiaoling 发表于 2025-2-11 13:20
哪里爬取的  url发下了
 楼主| agooo 发表于 2025-2-11 13:43
知心 发表于 2025-2-11 13:19
不符合要求的数据要保留吗

其实最主要的是把plan中的:学制,招生人数,每年学费;score中的最低分,最低位次,选科要求。放在同一行的相同“等级,学校名称,省市区,招生年份,类型,录取批次,专业名称”的行中。
其实在处理一行的数据时,取的是plan和score的合集,但是合集中又要去重一些字段。

有一些专业名称是同名的,但是需要通过专业详解来区分,我搞不定的就是这一点。

说起来有点拗口,不知道我讲清楚没有。
 楼主| agooo 发表于 2025-2-11 13:44
本帖最后由 agooo 于 2025-2-11 14:09 编辑
qiaoling 发表于 2025-2-11 13:20
哪里爬取的  url发下了

我现在写的爬取json文件已经可以下载全部省市区的每年招生数据了。
就是怎么对处理的db文件进一步的融合,现在做的感觉还有合并精简的空间。
知心 发表于 2025-2-11 14:23
本帖最后由 知心 于 2025-2-11 15:58 编辑
agooo 发表于 2025-2-11 13:43
其实最主要的是把plan中的:学制,招生人数,每年学费;score中的最低分,最低位次,选科要求。放在同一 ...


https://wwfv.lanzouw.com/b028kzj6sf
密码:9vuk
看一下是不是这样的
Pwaerm 发表于 2025-2-11 14:46
  ai应该直接就可以写出代码  复制运行就可以啦
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-5-20 11:43

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表