Warm tip: This article is reproduced from serverfault.com, please click

Await for function until it receives a callback in python async IO

发布于 2020-12-04 06:25:56

I have created a GATT Client in python using the Bleak Library. I subscribe to a notification characteristic and after when I get the callback in the notification handler function, I do a write to a characteristic.Until now, after I start the notification I used put a asyncio.sleep for 5 seconds and then I would do a write on the characteristic. But this is very flaky and there is no control over the code, so I want to wait until I receive a callback in notification handler function and when I get the response, I want to do the write. I am new to python async IO and I am not sure how to achieve this. Thanks for your time and effort.

Here is the Code:


import logging
import asyncio
import platform

from bleak import BleakClient
from bleak import _logger as logger
import time

from concurrent.futures import ThreadPoolExecutor

CHARACTERISTIC_UUID = "0000c305-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_WR = "0000c303-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_RD = "0000c301-0000-1000-8000-00805f9b34fb"

_executor = ThreadPoolExecutor(1)

async def notification_handler(sender, data):
    """Simple notification handler which prints the data received."""
    print("{0}: {1}".format(sender, data))
    print(hexdump(data, 16))


def hexdump(src, length=16):
        result = []
        digits = 4 if isinstance(src, str) else 2
        for i in range(0, len(src), length):
            s = src[i:i + length]
            hexa = " ".join(map("{0:0>2X}".format, src))
            text = "".join([chr(x) if 0x20 <= x < 0x7F else "." for x in s])
            result.append("%04X   %-*s   %s" % (i, length * (digits + 1), hexa, text))
        return "\n".join(result)

async def run(address, debug=False):
    if debug:
        import sys

        l = logging.getLogger("asyncio")
        l.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        l.addHandler(h)
        logger.addHandler(h)



    async with BleakClient(address) as client:
        x = await client.is_connected()
        logger.info("Connected: {0}".format(x))

        bleDevice = client
        await client.start_notify(CHARACTERISTIC_UUID, notification_handler)


        # before: asyncio.sleep(5)


        # now: wait here until I the notification_hanlder get a callback, then do the write

        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6, 0x00, 0x02, 0xD0, 0x01, 0x77])
        
        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6])

        await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA7])

if __name__ == "__main__":
    import os

    os.environ["PYTHONASYNCIODEBUG"] = str(1)
    address = (
        "48:23:35:00:14:be"  # <--- Change to your device's address here if you are using Windows or Linux
        if platform.system() != "Darwin"
        else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"  # <--- Change to your device's address here if you are using macOS
    )

    # for i in range(5):
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(run(address, True))
Questioner
Saahiil
Viewed
0
ukBaz 2020-12-04 23:50:41

I'll start with a side note. You appear to have picked some custom UUID values that are in the reserved range for Bluetooth SIG approved UUID's. There is a helpful article on this at: https://www.novelbits.io/uuid-for-custom-services-and-characteristics/

On to your main question. I am only learning asyncio myself, but I took a look and have something running. Your write_gatt_char commands were being executed when the client was initialised, not when the notification came in. Once it has done the writes then it was exiting.

I've modified your example to use a BBC micro:bit. When button "A" notification is sent this gets the Bleak client to send a random letter to the micro:bit. While a button "B" notification disconnects from the client. I've moved the write_gatt_char into the notification code and used a create_task to execute the write. To keep the client alive until button "B" is pressed on the micro:bit I've put a while loop in the client with a sleep to stop in completing. Suspect this is not the best way of doing it.

This is the example that worked for me:

import logging
import asyncio
import platform
from random import randint

from bleak import BleakClient
from bleak import _logger as logger


BTN_A_UUID = "E95DDA90-251D-470A-A062-FA1922DFA9A8"
BTN_B_UUID = "E95DDA91-251D-470A-A062-FA1922DFA9A8"
LED_TXT_UUID = "E95D93EE-251D-470A-A062-FA1922DFA9A8"



async def run(address, debug=False):
    if debug:
        import sys

        l = logging.getLogger("asyncio")
        l.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        l.addHandler(h)
        logger.addHandler(h)


    async with BleakClient(address) as client:
        x = await client.is_connected()
        logger.info("Connected: {0}".format(x))

        def btn_a_handler(sender, data):
            """Simple notification handler for btn a events."""
            print("{0}: {1}".format(sender, data))
            # Pick random letter to send
            if int.from_bytes(data, byteorder='little', signed=False) > 0:
                letter = [randint(99, 122)]
                loop.create_task(write_txt(letter))

        def btn_b_handler(sender, data):
            """Simple notification handler for btn b events."""
            print("{0}: {1}".format(sender, data))
            if int.from_bytes(data, byteorder='little', signed=False) > 0:
                loop.create_task(client.disconnect())

        async def write_txt(data):
            await client.write_gatt_char(LED_TXT_UUID,
                                         data)

        await client.start_notify(BTN_A_UUID, btn_a_handler)
        await client.start_notify(BTN_B_UUID, btn_b_handler)

        while await client.is_connected():
            await asyncio.sleep(1)

if __name__ == "__main__":
    address = ("E9:06:4D:45:FC:8D")

    loop = asyncio.get_event_loop()
    # loop.set_debug(True)
    loop.run_until_complete(run(address, True))