温馨提示:本文翻译自stackoverflow.com,查看原文请点击:python - Running multiprocessing/Multi threading in pika consumer and directing the data to specific datafram
python python-multithreading rabbitmq multiprocess

python - 在pika使用者中运行多处理/多线程并将数据定向到特定的datafram

发布于 2020-04-10 11:05:25

我是Python多线程/处理和RabbitMQ的新手。基本上,我有RabbitMQ消费者,可以为我提供实时医院数据。每个消息包含每个患者的生命体征。为了运行我的逻辑并用于触发警报,我需要为每个患者至少存储5条此类消息。另外,由于患者数量未知,我考虑使用多线程或多处理,以使我的警报几乎实时并按比例放大。我的方法是为每个患者创建一个全局数据框,然后将与该患者有关的消息附加到该数据框中。但是现在,我在创建多线程/进程并将数据发送到相应的患者数据框时遇到了问题。这是我的代码


bed_list=[]
thread_list=[]
bed_df={}
alarms=0

def spo2(body,bed):
    body_data= body.decode()
    print(body_data)
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    if bed_id=bed:
        primary_attributes= json_normalize(packet)
         '''some logic'''
        global bed_df
        bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
        print(bed_df[bed_id])

        ''' some other calcuation'''

            phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
            bed_df[bed_id]= bed_df[bed_id].tail(4)  # resets the size of the dataframe 


def callback(ch, method, properties, body):
    body_data= body.decode()
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
      bed_list.append(bed_id)


#pseudo code
 for bed in bed_list:
     proc = Process(target=spo2, args=(bed,))
     procs.append(proc)
     proc.start()

我无法找到一种方法,可以在其中为每个患者(bed_id)创建线程/进程,以便每当我收到该患者(bed_id)的消息时,都可以将其定向到该线程。我已经检查了队列,但是文档对于实现这种情况不是很清楚。

查看更多

提问者
Prem Patrick
被浏览
98
Roland Smith 2020-02-01 21:57

在走这条路线之前,您应该评估这是否是必要的。一个重要的限制是Rabbitmq带宽

构建一个单线程应用程序,并开始向其提供合成Rabbitmq消息。提高msg / s速率,直到无法维持为止。

如果该比率远高于实际发生的比率,那么您就完成了。:-)

如果不是,则开始应用程序进行性能分析,以查找花费最多时间的部分。这些是您的瓶颈。仅当您知道瓶颈所在时,您才能查看相关代码并考虑如何改善它们。

请注意,multiprocessingthreading做不同的事情并且有不同的应用程序。如果您的应用程序受到计算量的限制,那么multiprocessing可以通过将计算量分布在多个CPU内核上来提供帮助。请注意,这仅在计算彼此独立时才有效如果您的应用程序花费大量时间等待I / O,则threading可以帮助您在一个线程中进行计算而另一个线程在等待I / O。

但是就复杂性而言,它们都不是免费的。例如,threading您必须使用锁来保护数据帧的读取和写入,以使一次只能有一个线程读取或修改该数据帧。随着multiprocessing你必须发送从工作进程将数据备份到父进程。

在这种情况下,我认为这multiprocessing将是最有用的。您可以设置许多过程,每个过程负责部分床位/患者。如果rabbitmq可以有多个侦听器,则可以让每个工作进程仅处理来自其负责的患者的消息。否则,您必须将消息分发到适当的过程。现在,每个工作进程都为许多患者处理消息(并保留数据框)。当基于对数据进行的计算触发警报时,工作人员只需向父进程发送一条详细说明患者标识符和警报性质的消息。