我想知道在交换中处理具有未知/无效路由键值的消息的最佳方法是什么?就我而言,我将所有消息发送到同一交换机内,并基于路由密钥将消息路由到相应的队列。这是我的配置(我正在使用Spring Cloud Stream):
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
现在,我要问的是,如果我发送消息与会发生什么payload.type='ANY'
?显然,任何消费者都不会检索到此消息,并且将其保留在交换机内部,但是跟踪这些“未知”消息的最佳方法是什么?我可以为此使用DLQ吗?
谢谢!
将保留在交易所内,
不; 交易所不“保留”消息,它们只是路由器。
默认情况下,无法路由的消息将被丢弃。
你可以配置绑定以返回无法路由的消息。
请参阅错误通道。
返回是异步的。
在即将发布的3.1版本中,你可以等待将来以确定消息是否已成功发送。请参阅发布者确认。
如果消息不可路由,returnedMessage
则设置相关数据的属性。
该框架使用mandatory
另一个答案中提到的功能。
编辑
这是一个例子:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {
public static void main(String[] args) {
SpringApplication.run(So65134452Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(MessageBuilder.withPayload("foo")
.setHeader("rk", "invalid")
.build());
};
}
@Autowired
RabbitTemplate template;
@Bean
public Queue unroutable() {
return new Queue("unroutable.messages");
}
@ServiceActivator(inputChannel = "errorChannel")
public void error(Message<?> error) {
if (error.getPayload() instanceof ReturnedAmqpMessageException) {
this.template.send(unroutable().getName(),
((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
}
}
}
感谢您对交流的澄清。就我而言,我希望将邮件自动重新路由到备用队列(“未路由”队列)。您是“配置绑定以返回无法路由的消息”的意思吗?是否可以仅使用Spring Cloud Stream来配置此行为?您是否碰巧有一个如何实现此目标的示例?谢谢!
@Fred没有内置功能可以执行此操作,但是可以通过几行配置和少量Java来实现。我将在美国东海岸时间上午添加一个示例。
好的,非常感谢,我很期待它。这个例子将不胜感激。
注意,
errorChannel
从所有绑定中获取错误;如果您有多个输出绑定,并希望将不可路由发送到不同的队列,请@ServiceActivator(inputChannel = "my-exchange.errors")
改用。重新发布必须在生产者端进行(这是返回的消息发送到的位置);您当然可以在使用者应用程序中配置使用者绑定以接收无法路由的消息。