您需要创建一个ConcurrentKafkaListenerContainerFactory
并设置并发参数。它KafkaMessageListenerContainers
基于并发创建1个或多个。如果ContainerProperties
配置了TopicPartitions,则TopicPartitions在实例之间平均分配。
例如
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(12);
如果您使用
@KafkaListener
,则它具有一个concurrency
属性(自2.2开始),该属性将覆盖容器工厂的默认设置。在这种情况下,将为每个侦听器动态生成clientId。