Warm tip: This article is reproduced from stackoverflow.com.
python python-multithreading rabbitmq multiprocess

Running multiprocessing/multithreading in a pika consumer and directing the data to a specific dataframe

Posted on 2020-04-10 10:15:47

I am new to Python multithreading/multiprocessing and RabbitMQ. I have a RabbitMQ consumer that feeds me real-time hospital data. Each message contains one patient's vitals. I need to store at least five such messages per patient before I can run my logic and set off an alarm. Since the number of patients is unknown, I am considering multithreading or multiprocessing to keep my alarms near real-time and to scale up. My approach is to create a global dataframe for each patient and append that patient's messages to it. But now I am having trouble creating the threads/processes and sending the data to the respective patient's dataframe. Here is my code:


import json

import pandas as pd

bed_list = []
thread_list = []
bed_df = {}
alarms = 0

def spo2(body, bed):
    body_data = body.decode()
    print(body_data)
    packet = json.loads(body_data)
    bed_id = packet['beds'][0]['bedId']
    if bed_id == bed:  # was `if bed_id=bed:`, a syntax error
        primary_attributes = pd.json_normalize(packet)
        '''some logic'''
        global bed_df
        bed_df[bed_id] = bed_df[bed_id].append(packet)  # append the message to this bed's global dataframe
        print(bed_df[bed_id])

        ''' some other calculation'''

        phy_channel.basic_publish(body=json.dumps(truejson), exchange='nicu')  # publish the alarm on another queue
        bed_df[bed_id] = bed_df[bed_id].tail(4)  # reset the size of the dataframe


def callback(ch, method, properties, body):
    body_data = body.decode()
    packet = json.loads(body_data)
    bed_id = packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
        bed_list.append(bed_id)


# pseudo code
procs = []
for bed in bed_list:
    proc = Process(target=spo2, args=(bed,))
    procs.append(proc)
    proc.start()

I am not able to figure out how to create a thread/process for each patient (bed_id) so that whenever I receive a message for that patient I can direct it to the corresponding thread. I have looked at queues, but the documentation is not very clear on how to implement this case.
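As an aside, for the buffering requirement alone (keeping the last five messages per bed), a dict of `collections.deque` ring buffers is a lightweight alternative to per-bed dataframes. The bed IDs and message shape below are made up for illustration:

```python
import json
from collections import deque

buffers = {}  # bed_id -> deque holding at most the last 5 packets

def store(body):
    """Decode a message and keep only the newest five packets per bed."""
    packet = json.loads(body.decode())
    bed_id = packet['beds'][0]['bedId']
    buf = buffers.setdefault(bed_id, deque(maxlen=5))  # deque drops the oldest automatically
    buf.append(packet)
    return bed_id, len(buf)

# feed seven synthetic messages for one bed
for i in range(7):
    msg = json.dumps({'beds': [{'bedId': 'bed-1', 'spo2': 90 + i}]}).encode()
    bed, n = store(msg)

print(bed, n)  # the buffer never grows past 5
print([p['beds'][0]['spo2'] for p in buffers['bed-1']])
```

Once `len(buf) == 5` you have enough history to run the alarm logic, and no manual `tail()` trimming is needed.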

Questioner
Prem Patrick
Viewed
170
Roland Smith 2020-02-01 21:57

Before you go down this route, you should evaluate whether that is even necessary. One important limit is the RabbitMQ bandwidth.

Build a single-threaded app, and start feeding it synthetic rabbitmq messages. Increase the msg/s rate until it cannot keep up anymore.
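That load test can be as simple as timing how many synthetic messages per second the single-threaded callback sustains. This sketch times a stand-in processing function directly (a real test would publish through pika, but the timing idea is the same; the message shape is illustrative):

```python
import json
import time
from collections import deque

buffers = {}

def handle(body):
    """Stand-in for the consumer callback: decode and buffer one message."""
    packet = json.loads(body.decode())
    bed_id = packet['beds'][0]['bedId']
    buffers.setdefault(bed_id, deque(maxlen=5)).append(packet)

# synthetic messages spread over 20 beds
messages = [
    json.dumps({'beds': [{'bedId': f'bed-{i % 20}', 'spo2': 95}]}).encode()
    for i in range(10_000)
]

start = time.perf_counter()
for body in messages:
    handle(body)
elapsed = time.perf_counter() - start
rate = len(messages) / elapsed
print(f'{rate:,.0f} msg/s')  # compare this against your expected peak load
```

If the printed rate is well above the hospital feed's real message rate, a single-threaded consumer is enough.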

If that rate is much higher than is likely to occur in practice, you are done. :-)

If not, then you should start profiling your application to find which parts of it take the most time. Those are your bottlenecks. Only when you know what the bottlenecks are can you look at the relevant code and think about how to improve them.

Note that multiprocessing and threading do different things and have different applications. If your application is limited by the amount of calculation it can do, multiprocessing can help by spreading the calculations over multiple CPU cores. Note that this only works well if the calculations are independent of each other. If your application spends a lot of time waiting for I/O, threading can help by doing calculations in one thread while another is waiting for I/O.

But neither is free in terms of complexity. For example, with threading you have to protect reads and writes of your dataframes with locks so that only one thread at a time can read or modify a given dataframe. With multiprocessing you have to send data from the worker processes back to the parent process.
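The locking point looks like this in practice. A plain dict of lists stands in for the per-bed dataframes here; the pattern is identical for a dict of DataFrames:

```python
import threading

lock = threading.Lock()
bed_data = {}  # bed_id -> list of packets, shared between threads

def record(bed_id, packet):
    """Append one packet under the lock so concurrent writers cannot interleave."""
    with lock:
        bed_data.setdefault(bed_id, []).append(packet)

threads = [
    threading.Thread(target=record, args=('bed-1', {'spo2': 90 + i}))
    for i in range(50)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(bed_data['bed-1']))  # all 50 writes survive
```

Without the lock, two threads could read the same list state and lose a write; with it, every append is serialized.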

In this case, I think that multiprocessing would be most useful. You could set up a number of processes, each responsible for a subset of the beds/patients. If RabbitMQ can have multiple listeners, each worker process can handle only the messages from the patients it is responsible for. Otherwise you have to distribute the messages to the appropriate process yourself. Each worker process then handles the messages (and keeps the dataframes) for a number of patients. When an alert is triggered based on the calculations done on the data, the worker only has to send a message with the patient's identifier and the nature of the alert to the parent process.
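A sketch of that dispatch pattern, shown here with threads and `queue.Queue` so it runs anywhere; swapping in `multiprocessing.Process` and `multiprocessing.Queue` gives the multi-core version described above. All names, the routing-by-hash scheme, and the five-message trigger are illustrative:

```python
import queue
import threading

NUM_WORKERS = 2
inboxes = [queue.Queue() for _ in range(NUM_WORKERS)]  # one inbox per worker
alerts = queue.Queue()  # workers -> parent

def worker(inbox):
    """Buffer messages per bed and raise an alert once five have arrived."""
    seen = {}  # this worker's private per-bed buffers: no locks needed
    while True:
        packet = inbox.get()
        if packet is None:  # shutdown sentinel
            break
        buf = seen.setdefault(packet['bedId'], [])
        buf.append(packet)
        if len(buf) >= 5:  # the real alarm logic would run here
            alerts.put((packet['bedId'], 'low spo2'))
            del buf[:4]  # keep only the newest row, like tail()

workers = [threading.Thread(target=worker, args=(q,)) for q in inboxes]
for t in workers:
    t.start()

# the consumer callback routes each message to a fixed worker by bed id
for i in range(10):
    packet = {'bedId': f'bed-{i % 2}', 'spo2': 85}
    inboxes[hash(packet['bedId']) % NUM_WORKERS].put(packet)

for q in inboxes:
    q.put(None)
for t in workers:
    t.join()

print(alerts.qsize())  # one alert per bed after five messages each
```

Because each bed always hashes to the same worker, that worker owns the bed's buffer outright, and only small alert tuples travel back to the parent.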