In the following code, a task `A` is created and added to a set of tasks, `tasks`. I then use `await asyncio.wait(tasks)` to wait for the tasks to finish. But this does not take into account task `B1`, which is created inside task `A` (recursive function call). As a result, the code doesn't wait for `B1` to finish: as the output below shows, task `B1` never finishes.
I think the reason is that when `tasks` is evaluated on line (**), it still contains only one element.
Question: how can I make `await asyncio.wait(tasks)` work on an evolving/growing set of tasks?
```python
import asyncio

tasks = set()
i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:  # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        tasks.add(asyncio.create_task(mytask('B%i' % i)))
    print('mytask %s len tasks:' % s, len(tasks))
    await asyncio.sleep(0.5)
    print('mystak %s finished' % s)

async def main():
    print('main starting')
    tasks.add(asyncio.create_task(mytask('A')))
    print('len tasks:', len(tasks))
    await asyncio.wait(tasks)  # (**)
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())
```
Result:
```
main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting # <--- mytask B1 will never complete!
mystak A finished
main finished
```
If we replace line (**) with `await asyncio.sleep(10)`, all tasks complete, of course:
```
main starting
len tasks: 1
mytask A starting
mytask A creating new task
mytask A len tasks: 2
mytask B1 starting
mystak A finished
mytask B1 creating new task
mytask B1 len tasks: 3
mytask B2 starting
mystak B1 finished
mytask B2 creating new task
mytask B2 len tasks: 4
mytask B3 starting
mystak B2 finished
mytask B3 creating new task
mytask B3 len tasks: 5
mytask B4 starting
mystak B3 finished
mytask B4 len tasks: 5
mystak B4 finished
main finished
```
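The suspicion about line (**) can be checked in isolation. In CPython, `asyncio.wait()` copies the iterable it receives into its own set at call time, so a task added to the shared set afterwards is never tracked. The minimal sketch below (the names `parent` and `child` are illustrative, not from the question) shows `wait()` returning while the late-added task is still running:

```python
import asyncio

async def child():
    await asyncio.sleep(0.2)

async def parent(tasks):
    await asyncio.sleep(0.1)
    # add a new task to the shared set *after* wait() has already been called
    tasks.add(asyncio.create_task(child()))

async def main():
    tasks = set()
    tasks.add(asyncio.create_task(parent(tasks)))
    done, _pending = await asyncio.wait(tasks)
    # wait() only knew about the parent task it was given
    late = tasks - done
    result = (len(done), len(late), all(not t.done() for t in late))
    # let the late child finish so nothing is left pending at shutdown
    await asyncio.gather(*late)
    return result

print(asyncio.run(main()))  # (1, 1, True)
```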
To answer your question directly: you can wait for a dynamic set of tasks with a loop, such as:
```python
while tasks:
    prev_tasks = tasks.copy()
    # use gather() so exceptions are propagated rather than discarded
    await asyncio.gather(*tasks)
    tasks.difference_update(prev_tasks)
```
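For a self-contained check, the loop can be wrapped in a helper and run against a task that keeps spawning children, as in the question. This is a sketch: `wait_all` and `spawn` are hypothetical names, and the sleeps are shortened so it runs quickly.

```python
import asyncio

async def wait_all(tasks):
    """Await every task in `tasks`, including ones added while waiting.

    `wait_all` is a hypothetical helper name, not an asyncio API.
    """
    while tasks:
        prev_tasks = tasks.copy()
        await asyncio.gather(*prev_tasks)
        # drop only the tasks just awaited; anything added meanwhile stays
        tasks.difference_update(prev_tasks)

async def spawn(tasks, depth, log):
    await asyncio.sleep(0.01)
    log.append(depth)
    if depth < 4:  # limit number of tasks, like `i < 4` in the question
        tasks.add(asyncio.create_task(spawn(tasks, depth + 1, log)))

async def main():
    tasks = set()
    log = []
    tasks.add(asyncio.create_task(spawn(tasks, 0, log)))
    await wait_all(tasks)
    return log

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```

Each pass of the loop awaits a snapshot, then removes only that snapshot, so children created during the pass are picked up on the next iteration.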
But you probably don't need to do that. Instead, you can have each task wait for the subtask(s) it has created, along with its own work. That way you don't even need a global set of tasks, nor do you need to worry about awaiting all of them in `main()`:
```python
import asyncio

i = 0

async def mytask(s):
    global i
    print('mytask %s starting' % s)
    await asyncio.sleep(1)
    if i < 4:  # limit number of tasks
        print('mytask %s creating new task' % s)
        i += 1
        task = asyncio.create_task(mytask('B%i' % i))
    else:
        task = None
    print('mytask %s len tasks:' % s, i)
    await asyncio.sleep(0.5)  # our actual work
    print('mystak %s finished' % s)
    # after doing the work, wait for the child task if we created one
    if task is not None:
        await task

async def main():
    print('main starting')
    await mytask('A')
    # await asyncio.sleep(10)
    print('main finished')

asyncio.run(main())
```
Thanks for this great answer, @user4815162342! Side note: I notice that it's faster (by a few seconds) with your `task = asyncio.create_task(mytask('B%i' % i))` instead of `task = mytask('B%i' % i)`, but why? In both cases it's an async job, and we await it at the end of `mytask`. What's the real difference here?

@Basj `create_task()` is actually needed here because it spawns the task in the background and allows it to run while we're awaiting the "actual work". On the other hand, if you just call `mytask()`, the resulting coroutine is not yet submitted to the event loop, so it won't magically run while we're doing the other `await`. (This is one difference between how Python and JavaScript do async.) In other words, by removing that call to `create_task()`, you inadvertently removed concurrency as well. See this answer for a more detailed discussion.
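The difference can be measured directly. In this sketch (the helper names `work`, `with_task`, `with_coroutine`, and `timed` are illustrative), both variants await a 0.2 s child and do 0.2 s of their own work. With `create_task()` the child runs during the other `await`, so the total is about 0.2 s; with a bare coroutine the child's body starts only when it is finally awaited, so the total is about 0.4 s.

```python
import asyncio
import time

async def work(delay):
    await asyncio.sleep(delay)

async def with_task():
    child = asyncio.create_task(work(0.2))  # scheduled on the event loop now
    await work(0.2)  # "actual work"; the child runs concurrently
    await child

async def with_coroutine():
    child = work(0.2)  # only creates a coroutine object; nothing runs yet
    await work(0.2)
    await child  # the child's body starts only here

def timed(coro_fn):
    start = time.perf_counter()
    asyncio.run(coro_fn())
    return time.perf_counter() - start

t_task = timed(with_task)
t_coro = timed(with_coroutine)
print(f'create_task: {t_task:.2f}s, bare coroutine: {t_coro:.2f}s')
```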