我有一个正在变得IO约束的过程,在该过程中,我将数据库中的大型数据集提取到pandas数据框中,然后尝试逐行进行一些处理,然后坚持到gzip文件。我正在尝试找到一种使用多重处理的方法,以便能够将gzip的创建分成多个进程,然后将它们合并到一个文件中。或者并行处理而不覆盖先前的线程。我找到了此软件包p_tqdm,但是我遇到了EOF问题,可能是因为线程相互覆盖了。这是我当前解决方案的示例:
from p_tqdm import p_map
df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
things.append(row)
p_map(process, things)
def process():
with gzip.open("final.gz", "wb") as f:
value = do_somthing(row)
f.write(value.encode())
我不知道,p_tqdm
但是如果我理解您的问题,可以使用轻松完成multiprocessing
。
像这样的东西
import multiprocessing
def process(row):
# take care that "do_somthing" must return class with encode() method (e.g. string)
return do_somthing(row)
df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
things.append(row)
with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
for processed_row in pool.imap(process, things):
f.write(processed_row.encode())
仅有几个旁注:
pandas iterrows
方法很慢-尽可能避免(请参阅pandas迭代是否存在性能问题?)。
另外,您不需要创建things
,只需将iterable传递给imap
(甚至应该直接传递df.iterrows())就可以节省一些内存。
最后,由于您似乎正在读取sql数据,为什么不直接连接到db并遍历SELECT ...
查询中的游标,pandas
完全跳过。
感谢您的回答和周到的评论。经过进一步研究,我还意识到iterrows()是性能的噩梦。我将数据框更改为dask数据框,然后传递df.to_array(),然后对此进行迭代以生成.gz文件。性能要好得多。我想知道使用df.array()传递到多处理池是否会更好。我对读取SQL表感到好奇-我不想妨碍DB的性能,但我想加快这一部分。现在,我基本上完成了所有操作,然后在数百万行上等待10秒的盲目性。有小费吗?
如我所说,直接在游标对象上进行迭代。您没有说您正在使用的数据库引擎是什么,但是,例如,如果它是类似sql的东西
cur.execute("SELECT * FROM x";)
,那么通常是可能的:然后for row in cur: ...
应该是可能的。(该行通常是一个数据元组。)检出例如docs.python.org/3.8/library/sqlite3.html。我认为多重处理不会给您带来很多性能方面的改进(它有相当大的开销),但这取决于您的用例。