I am new to Python multithreading/multiprocessing and RabbitMQ. I have a RabbitMQ consumer that feeds me real-time hospital data. Each message contains one patient's vitals. I need to store at least five such messages per patient in order to run my logic and decide whether to set off an alarm. Since the number of patients is unknown, I am thinking of using multithreading or multiprocessing to keep my alarms near real-time and to be able to scale up. My approach is to create a global dataframe for each patient and append the messages for that patient to it. But now I am having trouble creating the threads/processes and sending the data to the respective patient's dataframe. Here is my code:
import json
from multiprocessing import Process

import pandas as pd
from pandas import json_normalize

bed_list = []
procs = []
bed_df = {}
alarms = 0

def spo2(body, bed):
    body_data = body.decode()
    print(body_data)
    packet = json.loads(body_data)
    bed_id = packet['beds'][0]['bedId']
    if bed_id == bed:                                   # was "=": comparison, not assignment
        primary_attributes = json_normalize(packet)
        '''some logic'''
        global bed_df
        if bed_id not in bed_df:                        # create the per-bed dataframe on first use
            bed_df[bed_id] = pd.DataFrame()
        # append this message to the bed's dataframe (five messages are kept per bed)
        bed_df[bed_id] = pd.concat([bed_df[bed_id], primary_attributes],
                                   ignore_index=True)
        print(bed_df[bed_id])
        ''' some other calculation'''
        # throw out the alarm on another queue (phy_channel and truejson
        # come from the omitted setup/logic above)
        phy_channel.basic_publish(body=json.dumps(truejson),
                                  routing_key='',
                                  exchange='nicu')
        bed_df[bed_id] = bed_df[bed_id].tail(4)         # reset the size of the dataframe

def callback(ch, method, properties, body):
    body_data = body.decode()
    packet = json.loads(body_data)
    bed_id = packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
        bed_list.append(bed_id)
    # pseudo code: start a process per known bed and hand it the raw message
    for bed in bed_list:
        proc = Process(target=spo2, args=(body, bed))   # spo2 takes (body, bed)
        procs.append(proc)
        proc.start()
I am not able to figure out a way to create a thread/process for each patient (bed_id) so that whenever I receive a message for that patient I can direct it to that thread. I have checked Queues, but the documentation is not very clear on how to implement this case.
Before you go down this route, you should evaluate whether that is even necessary. One important limit is the RabbitMQ bandwidth.
Build a single-threaded app and start feeding it synthetic RabbitMQ messages. Increase the msg/s rate until it cannot keep up anymore.
If that rate is much higher than is likely to occur in practice, you are done. :-)
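Something along these lines (untested; the queue name, connection details and message shape are assumptions based on the code in your question) could serve as the synthetic feeder:

import json
import time

import pika   # assuming the consumer is built on pika, as phy_channel.basic_publish suggests

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='vitals_test')              # hypothetical test queue

def publish_synthetic(rate_per_sec, duration_sec=10):
    """Publish fake per-bed vitals packets at roughly rate_per_sec."""
    interval = 1.0 / rate_per_sec
    end = time.time() + duration_sec
    n = 0
    while time.time() < end:
        packet = {'beds': [{'bedId': 'bed-%d' % (n % 500), 'spo2': 97}]}
        channel.basic_publish(exchange='',
                              routing_key='vitals_test',
                              body=json.dumps(packet))
        n += 1
        time.sleep(interval)

# Ramp the load until the consumer on the other side starts falling behind.
for rate in (50, 100, 500, 1000):
    publish_synthetic(rate)

connection.close()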
If the consumer cannot keep up at realistic rates, then you start profiling your application to find which parts of it take the most time. Those are your bottlenecks. Only when you know what the bottlenecks are can you look at the relevant code and think about how to improve them.
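For the profiling itself, cProfile against recorded or synthetic message bodies is usually enough to show whether the time goes into JSON parsing, pandas or your alarm logic. A minimal sketch (the handle function here is only a stand-in for the body of your callback):

import cProfile
import json
import pstats

def handle(body):
    """Stand-in for the real callback: parse the packet and run the alarm logic."""
    packet = json.loads(body.decode())
    return packet['beds'][0]['bedId']

bodies = [json.dumps({'beds': [{'bedId': 'bed-%d' % (i % 500), 'spo2': 95}]}).encode()
          for i in range(10000)]

profiler = cProfile.Profile()
profiler.enable()
for body in bodies:
    handle(body)
profiler.disable()

# The top entries of this report are the bottlenecks worth looking at.
pstats.Stats(profiler).sort_stats('cumulative').print_stats(15)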
Note that multiprocessing and threading do different things and have different applications. If your application is limited by the amount of calculations it can do, then multiprocessing can help by spreading the calculations over multiple CPU cores. Note that this only works well if the calculations are independent of each other. If your application spends a lot of time waiting for I/O, threading can help you do calculations in one thread while another is waiting for I/O.
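To make the difference concrete, here is a small self-contained sketch using concurrent.futures; the fake I/O delay, the SpO2 numbers and the threshold are made up purely for illustration:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def evaluate_vitals(values):
    # stand-in for the CPU-bound alarm calculation over the last five readings
    return sum(values) / len(values) < 90

def fetch_packet(bed_id):
    # stand-in for I/O: waiting on the broker, a socket, a database, ...
    time.sleep(0.1)
    return {'bedId': bed_id, 'spo2': [95, 94, 96, 93, 92]}

if __name__ == '__main__':
    # Threads help while the program is mostly *waiting*: the GIL is released
    # during blocking I/O, so the fetches overlap.
    with ThreadPoolExecutor(max_workers=8) as io_pool:
        packets = list(io_pool.map(fetch_packet, range(8)))

    # Processes help when the Python-level *calculation* is the limit: each
    # worker gets its own interpreter and can use its own CPU core.
    with ProcessPoolExecutor(max_workers=4) as cpu_pool:
        alarms = list(cpu_pool.map(evaluate_vitals,
                                   [p['spo2'] for p in packets]))
    print(alarms)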
But neither is free in terms of complexity. For example, with threading you have to protect reading and writing of your dataframes with locks so that only one thread at a time can read or modify a given dataframe. With multiprocessing you have to send data from the worker processes back to the parent process.
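Roughly, the two kinds of bookkeeping look like this (all names here are illustrative):

import threading
from multiprocessing import Process, Queue

# Threading: a lock guards the shared per-bed storage so that only one
# thread reads or modifies it at a time.
bed_history = {}
bed_lock = threading.Lock()

def store_packet(bed_id, row):
    with bed_lock:
        bed_history.setdefault(bed_id, []).append(row)

# Multiprocessing: a worker cannot touch the parent's memory, so results
# travel back over a queue instead.
def worker(alert_queue):
    alert_queue.put({'bedId': 'bed-17', 'alert': 'low SpO2'})   # made-up alert

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())        # the parent receives the alert dict
    p.join()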
In this case, I think that multiprocessing would be most useful. You could set up a number of processes, each responsible for part of the beds/patients. If RabbitMQ can have multiple listeners, you can have each worker process handle only the messages from the patients it is responsible for. Otherwise you have to distribute the messages to the appropriate process. Each worker process then processes the messages (and keeps the dataframes) for a number of patients. When an alert is triggered based on the calculations done on the data, the worker only has to send a message with the identifier of the patient and the nature of the alert to the parent process.
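A rough sketch of that layout (the hash-based routing, the should_alarm rule and all names are assumptions for illustration, not a drop-in implementation):

from multiprocessing import Process, Queue

NUM_WORKERS = 4                 # tune to the number of CPU cores available

def should_alarm(rows):
    # placeholder rule: alarm when the mean of the last five SpO2 values is low
    values = [r['beds'][0]['spo2'] for r in rows]
    return sum(values) / len(values) < 92

def worker(inbox, alerts):
    """Keeps the message history for every bed that is routed to it."""
    history = {}                                        # bed_id -> recent packets
    while True:
        packet = inbox.get()
        bed_id = packet['beds'][0]['bedId']
        rows = history.setdefault(bed_id, [])
        rows.append(packet)
        if len(rows) >= 5:                              # enough data to evaluate
            if should_alarm(rows):
                alerts.put({'bedId': bed_id, 'alert': 'check patient'})
            del rows[:-4]                               # keep the trailing four packets

def route(packet, inboxes):
    """Send a message to the worker that owns that bed."""
    bed_id = packet['beds'][0]['bedId']
    inboxes[hash(bed_id) % NUM_WORKERS].put(packet)

if __name__ == '__main__':
    alerts = Queue()
    inboxes = [Queue() for _ in range(NUM_WORKERS)]
    workers = [Process(target=worker, args=(inbox, alerts), daemon=True)
               for inbox in inboxes]
    for w in workers:
        w.start()

    # In the real consumer the pika callback would call route(packet, inboxes)
    # and the parent would drain `alerts` and publish them to the 'nicu' exchange.
    for reading in (90, 89, 91, 88, 90):                # demo: five low readings
        route({'beds': [{'bedId': 'bed-1', 'spo2': reading}]}, inboxes)
    print(alerts.get(timeout=5))

The important property is that one bed's history lives in exactly one process, so no locking is needed and the parent only ever sees small alert messages.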
Yes, it works for now since I have a limited number of beds. The only problem is when we scale up to, say, 500 beds; I don't want this to become a bottleneck, and I certainly don't want to break the code in production. Anyway, I will profile the code and see how things pan out...