Warm tip: This article is reproduced from serverfault.com, please click

How to handle unknown/invalid binding routing key values in RabbitMQ?

发布于 2020-12-03 21:30:27

I'd like to know what is the best way to handle messages with unknown/invalid routing key values inside an exchange? In my case, I'm sending all my messages inside the same exchange and based on a routing key, messages are routed to the corresponding queue. Here's my configuration (I'm using Spring Cloud Stream) :

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

Now what I'm asking is what happens if I send a message with payload.type='ANY'? Obviously, this message won't be retrieved by any consumer and will remain inside the exchange, but what is the best way to keep track of these "unknown" messages? Can I use a DLQ for this?

Thanks!

Questioner
Fred
Viewed
0
Gary Russell 2020-12-04 22:23:33

will remain inside the exchange,

No; exchanges don't "hold" messages, they are simply routers.

Unroutable messages are discarded by default.

You can configure the binding to return unroutable messages.

See Error Channels.

Returns are asynchronous.

In the upcoming 3.1 release, you can wait on a future to determine whether the message was sent successfully or not. See Publisher Confirms.

If the message is unroutable, the correlation data's returnedMessage property is set.

The framework uses the mandatory feature mentioned in another answer.

EDIT

Here is an example:

spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}