我是Python多线程/处理和RabbitMQ的新手。基本上,我有RabbitMQ消费者,可以为我提供实时医院数据。每个消息包含每个患者的生命体征。为了运行我的逻辑并用于触发警报,我需要为每个患者至少存储5条此类消息。另外,由于患者数量未知,我考虑使用多线程或多处理,以使我的警报几乎实时并按比例放大。我的方法是为每个患者创建一个全局数据框,然后将与该患者有关的消息附加到该数据框中。但是现在,我在创建多线程/进程并将数据发送到相应的患者数据框时遇到了问题。这是我的代码
bed_list=[]
thread_list=[]
bed_df={}
alarms=0
def spo2(body,bed):
body_data= body.decode()
print(body_data)
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
if bed_id=bed:
primary_attributes= json_normalize(packet)
'''some logic'''
global bed_df
bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
print(bed_df[bed_id])
''' some other calcuation'''
phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
bed_df[bed_id]= bed_df[bed_id].tail(4) # resets the size of the dataframe
def callback(ch, method, properties, body):
body_data= body.decode()
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
print(bed_id)
global bed_list
if bed_id not in bed_list:
bed_list.append(bed_id)
#pseudo code
for bed in bed_list:
proc = Process(target=spo2, args=(bed,))
procs.append(proc)
proc.start()
我无法找到一种方法,可以在其中为每个患者(bed_id)创建线程/进程,以便每当我收到该患者(bed_id)的消息时,都可以将其定向到该线程。我已经检查了队列,但是文档对于实现这种情况不是很清楚。
在走这条路线之前,您应该评估这是否是必要的。一个重要的限制是Rabbitmq带宽。
构建一个单线程应用程序,并开始向其提供合成Rabbitmq消息。提高msg / s速率,直到无法维持为止。
如果该比率远高于实际发生的比率,那么您就完成了。:-)
如果不是,则开始对应用程序进行性能分析,以查找花费最多时间的部分。这些是您的瓶颈。仅当您知道瓶颈所在时,您才能查看相关代码并考虑如何改善它们。
请注意,multiprocessing
和threading
做不同的事情并且有不同的应用程序。如果您的应用程序受到计算量的限制,那么multiprocessing
可以通过将计算量分布在多个CPU内核上来提供帮助。请注意,这仅在计算彼此独立时才有效。如果您的应用程序花费大量时间等待I / O,则threading
可以帮助您在一个线程中进行计算而另一个线程在等待I / O。
但是就复杂性而言,它们都不是免费的。例如,threading
您必须使用锁来保护数据帧的读取和写入,以使一次只能有一个线程读取或修改该数据帧。随着multiprocessing
你必须发送从工作进程将数据备份到父进程。
在这种情况下,我认为这multiprocessing
将是最有用的。您可以设置许多过程,每个过程负责部分床位/患者。如果rabbitmq可以有多个侦听器,则可以让每个工作进程仅处理来自其负责的患者的消息。否则,您必须将消息分发到适当的过程。现在,每个工作进程都为许多患者处理消息(并保留数据框)。当基于对数据进行的计算触发警报时,工作人员只需向父进程发送一条详细说明患者标识符和警报性质的消息。
是的,由于我的床很有限,它现在可以使用。唯一的问题是,当我们扩大规模(比如说500张床)时,我不希望这成为瓶颈,我当然也不想破坏生产中的代码,无论如何,我将执行代码分析并查看情况如何出来...