Note: This article is reproduced from stackoverflow.com.
multiprocessing multithreading python

subprocess

Published on 2020-04-05 23:35:00

run.py

import subprocess
from multiprocessing.pool import ThreadPool

def work(repo,cpuid):
    my_tool_subprocess = subprocess.Popen('./scan.py {} {}'.format(repo,cpuid),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        if "scan is done" in myline:
            break

num = 10  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
cpuid=1
for repo in repos:
    tp.apply_async(work, (repo[0],"1-240"))
    print('Running {} at core {}'.format(repo[0],"1-240"))
tp.close()
tp.join()

scan.py

completed = subprocess.run(['git', 'clone', repo], env=my_env)
# ... a bunch of other subprocess.run() calls ...

# at the end:

print('returncode:', completed.returncode)
print('scan is done')

I was expecting the number of active processes to stay at 10 (one per thread), but it does not. The pool does not seem to wait for "scan is done", the last statement in scan.py. Instead it runs straight through the for loop, cloning every repo in the repos list. In other words, it does not wait for the first ten repos to be cloned and processed (maintaining a moving window of 10 processes); it just keeps creating additional processes and cloning more repos.

Does anybody have an idea what is wrong here?

Questioner: dev
Iguananaut 2020-02-01 00:05

Try refactoring your code like this:

In scan.py, move all the module-level code into a function, for example:

def run(repo, cpuid):
    # do whatever scan.py does given a repo path and cpuid
    # instead of printing to stdout, have this return a value
    raise NotImplementedError  # placeholder: replace with the actual scan logic

If you still care about scan.py having a command-line interface as well, add:

import argparse

def main(argv=None):
    parser = argparse.ArgumentParser()
    # ... implement command-line argument parsing here
    args = parser.parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)

if __name__ == '__main__':
    main()
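
As a rough illustration of the argument-parsing placeholder above, here is a minimal sketch. It assumes scan.py takes two positional arguments named repo and cpuid (matching args.repo and args.cpuid above, and the './scan.py {} {}' call in the question). The body of run() and the build_parser() helper are hypothetical stand-ins, not the real scan logic.

```python
import argparse


def run(repo, cpuid):
    # Hypothetical stand-in for the real scan logic: it returns a value
    # instead of printing, as suggested above.
    return "scanned {} on cores {}".format(repo, cpuid)


def build_parser():
    # Hypothetical helper; the argument names are an assumption.
    parser = argparse.ArgumentParser(description="Scan a repository.")
    # Positional arguments mirror the './scan.py {repo} {cpuid}' call.
    parser.add_argument("repo", help="URL or path of the repository to scan")
    parser.add_argument("cpuid", help="core range, for example '1-240'")
    return parser


def main(argv=None):
    # argv=None makes argparse fall back to sys.argv[1:], so the same
    # function serves both the command line and direct calls.
    args = build_parser().parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)
    return value
```

Because main() accepts an explicit argument list, it can be called directly, e.g. main(["some-repo", "1-240"]), without spawning a new process.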

Now in your run.py do something like:

import multiprocessing
import scan  # maybe give this a more specialized name

def work(args):
    repo, cpuid = args
    output = scan.run(repo, cpuid)
    for line in output.splitlines():
        # Do whatever you want to do here...
        pass

def main():
    repos = ... # You didn't show us where this comes from
    with multiprocessing.Pool() as pool:  # Or pass however many processes
        pool.map(work, [(r[0], '1-240') for r in repos])

if __name__ == '__main__':
    main()

Something like this. The point I'm trying to make here is that if you factor your code wisely it will make multiprocessing much simpler. Some of the details here are slightly opinionated, however.
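
If you would rather keep the ThreadPool plus subprocess design from the question, here is one possible explanation for what you saw. It is an assumption, since the question does not say which Python version runs run.py. Under Python 3, readline() on a pipe returns bytes, so `"scan is done" in myline` raises TypeError. apply_async() swallows that exception unless you call get() on its result, so each worker thread returns immediately and picks up the next repo while the child process keeps running. The sketch below decodes the output as text, reads until EOF, waits for the child, and calls get() so errors surface. FAKE_SCAN and scan_all() are hypothetical names; FAKE_SCAN stands in for scan.py so the example is self-contained.

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool

# Stand-in for scan.py (an assumption, so the sketch is self-contained):
# a child process that prints its first argument and then the sentinel line.
FAKE_SCAN = "import sys; print('scanning', sys.argv[1]); print('scan is done')"


def work(repo, cpuid):
    # text=True (Python 3.7+) makes stdout yield str rather than bytes,
    # so the substring test below does not raise TypeError.
    proc = subprocess.Popen(
        [sys.executable, "-c", FAKE_SCAN, repo, cpuid],
        stdout=subprocess.PIPE,
        text=True,
    )
    done = False
    for line in proc.stdout:  # iteration stops at EOF, so no endless loop
        if "scan is done" in line:
            done = True
    proc.wait()  # keep this worker thread busy until the child has exited
    return repo, done, proc.returncode


def scan_all(repos, workers=10):
    with ThreadPool(workers) as tp:
        pending = [tp.apply_async(work, (repo, "1-240")) for repo in repos]
        # get() re-raises any exception raised inside work(); without it,
        # apply_async() discards failures silently.
        return [p.get() for p in pending]
```

Since each thread blocks in work() until its child exits, at most `workers` child processes should be alive at any one time.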