温馨提示:本文翻译自stackoverflow.com,查看原文请点击:java - Spring Integration complete asynchronous transformation before the next
asynchronous java jpa spring-integration spring-integration-dsl

java - Spring Integration在下一步之前完成异步转换

发布于 2020-04-18 20:56:37

我有一个集成流,该流会定期轮询数据库以检索MachineLine尚未处理的所有实体并对其进行处理。该流程检索MachineLine对象的集合,然后我希望将其拆分为单个对象,将这些对象转换为ReportDetails对象,并将转换后的对象持久保存到数据库中的另一个表中。流程定义如下:

@Bean
public IntegrationFlow processMachineLine() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManager)
                            .entityClass(MachineLine.class)
                            .jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
                    e -> e.poller(Pollers.fixedDelay(5000)))
            .split()
            .transform(MachineLine.class, this::transformMachineLineToReportDetails)
            .handle(Jpa.outboundAdapter(this.entityManager)
                            .entityClass(ReportDetails.class),
                    ConsumerEndpointSpec::transactional)
            .get();
}

上面的定义可以正常工作,但是很慢。transformMachineLineToReportDetails方法将HTTP请求发送到另一个服务,该服务需要花费几秒钟的时间进行响应。使用当前流定义,每个MachineLine对象在对之前的对象进行转换之前都将等待对其进行转换和持久化。

因此,理想的解决方案是异步执行此转换和持久性。一种可能的解决方案是在.split()之间插入以下行.transform(...)

.channel(new ExecutorChannel(Executors.newCachedThreadPool()))

但是,这允许JPA入站适配器在处理和保留上次轮询的结果之前再次轮询数据库。这意味着上MachineLine一次数据库轮询返回的,在下一次轮询之前未进行转换和保留的任何实体都将再次检索,并尝试进行第二次转换和保留。显然,这会导致不必要的资源成本,并且当多个ReportDetails具有相同ID的对象尝试保存到数据库时,也会产生错误

有没有一种方法可以异步转换MachineLine对象,但要确保在上一次轮询的结果完成流中的访问(即所有MachineLine对象都已转换并持久化)之前,不再次轮询数据库

查看更多

提问者
Rafe
被浏览
72
Artem Bilan 2020-02-05 03:26

我通过AbstractMessageSourceAdvice针对某个AtomicBoolean标志(也可以是Bean)的自定义项查看它的唯一方法是检入beforeReceive()由于您使用的是Pollers.fixedDelay(5000)轮询策略,因此仍然是单线程的。因此,当不允许时,最好不要让同一线程对JPA执行轮询AbstractMessageSourceAdvice布尔标志应该true在乞讨中,您将其更改为false前面提到的split()您可以将a publishSubscribeChannel()作为两个订阅者使用。甚至在AbstractMessageSourceAdvice实现中执行该操作- compareAndSet(true, false)在该beforeReceive()实现中还可以。

然后,如您提到的使用,在拆分后进行拆分并保持不变ExecutorChannel

在流程的最后,您需要放置publishSubscribeChannel()两个订户-1 handle(Jpa.outboundAdapter(this.entityManager)2. aggregate()等待所有拆分项目完成。之后,aggregate()您放置一个简单的handle(m -> pollingFlagBean().set(true))

就是这样:仅当所有项目都已处理并汇总到组中时,才会进行新的轮询。只有在那之后,您才允许轮询器再次使用它AtomicBoolean

您也可以考虑将此标志逻辑与组合使用,SimpleActiveIdleMessageSourceAdvice以在主动模式和被动模式之间更改轮询周期,以避免在等待聚合时出现大量空闲。

任何其他异步解决方案仍然对您不起作用,因为切换到其他线程将立即释放轮询过程以使其再次旋转。