运行
def work(repo,cpuid):
my_tool_subprocess = subprocess.Popen('./scan.py {} {}'.format(repo,cpuid),shell=True, stdout=subprocess.PIPE)
line = True
while line:
myline = my_tool_subprocess.stdout.readline()
if "scan is done" in myline:
break
num = 10 # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
cpuid=1
for repo in repos:
tp.apply_async(work, (repo[0],"1-240"))
print('Runing {} at core {}'.format(repo[0],"1-240"))
tp.close()
tp.join()
scan.py
completed = subprocess.run(['git', 'clone', repo],env=my_env)
bunch of other subprocess.run()
# at the end:
print('returncode:', completed.returncode)
print('scan is done')
我原本希望活动进程的数量为10(10个线程),但是不知何故……不是。似乎它不等到scan.py中的最后一条语句“扫描完成”,而是遍历存储库列表(for循环),从存储库列表中克隆所有存储库。重复一遍,它不会等待克隆和处理第1至第10个存储库(保持10个进程的移动窗口),而是继续...创建其他进程和存储库克隆。
有人知道这里有什么问题吗?
尝试像这样重构代码:
在中scan.py
,将所有模块级别的代码移到一个函数中,例如:
def run(repo, cpuid):
# do whatever scan.py does given a repo path and cpuid
# instead of printing to stdout, have this return a value
如果您还仍然希望scan.py
拥有命令行界面,请添加:
import argparse
def main(argv=None):
parser = argparse.ArgumentParser()
# ... implement command-line argument parsing here
args = parser.parse_args(argv)
value = run(args.repo, args.cpuid)
print(value)
if __name__ == '__main__':
main()
现在,您可以run.py
执行以下操作:
import multiprocessing
import scan # maybe give this a more specialized name
def work(args):
repo, cpuid = args
output = scan.run(repo, cpuid)
for line in output.splitlines():
# Do whatever you want to do here...
def main():
repos = ... # You didn't show us where this comes from
pool = multiprocessing.Pool() # Or pass however many processes
pool.map(work, [(r[0], '1-240') for r in repos])
if __name__ == '__main__':
main()
这样的事情。我在这里要说明的是,如果您明智地考虑代码因素,它将使多处理变得更加简单。但是,这里的某些细节有些过时。
谢谢。会尽快尝试。在run()中,我可以顺序调用subprocess.run(),对吗?
是的,如果您正在运行多个命令并收集它们的标准输出,则可能需要将每个标准输出添加到返回的列表中,或者可以将其添加到生成器函数中,并
yield
在运行时输出每一行,然后像调用它for line in run(...): print(line)
。