I'd like to know what is the best way to handle messages with unknown/invalid routing key values inside an exchange? In my case, I'm sending all my messages inside the same exchange and based on a routing key, messages are routed to the corresponding queue. Here's my configuration (I'm using Spring Cloud Stream) :
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
Now what I'm asking is what happens if I send a message with payload.type='ANY'
? Obviously, this message won't be retrieved by any consumer and will remain inside the exchange, but what is the best way to keep track of these "unknown" messages? Can I use a DLQ for this?
Thanks!
will remain inside the exchange,
No; exchanges don't "hold" messages, they are simply routers.
Unroutable messages are discarded by default.
You can configure the binding to return unroutable messages.
See Error Channels.
Returns are asynchronous.
In the upcoming 3.1 release, you can wait on a future to determine whether the message was sent successfully or not. See Publisher Confirms.
If the message is unroutable, the correlation data's returnedMessage
property is set.
The framework uses the mandatory
feature mentioned in another answer.
EDIT
Here is an example:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {
public static void main(String[] args) {
SpringApplication.run(So65134452Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(MessageBuilder.withPayload("foo")
.setHeader("rk", "invalid")
.build());
};
}
@Autowired
RabbitTemplate template;
@Bean
public Queue unroutable() {
return new Queue("unroutable.messages");
}
@ServiceActivator(inputChannel = "errorChannel")
public void error(Message<?> error) {
if (error.getPayload() instanceof ReturnedAmqpMessageException) {
this.template.send(unroutable().getName(),
((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
}
}
}
Thanks for the clarification on exchanges. In my case, I would like the message to be re-routed automatically to an alternate queue (an "unrouted" queue). Is it what you meant by "configure the binding to return unroutable messages"? Is it possible to configure this behavior using only Spring Cloud Stream? Do you happen to have an example on how to achieve this? Thanks!
@Fred There is nothing built in to do that but it is possible with a few lines of config and a little bit of Java. I will add an example in the morning, US east coast time.
Ok thanks a lot I'm looking forward to it... this example would be very appreciated.
Note that
errorChannel
gets errors from all bindings; if you have multiple output bindings and wish to send unroutables to different queues, use@ServiceActivator(inputChannel = "my-exchange.errors")
instead.The republishing has to be done on the producer side (which is where the returned message is sent to); you can, of course, configure an consumer binding in your consumer app to receive the unroutable messages.