温馨提示:本文翻译自stackoverflow.com,查看原文请点击:arrays - How to use multiprocessing to create gzip file from dataframe in python
arrays multithreading python-3.x

arrays - 如何使用多重处理从python中的数据帧创建gzip文件

发布于 2020-03-29 13:11:22

我有一个正在变得IO约束的过程,在该过程中,我将数据库中的大型数据集提取到pandas数据框中,然后尝试逐行进行一些处理,然后坚持到gzip文件。我正在尝试找到一种使用多重处理的方法,以便能够将gzip的创建分成多个进程,然后将它们合并到一个文件中。或者并行处理而不覆盖先前的线程。我找到了此软件包p_tqdm,但是我遇到了EOF问题,可能是因为线程相互覆盖了。这是我当前解决方案的示例:

from p_tqdm import p_map

df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
    things.append(row)    
p_map(process, things)

def process():
    with gzip.open("final.gz", "wb") as f:
        value = do_somthing(row)
        f.write(value.encode())

查看更多

查看更多

提问者
dweeb
被浏览
89
Marek Schwarz 2020-01-31 17:24

我不知道,p_tqdm但是如果我理解您的问题,可以使用轻松完成multiprocessing

像这样的东西

import multiprocessing

def process(row):
    # take care that "do_somthing" must return class with encode() method (e.g. string)
    return do_somthing(row)

df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
    things.append(row)


with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
    for processed_row in pool.imap(process, things):
        f.write(processed_row.encode())

仅有几个旁注:

  • pandas iterrows方法很慢-尽可能避免(请参阅pandas迭代是否存在性能问题?)。

  • 另外,您不需要创建things,只需将iterable传递给imap(甚至应该直接传递df.iterrows())就可以节省一些内存。

  • 最后,由于您似乎正在读取sql数据,为什么不直接连接到db并遍历SELECT ...查询中的游标pandas完全跳过