吾爱破解 - LCG - LSG |安卓破解|病毒分析|www.52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 756|回复: 14
收起左侧

[已解决] 多进程multiprocessing读写问题

[复制链接]
lizy169 发表于 2023-9-4 20:13
50吾爱币
两个py文件,一个多进程循环追加写入,一个多进程循环读取;
用csv格式没发现问题,改成HDF5就不行了,提示读写错误,

请大神指点,谢谢!!

写入py
[Python] 纯文本查看 复制代码
import multiprocessing as mp
import pandas as pd
import numpy as np


def w_in():
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
    df.to_hdf('data.h5', key='abc', append=True, format='table')


if __name__ == '__main__':
    p1 = mp.Process(target=w_in, name='p1')
    p2 = mp.Process(target=w_in, name='p2')
    p3 = mp.Process(target=w_in, name='p3')

    mp_list = [p1, p2, p3]
    for t in mp_list:
        t.start()

    for t in mp_list:
        t.join()

    while True:
        if not p1.is_alive():
            p1 = mp.Process(target=w_in, name='p1')
            p1.start()

        if not p2.is_alive():
            p2 = mp.Process(target=w_in, name='p2')
            p2.start()

        if not p3.is_alive():
            p3 = mp.Process(target=w_in, name='p3')
            p3.start()

        for t in mp_list:
            t.join()



读取py
[Python] 纯文本查看 复制代码
import multiprocessing as mp
import pandas as pd


def read_data():
    read = pd.read_hdf('data.h5', key='abc')
    print(read)


if __name__ == '__main__':

    p1 = mp.Process(target=read_data, name='p1')
    p2 = mp.Process(target=read_data, name='p2')
    p3 = mp.Process(target=read_data, name='p3')
    p4 = mp.Process(target=read_data, name='p4')
    p5 = mp.Process(target=read_data, name='p5')

    mp_list = [p1, p2, p3, p4, p5]
    for t in mp_list:
        t.start()

    for t in mp_list:
        t.close()

    for t in mp_list:
        t.join()

    while True:
        if not p1.is_alive():
            p1 = mp.Process(target=read_data, name='p1')
            p1.start()

        if not p2.is_alive():
            p2 = mp.Process(target=read_data, name='p2')
            p2.start()

        if not p3.is_alive():
            p3 = mp.Process(target=read_data, name='p3')
            p3.start()

        if not p1.is_alive():
            p4 = mp.Process(target=read_data, name='p4')
            p4.start()

        if not p1.is_alive():
            p5 = mp.Process(target=read_data, name='p5')
            p5.start()

        for t in mp_list:
            t.join()

最佳答案

查看完整内容

[md] 我试着使用进程锁也无法解决问题,那么很可能是底层出了问题。 然后去查看了 `Pandas` 的 `to_hdf` 文档,知道了它基于 `PyTables` 库。 [/md] [md] 最终在 `PyTables` 的 `FAQ` 中找到了答案:**在多线程、多进程的情况下,读取内容是没有问题的,但写入内容会出问题!** 链接在这里:`https://www.pytables.org/FAQ.html`,里面提到了解决办法。 [/md] * * * 后续补充 * * * [md] 回想起来意识到 ...

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

LoveCode 发表于 2023-9-4 20:13
本帖最后由 LoveCode 于 2023-9-5 12:13 编辑

我试着使用进程锁也无法解决问题,那么很可能是底层出了问题。
然后去查看了 Pandasto_hdf 文档,知道了它基于 PyTables 库。



Snipaste_2023-09-05_00-28-27.png

最终在 PyTablesFAQ 中找到了答案:在多线程、多进程的情况下,读取内容是没有问题的,但写入内容会出问题!
链接在这里:https://www.pytables.org/FAQ.html,里面提到了解决办法。



Snipaste_2023-09-05_00-26-11.png


* * * 后续补充 * * *

回想起来意识到了我的失误:

  1. Pandas 使用了 PyTables,现在 PyTables 不支持多进程写内容,自然影响到了 Pandasto_hdf 方法。
  2. 即便解决了 PyTables 的多进程问题,最后还是要回到 Pandas 上,它内部使用了 PyTables,总不能去修改 Pandas 的源码。

于是我仔细看了上述 PyTables 文档提到的四种解决方案,简单概括就是实现多进程的通信。根据文章的提示,
它有一个测试文件 example/multiprocess_access_benchmarks.py,这里上 Github 项目,查看对应目录中的测试文件,里面展示了上述四种方案。

那么现在对 Pandas 的使用来说,核心问题在于:不能有多个进程来写入 HDF 文件,那么我能想到的是如下方案:

  1. 单独启动一个 write_to_hdf 进程,它从管道、共享队列等中读取内容并且写入 HDF 文件,并且只有它能写入内容。
  2. 其它进程则将数据通过管道、共享队列等发送给 write_to_hdf 进程,这样就避免了多进程写 HDF 文件的问题。

现在修改 write.py 如下,因我使用的是共享队列。

import multiprocessing as mp
import pandas as pd
import numpy as np

def write_to_hdf(hdf_filename: str, q: mp.Queue):
    while True:
        df = q.get()
        # 取出队列中的 DataFrame,然后再写入文件
        df.to_hdf(hdf_filename, key="abc", append=True, format="table")
        print("测试: 成功写入")

def w_in(q: mp.Queue):
    print("w_in 启动")
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list("ABCD"))
    # 这里直接将 df 传入队列
    q.put(df)

if __name__ == "__main__":
    # 队列中的数据类型是 pd.DataFrame
    data_queue = mp.Queue(maxsize=10)
    mp.Process(target=write_to_hdf, args=("data.h5", data_queue)).start()

    p1 = mp.Process(target=w_in, name="p1", args=(data_queue,))
    p2 = mp.Process(target=w_in, name="p2", args=(data_queue,))
    p3 = mp.Process(target=w_in, name="p3", args=(data_queue,))

    mp_list = [p1, p2, p3]
    for t in mp_list:
        t.start()

    for t in mp_list:
        t.join()

    while True:
        if not p1.is_alive():
            p1 = mp.Process(target=w_in, name="p1", args=(data_queue,))
            p1.start()

        if not p2.is_alive():
            p2 = mp.Process(target=w_in, name="p2", args=(data_queue,))
            p2.start()

        if not p3.is_alive():
            p3 = mp.Process(target=w_in, name="p3", args=(data_queue,))
            p3.start()

        for t in mp_list:
            t.join()

测试写入 HDF 文件的代码结果如下:
image-20230905104201557.png

顺便测试了一下读取 HDF 文件的代码也是没有问题的。





免费评分

参与人数 2吾爱币 +2 热心值 +2 收起 理由
lbbas + 1 + 1 热心回复!
xinluan + 1 + 1 用心讨论,共获提升!

查看全部评分

绿颜〃 发表于 2023-9-4 21:12
绿颜〃 发表于 2023-9-4 21:14
写入py
[Python] 纯文本查看 复制代码
import multiprocessing as mp
import pandas as pd
import numpy as np


def w_in():
    df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
    df.to_hdf('data.h5', key='abc', append=True, format='table')


if __name__ == '__main__':
    p1 = mp.Process(target=w_in, name='p1')
    p2 = mp.Process(target=w_in, name='p2')
    p3 = mp.Process(target=w_in, name='p3')

    mp_list = [p1, p2, p3]
    for t in mp_list:
        t.start()

    while True:
        if not p1.is_alive():
            p1 = mp.Process(target=w_in, name='p1')
            p1.start()

        if not p2.is_alive():
            p2 = mp.Process(target=w_in, name='p2')
            p2.start()

        if not p3.is_alive():
            p3 = mp.Process(target=w_in, name='p3')
            p3.start()

        for t in mp_list:
            t.join()
            break

读取py
[Python] 纯文本查看 复制代码
import multiprocessing as mpimport pandas as pd


def read_data():
    read = pd.read_hdf('data.h5', key='abc')
    print(read)


if __name__ == '__main__':
    p1 = mp.Process(target=read_data, name='p1')
    p2 = mp.Process(target=read_data, name='p2')
    p3 = mp.Process(target=read_data, name='p3')
    p4 = mp.Process(target=read_data, name='p4')
    p5 = mp.Process(target=read_data, name='p5')

    mp_list = [p1, p2, p3, p4, p5]
    for t in mp_list:
        t.start()

    while True:
        if not p1.is_alive():
            p1 = mp.Process(target=read_data, name='p1')
            p1.start()

        if not p2.is_alive():
            p2 = mp.Process(target=read_data, name='p2')
            p2.start()

        if not p3.is_alive():
            p3 = mp.Process(target=read_data, name='p3')
            p3.start()

        if not p4.is_alive():
            p4 = mp.Process(target=read_data, name='p4')
            p4.start()

        if not p5.is_alive():
            p5 = mp.Process(target=read_data, name='p5')
            p5.start()

        for t in mp_list:
            t.join()

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
lizy169 + 1 + 1 谢谢@Thanks!

查看全部评分

 楼主| lizy169 发表于 2023-9-4 21:25
绿颜〃 发表于 2023-9-4 21:14
写入py[mw_shl_code=python,true]import multiprocessing as mp
import pandas as pd
import numpy as np ...

End of HDF5 error back trace

Unable to open/create file 'data.h5'


也一样报错哦
njbb888 发表于 2023-9-5 08:23
这就是python特色,无法避免
winxpnt 发表于 2023-9-5 08:24
这个有点意思,学习了。
~零度 发表于 2023-9-5 10:39
本帖最后由 ~零度 于 2023-9-5 10:41 编辑

[Python] 纯文本查看 复制代码
import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import random
import platform

if platform.platform().upper().find("WIN") >= 0:
    import msvcrt  # 适用于windows系统
    is_windows = True
else:
    import fcntl  # 适用于linux系统
    is_windows = False


def w_in():
    try:
        with open(".lock", "w") as lock_file:
            if is_windows:
                msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
            else:
                fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            df = pd.DataFrame(np.random.randint(0, 100, size=(100, 4)), columns=list('ABCD'))
            df.to_hdf('data.h5', key='abc', append=True, format='table')
            if is_windows:
                msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
            else:
                fcntl.flock(lock_file, fcntl.LOCK_UN)
    except Exception as e:  # 遇到文件被其他进程占用时,随机等待一段时间
        print(e)
        time.sleep(random.random() * 3)


if __name__ == '__main__':
    p1 = mp.Process(target=w_in, name='p1')
    p2 = mp.Process(target=w_in, name='p2')
    p3 = mp.Process(target=w_in, name='p3')

    mp_list = [p1, p2, p3]
    for t in mp_list:
        t.start()

    for t in mp_list:
        t.join()

    while True:
        for idx, t in enumerate(mp_list):
            if not t.is_alive():
                mp_list[idx] = mp.Process(target=w_in, name='p'+str(idx+1))
                mp_list[idx].start()

        for t in mp_list:
            t.join()



[Python] 纯文本查看 复制代码
import multiprocessing as mp
import pandas as pd
import time
import random
import platform

if platform.platform().upper().find("WIN") >= 0:
    import msvcrt  # 适用于windows系统
    is_windows = True
else:
    import fcntl  # 适用于linux系统
    is_windows = False


def read_data():
    try:
        with open(".lock", "w") as lock_file:
            if is_windows:
                msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
            else:
                fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            read = pd.read_hdf('data.h5', key='abc')
            print(read)
            if is_windows:
                msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
            else:
                fcntl.flock(lock_file, fcntl.LOCK_UN)
    except Exception as e:  # 遇到文件被其他进程占用时,随机等待一段时间
        print(e)
        time.sleep(random.random() * 3)


if __name__ == '__main__':

    p1 = mp.Process(target=read_data, name='p1')
    p2 = mp.Process(target=read_data, name='p2')
    p3 = mp.Process(target=read_data, name='p3')
    p4 = mp.Process(target=read_data, name='p4')
    p5 = mp.Process(target=read_data, name='p5')

    mp_list = [p1, p2, p3, p4, p5]
    for t in mp_list:
        t.start()

    for t in mp_list:
        t.terminate()

    for t in mp_list:
        t.join()

    while True:
        for idx, t in enumerate(mp_list):
            if not t.is_alive():
                mp_list[idx] = mp.Process(target=read_data, name='p'+str(idx+1))
                mp_list[idx].start()

        for t in mp_list:
            t.join()

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
lizy169 + 1 + 1 谢谢大神!你这招是大招啊,学会灵活用不简单呢,我会努力

查看全部评分

~零度 发表于 2023-9-5 10:43
~零度 发表于 2023-9-5 10:39
[mw_shl_code=python,true]import multiprocessing as mp
import pandas as pd
import numpy as np

如果有残留的错误格式的data.h5,需要先删除,不然会报错

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
lizy169 + 1 + 1 谢谢@Thanks!

查看全部评分

qiangxinglin 发表于 2023-9-5 11:17
学习了! 可能py12出来以后, 社区会开始重视多进程
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则 警告:本版块禁止回复与主题无关非技术内容,违者重罚!

快速回复 收藏帖子 返回列表 搜索

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-4-27 22:47

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表