When using asyncio to receive multiple files over a TCP socket, I struggle with the order in which data_received is called. When I send three data streams at once, my output is the following:
DEBUG connection_made ('127.0.0.1', 33972) new connection was made
DEBUG connection_made ('127.0.0.1', 33974) new connection was made
DEBUG connection_made ('127.0.0.1', 33976) new connection was made
DEBUG data_received ('127.0.0.1', 33976) data received from new connection
DEBUG data_received ('127.0.0.1', 33974) data received from new connection
DEBUG data_received ('127.0.0.1', 33972) data received from new connection
I assume the behaviour is analogous to a stack, where data is received from the newest to the oldest connection, but this is only a guess.
Is it possible to change this behaviour so that the data is received in the order in which the connections were made? This is important because I need the data received from the first connection in order to process the following connections.
My code is the following:
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class AIO(asyncio.Protocol):
    def __init__(self):
        self.extra = bytearray()

    def connection_made(self, transport):
        global request_time
        peer = transport.get_extra_info('peername')
        logger.debug('{} new connection was made'.format(peer))
        self.transport = transport
        request_time = str(time.time())

    def data_received(self, request, msgid=1):
        peer = self.transport.get_extra_info('peername')
        logger.debug('{} data received from new connection'.format(peer))
        self.extra.extend(request)
First, I would recommend using the higher-level streams API instead of the transport/protocols. Then if you need to maintain the order observed when connections were made, you can enforce it yourself using a series of asyncio.Event objects. For example:
import asyncio, itertools

class Comm:
    def __init__(self):
        self.extra = bytearray()
        self.preceding_evt = None

    async def serve(self, r, w):
        preceding_evt = self.preceding_evt
        self.preceding_evt = this_evt = asyncio.Event()

        while True:
            request = await r.read(4096)
            # wait for the preceding connection to receive and store data
            # before we store ours
            if preceding_evt is not None:
                await preceding_evt.wait()
                preceding_evt.clear()
            # self.extra now contains data from previous connections
            self.extra.extend(request)
            # inform the subsequent connection that we got data
            this_evt.set()
        w.close()

async def main():
    comm = Comm()
    server = await asyncio.start_server(comm.serve, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

asyncio.run(main())
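As a side note, the loop in serve() above has no exit condition. In the streams API, read() returns an empty bytes object once the peer has closed its side, which is roughly the point where eof_received() would fire in a Protocol. A minimal sketch of that check follows; read_until_eof and chunk_size are illustrative names, not part of asyncio, and the ordering logic with the events is left out on purpose:

```python
import asyncio


async def read_until_eof(reader, chunk_size=4096):
    """Collect chunks from a StreamReader until the peer signals end-of-file."""
    received = bytearray()
    while True:
        chunk = await reader.read(chunk_size)
        if not chunk:
            # Empty bytes means EOF: put whatever eof_received() used to do here.
            break
        received.extend(chunk)
    return bytes(received)
```

Inside a handler passed to asyncio.start_server you would call it as `data = await read_until_eof(r)` and close the writer afterwards. Keep in mind that it buffers everything the peer sends, so it is only suitable for peers you trust.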
What does Comm() return? And how can I include functionality that I did within eof_received before, in the higher-level streams API/in this solution?
@Daniel Since Comm is a class, Comm() returns its instance. The instance created in main() will be shared among all the connections, just like your AIO class in the code in the question. The equivalent of eof_received() is reading an empty bytes object from r.read().
@Daniel Also, note that r.read(4096) will read as much data as provided by the kernel, but no more than 4096 bytes. There is no guarantee that one whole request will be read in one read, so you might want to take steps to ensure that the whole request was read. (The approach will depend on what the request looks like.)
But why would you want to read the request in steps? Couldn't the whole request be read with r.read()?
@Daniel r.read() will return all data sent by the peer until they've closed their side of the socket, i.e. until end-of-file. The consequence is that after r.read() nothing else can be read, so it doesn't allow the peer to continue communicating with you over the same connection. If that works for you, great, and you get to eliminate the while loop as well. If not, you have to design a way to detect where your request ends. Another downside of r.read() is that an untrusted peer can DoS you by feeding you infinite data, which will cause your server to run out of memory.
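One common way to detect where a request ends is to have the client send the length first. The sketch below assumes a 4-byte big-endian length prefix and a 1 MiB size cap; both are choices made up for this example and not something from the question, as are the names frame, read_request, HEADER and MAX_REQUEST_SIZE. It relies on StreamReader.readexactly(), which raises asyncio.IncompleteReadError if the peer closes the connection early:

```python
import asyncio
import struct

MAX_REQUEST_SIZE = 1 << 20    # hypothetical 1 MiB cap, adjust to your data
HEADER = struct.Struct("!I")  # assumed framing: 4-byte big-endian length


def frame(payload: bytes) -> bytes:
    """Sender side: prepend the payload length to the payload."""
    return HEADER.pack(len(payload)) + payload


async def read_request(reader: asyncio.StreamReader):
    """Return one whole request, or None if the peer closed between requests."""
    try:
        header = await reader.readexactly(HEADER.size)
    except asyncio.IncompleteReadError as exc:
        if not exc.partial:
            return None  # clean EOF, nothing was pending
        raise            # connection dropped in the middle of a header
    (length,) = HEADER.unpack(header)
    if length > MAX_REQUEST_SIZE:
        # Refuse oversized requests instead of buffering them.
        raise ValueError(f"request of {length} bytes exceeds the limit")
    return await reader.readexactly(length)
```

With this, the peer can send several requests over one connection, and the cap bounds how much memory a single request can make the server allocate.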