Warm tip: This article is reproduced from serverfault.com, please click

Setting the call order of mutliple asyncio.Protocol.data_received

发布于 2020-12-01 11:38:27

When using asyncio for receiving multiple files over a TCP socket I struggle with the call order of received_data. When sending 3 data streams at once my output is the following:

DEBUG connection_made ('127.0.0.1', 33972) new connection was made
DEBUG connection_made ('127.0.0.1', 33974) new connection was made
DEBUG connection_made ('127.0.0.1', 33976) new connection was made
DEBUG data_received ('127.0.0.1', 33976) data received from new connection
DEBUG data_received ('127.0.0.1', 33974) data received from new connection
DEBUG data_received ('127.0.0.1', 33972) data received from new connection

I assume that its behaviour is analog to a stack where the data is received from the newest to the oldest connection made, but this is only a guess.

Is it possible to change that behaviour in a way that the data is received in the order that the connections were made? This is important because I need the data received from the first connection for further processing the following connections.

My code is the following:

import asyncio

class AIO(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()

    def connection_made(self, transport):
        global request_time
        peer = transport.get_extra_info('peername')
        logger.debug('{} new connection was made'.format(peer))
        self.transport = transport
        request_time = str(time.time())

    def data_received(self, request, msgid=1):
        peer = self.transport.get_extra_info('peername')
        logger.debug('{} data received from new connection'.format(peer))
        self.extra.extend(request)
Questioner
Daniel
Viewed
0
user4815162342 2020-12-02 04:34:02

First, I would recommend using the higher-level streams API instead of the transport/protocols. Then if you need to maintain the order observed when connections were made, you can enforce it yourself using a series of asyncio.Events. For example:

import asyncio, itertools

class Comm:
    def __init__(self):
        self.extra = bytearray()
        self.preceding_evt = None

    async def serve(self, r, w):
        preceding_evt = self.preceding_evt
        self.preceding_evt = this_evt = asyncio.Event()
        while True:
            request = await r.read(4096)
            # wait for the preceding connection to receive and store data
            # before we store ours
            if preceding_evt is not None:
                await preceding_evt.wait()
                preceding_evt.clear()
            # self.extra now contains data from previous connections
            self.extra.extend(request)
            # inform the subsequent connection that we got data
            this_evt.set()
        w.close()

async def main():
    comm = Comm()
    server = await asyncio.start_server(comm.serve, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())