Note: This article is reproduced from serverfault.com.

How to use asyncio.wait on a growing set of tasks?

Posted on 2020-12-04 01:00:29

In the following code, a task A is created and added to a set named tasks.
I then use await asyncio.wait(tasks) to wait for the tasks to finish.

But this does not take into account task B1, which is created inside task A (a recursive function call).

As a result, the code does not wait for B1 to finish: as the output below shows, task B1 never finishes.

I think the reason is that when tasks is evaluated on line (**), it still contains only a single element.
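The snapshot effect suspected above can be reproduced in isolation. In CPython, asyncio.wait() builds its own set from the iterable it receives at the moment it is called, so a task added to the original set afterwards appears in neither done nor pending. The sketch below uses hypothetical coroutines parent and child as stand-ins for mytask:

```python
import asyncio

async def child():
    await asyncio.sleep(0.2)

async def parent(tasks):
    await asyncio.sleep(0.1)
    # This task is added after asyncio.wait() has already copied the set
    tasks.add(asyncio.create_task(child()))

async def main():
    tasks = set()
    tasks.add(asyncio.create_task(parent(tasks)))
    done, pending = await asyncio.wait(tasks)  # waits only for the initial snapshot
    late = tasks - done                        # the child added while waiting
    late_finished = any(t.done() for t in late)
    await asyncio.gather(*late)                # finish the late task before returning
    return len(done), len(pending), len(tasks), late_finished

result = asyncio.run(main())
print(result)  # (1, 0, 2, False)
```

The set ends up holding two tasks, but asyncio.wait() reports only one task in done and none in pending, and the child is still running when it returns.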

Question: how to make await asyncio.wait(tasks) work on an evolving/growing set of tasks?

import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    await asyncio.wait(tasks)            # (**)
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())

Result:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting       # <--- mytask B1 will never complete!
mystak A finished
main finished

If line (**) is replaced with await asyncio.sleep(10), then of course all tasks complete:

main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting
mystak A finished
mytask B1 creating new task
mytask B1 len tasks: 3
mytask B2 starting
mystak B1 finished
mytask B2 creating new task
mytask B2 len tasks: 4
mytask B3 starting
mystak B2 finished
mytask B3 creating new task
mytask B3 len tasks: 5
mytask B4 starting
mystak B3 finished
mytask B4 len tasks: 5
mystak B4 finished
main finished
Questioner: Basj
Answer by user4815162342 (2020-12-04 16:45:49)

To answer your question directly first: you can wait for a dynamic set of tasks with a loop, such as:

while tasks:
    prev_tasks = tasks.copy()
    # use gather() so exceptions are propagated rather than discarded
    await asyncio.gather(*tasks)
    tasks.difference_update(prev_tasks)
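A runnable sketch of that loop, wrapped in a helper. The names wait_all and worker are hypothetical and chosen only for this illustration; worker mimics mytask by adding a child task to the shared set before it finishes:

```python
import asyncio

async def wait_all(tasks):
    """Wait until every task in `tasks` is done, including tasks added while waiting."""
    while tasks:
        prev_tasks = tasks.copy()
        # gather() re-raises the first exception, unlike asyncio.wait(),
        # which leaves exceptions stored on the task objects
        await asyncio.gather(*prev_tasks)
        # Whatever remains was added during this round; loop again for those
        tasks.difference_update(prev_tasks)

async def worker(name, depth, tasks, log):
    await asyncio.sleep(0.01)
    if depth > 0:
        # Spawn a child into the shared set, like mytask() in the question
        tasks.add(asyncio.create_task(worker(name + "+", depth - 1, tasks, log)))
    log.append(name)

async def main():
    tasks, log = set(), []
    tasks.add(asyncio.create_task(worker("A", 3, tasks, log)))
    await wait_all(tasks)
    return log, len(tasks)

log, remaining = asyncio.run(main())
print(log, remaining)  # ['A', 'A+', 'A++', 'A+++'] 0
```

Each pass of the while loop waits for the current snapshot, then keeps only the tasks that appeared during that pass, so the set is drained completely.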

But you probably don't need to do that. Instead, you can have each task wait for the subtask(s) it has created, along with its own work. That way you don't even need a global set of tasks, nor do you need to worry about waiting for all of them in main():

import asyncio

i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:    # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        task = asyncio.create_task(mytask('B%i' % i))
    else:
        task = None
    print('mytask %s len tasks:' % s, i)
    await asyncio.sleep(0.5)   # our actual work
    print('mytask %s finished' % s)
    # after doing the work, wait for the child task if we created one
    if task is not None:
        await task

async def main():
    print('main starting')
    await mytask('A')
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())