我multiprocessing
在python3中是一个相对较新的世界,因此,如果以前曾问过这个问题,我感到抱歉。我有一个脚本,该脚本从N个元素的列表中对每个元素运行整个分析,并将每个映射到不同的进程。
我知道这是次优的,实际上我想提高多处理效率。我曾经map()
将每个进程都运行到,Pool()
其中可以包含用户通过命令行参数指定的尽可能多的进程。
代码如下所示:
max_processes = 7
# it is passed by command line actually but not relevant here
def main_function( ... ):
res_1 = sub_function_1( ... )
res_2 = sub_function_2( ... )
if __name__ == '__main__':
p = Pool(max_processes)
Arguments = []
for x in Paths.keys():
# generation of the arguments
...
Arguments.append( Tup_of_arguments )
p.map(main_function, Arguments)
p.close()
p.join()
如您所见,我的流程调用了一个main函数,该main函数依次调用了许多其他函数。现在,每个sub_functions都是可多重处理的。我可以将那些子功能映射到主程序运行所在的同一个池吗?
不,你不能。
该池(几乎)在工作进程中不可用。它有点取决于用于池的启动方法。
生成
一个新的Python解释器进程,并导入该模块。由于在的过程中__name__
是'__mp_main__'
,因此__name__ == '__main__'
不执行块中的代码,并且工作器中不存在任何池对象。
fork
父进程的内存空间被复制到子进程的内存空间。这有效地导致Pool
了每个工作人员的存储空间中存在一个对象。
但是,该池不可用。工作程序是在执行池的过程中创建的__init__
,因此当对工作程序进行分叉时,池的初始化是不完整的。工作进程中池的副本没有运行用于管理工作进程,任务和结果的线程。无论如何,线程不会通过进入子进程fork
。
此外,由于在初始化过程中创建了工作进程,因此该池对象尚未分配任何名称。尽管它确实潜伏在工作人员的内存空间中,但是却没有任何处理。它不会通过显示globals()
; 我只能通过gc.get_objects()
以下方式找到它:<multiprocessing.pool.Pool object at 0x7f75d8e50048>
无论如何,该池对象是主进程中该对象的副本。
forkserver
我无法测试此启动方法
为了解决您的问题,您可以在主进程中摆弄一些队列和一个队列处理程序线程,以从工作人员发送回任务并将其委派给池,但是我能想到的所有方法似乎都很笨拙。
如果您努力采用它来在池中进行处理,则很可能最终会得到更多可维护的代码。
顺便说一句:我不确定允许用户通过命令行传递工人数量是否是一个好主意。我建议os.cpu_count()
至少给该值一个上限。
感谢您的详细回答。事实上,在代码中,我将其
min(args.processes, os.cpu_count())
作为工作程序的数量,因此,如果用户在具有32个CPU的节点中指定80个工作程序,则实际上仅会使用32个。