
What is the right way to call async code in a regular callback?

Posted on 2016-10-25 13:16:25

What is the right way to call async code from a regular callback? The code below works, but it isn't pretty. I don't like how respond is called: the caller's address has to be passed through every function. Also, how can I set a timeout on the handler?

I've put my questions in the code comments.

import asyncio
import logging
logger = logging.getLogger('protocol')

async def echo(data):
    # emulates a call into an external async library
    data = b'>'+data
    await asyncio.sleep(1)
    return data


class Protocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        logger.debug('connection_made called')
        self.transport = transport

    def respond(self, task):
        logger.debug('respond called')
        # I would rather receive the data as arguments, not wrapped in a task

        resp,caller = task.result()

        self.transport.sendto(resp, caller)


    async def handler(self, data, caller):
        logger.debug('handler called')

        # async is needed to use `await` and `async for` with external libraries such as motor, etc.
        # do some awaits
        data = await echo(data)

        # simple echo
        return (data, caller)


    def datagram_received(self, data, addr):
        logger.debug('datagram_received called')

        # the handler is selected by the data header
        handler = self.handler

        # how do I run the async handler from here?
        loop = asyncio.get_event_loop()
        c = handler(data, addr)  # coroutine

        f = asyncio.ensure_future(c, loop=loop)  # attach the coroutine to the loop
        f.add_done_callback(self.respond)

        # How do I get the response here?

        # I can't call loop.run_until_complete(...) because loop.run_forever() is already running

        #def wakeup():
        #    pass
        #loop.call_soon(wakeup)
        # without this call_soon the future was not executed in my first version of the program,
        # but it works in the test and after refactoring


def main(host, port):

    loop = asyncio.get_event_loop()
    # create_datagram_endpoint returns a (transport, protocol) pair
    transport, protocol = loop.run_until_complete(
        loop.create_datagram_endpoint(Protocol, local_addr=(host, port)))

    sock = transport.get_extra_info('socket')
    # socket tuning here

    try:
        loop.run_forever()
    finally:
        transport.close()
        loop.close()

logging.basicConfig(level=logging.DEBUG)
main('0.0.0.0', 10012)

To test, use netcat: nc -u 127.0.0.1 10012
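If netcat isn't available, a small Python client can serve the same purpose. This is only a sketch: the helper name send_datagram and its timeout parameter are made up for illustration, and it assumes the server above is listening on UDP port 10012.

```python
import socket


def send_datagram(host, port, payload, timeout=3.0):
    """Send one UDP datagram and return the reply, or None if no reply arrives in time."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(payload, (host, port))
        try:
            reply, _ = sock.recvfrom(4096)
        except socket.timeout:
            # No answer within `timeout` seconds
            return None
        return reply
```

With the server running, send_datagram('127.0.0.1', 10012, b'hello') should come back after about a second with the payload prefixed by >, since that is what the echo coroutine does.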

Questioner
eri
eri 2021-03-20 19:07:40

The best way I found is to create a task for each datagram, wrapping an async function:

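import asyncio

class Protocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport
        self.loop = asyncio.get_event_loop()

    async def datagram_received_async(self, data, addr):
        # a() and b() are placeholders for whatever coroutines the handler awaits
        await a()
        await b()

    def datagram_received(self, data, addr):
        # schedule the coroutine as a task; this callback returns immediately
        self.loop.create_task(self.datagram_received_async(data, addr))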

With bare futures I got warnings about them not being awaited; tasks work fine. A queue isn't needed in this case.
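For completeness, here is a minimal sketch of the task-per-datagram idea that also covers the two open points from the question, assuming Python 3.7+. It replies from inside the coroutine, so the caller address never has to travel through a done callback, and it bounds the handler with asyncio.wait_for as one possible way to set a timeout. The names EchoProtocol, handle, slow_echo, and the timeout and delay parameters are hypothetical, and FakeTransport exists only so the example runs without a real socket.

```python
import asyncio


async def slow_echo(data, delay):
    # Hypothetical stand-in for an external async library call
    await asyncio.sleep(delay)
    return b'>' + data


class EchoProtocol(asyncio.DatagramProtocol):
    def __init__(self, timeout=0.5, delay=0.01):
        self.timeout = timeout
        self.delay = delay
        self.tasks = set()  # keep strong references to in-flight tasks

    def connection_made(self, transport):
        self.transport = transport

    async def handle(self, data, addr):
        try:
            # wait_for cancels the inner coroutine if it exceeds the timeout
            resp = await asyncio.wait_for(slow_echo(data, self.delay), self.timeout)
        except asyncio.TimeoutError:
            resp = b'timeout'
        # Reply directly from the coroutine; addr is just a local argument
        self.transport.sendto(resp, addr)

    def datagram_received(self, data, addr):
        # Regular callback: schedule the coroutine and return immediately
        task = asyncio.get_running_loop().create_task(self.handle(data, addr))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)


class FakeTransport:
    """Records sendto() calls so the demo needs no network access."""

    def __init__(self):
        self.sent = []

    def sendto(self, data, addr):
        self.sent.append((data, addr))


async def demo():
    addr = ('127.0.0.1', 9999)
    results = []
    # First protocol finishes in time, second one hits the timeout
    for proto in (EchoProtocol(0.5, 0.01), EchoProtocol(0.05, 1)):
        transport = FakeTransport()
        proto.connection_made(transport)
        proto.datagram_received(b'hi', addr)
        await asyncio.gather(*proto.tasks)
        results.append(transport.sent)
    return results


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In a real server the protocol would be passed to loop.create_datagram_endpoint exactly as in the question; only the transport is faked here. Holding the tasks in a set is a precaution, since the event loop keeps only weak references to tasks.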