因此,问题出在路由器上。当路由器尝试向通道发送消息时,出现错误:调度程序没有通道'newTypingNotificationHandler.input'的订阅者。但是我对此频道名称具有IntegrationFlow定义。
@Bean
public IntegrationFlow routeEcmIncomeRq(AbstractMessageRouter typeNotificationRouter) {
return IntegrationFlows.from(FROM_CHANNEL)
.routeToRecipients(r -> r
.recipientFlow(p -> p instanceof TypingNotificationDto,
f -> f.route(typeNotificationRouter)
)
.defaultOutputChannel(DEFAULT_SERVICE_CHANNEL)
).get();
}
@Bean
public AbstractMessageRouter typeNotificationRouter(IncomeRepository incomeRepository) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
TypingNotificationDto messagePayload = (TypingNotificationDto) message.getPayload();
if (!incomeRepository.existsById(StringUtil.ecdFileIdToUuid(messagePayload.getEcdDocumentImage().getDocumentSource()))) {
return Collections.singleton(MessageChannels.direct("newTypingNotificationHandler.input").get());
} else {
return Collections.singleton(MessageChannels.direct("existsTypingNotificationHandler.input").get());
}
}
};
}
@Bean
public IntegrationFlow newTypingNotificationHandler() {
return f -> f.log("need's create new Income");
}
@Bean
public IntegrationFlow existsTypingNotificationHandler() {
return f -> f.log("exist income process");
}
stackTrace原因:
org.springframework.integration.MessageDispatchingException:Dispatcher在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)上没有订阅者~~ spring-integration-core-5.2.6.RELEASE.jar:5.2.6 [RELEASE],位于org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)〜[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE],位于org.springframework.integration .channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)〜[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel。 java:461)〜[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE],位于org.springframework.integration.channel.AbstractMessageChannel。在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)上发送[AbstractMessageChannel.java:403)〜[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]〜 [spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE]位于org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)〜[spring-messaging-5.2.6.RELEASE。 jar:5.2.6.RELEASE]在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)〜[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE]在org。 springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)〜[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter .java:206)〜[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
你每次都返回一个新频道,而不是由Spring管理的频道
MessageChannels.direct("newTypingNotificationHandler.input").get();
用
return Collections.singleton(getChannelResolver().resolveDestination("newTypingNotificationHandler.input"));
反而。但是,最好在解决后进行缓存,而不是为每个消息返回一个新的集合。
我看不出采用这种逻辑来实现整体的理由
AbstractMessageRouter
。只需使用返回String
这些通道名称的方法的自定义POJO服务就足够了。然后在route(Object service, String methodName)
DSL API中使用此方法。谢谢,你们很多。我决定只在一项服务中处理这两种情况,而不必发送到单独的渠道。