Warm tip: This article is reproduced from serverfault.com, please click

java-如何处理RabbitMQ中未知/无效的绑定路由键值?

(java - How to handle unknown/invalid binding routing key values in RabbitMQ?)

发布于 2020-12-03 21:30:27

我想知道在交换中处理具有未知/无效路由键值的消息的最佳方法是什么?就我而言,我将所有消息发送到同一交换机内,并基于路由密钥将消息路由到相应的队列。这是我的配置(我正在使用Spring Cloud Stream):

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

现在,我要问的是,如果我发送消息与会发生什么payload.type='ANY'显然,任何消费者都不会检索到此消息,并且将其保留在交换机内部,但是跟踪这些“未知”消息的最佳方法是什么?我可以为此使用DLQ吗?

谢谢!

Questioner
Fred
Viewed
0
Gary Russell 2020-12-04 22:23:33

将保留在交易所内,

不; 交易所不“保留”消息,它们只是路由器。

默认情况下,无法路由的消息将被丢弃。

你可以配置绑定以返回无法路由的消息。

请参阅错误通道

返回是异步的。

在即将发布的3.1版本中,你可以等待将来以确定消息是否已成功发送。请参阅发布者确认

如果消息不可路由,returnedMessage则设置相关数据的属性。

该框架使用mandatory另一个答案中提到功能。

编辑

这是一个例子:

spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}