我有一个集成流,该流会定期轮询数据库以检索MachineLine
尚未处理的所有实体并对其进行处理。该流程检索MachineLine
对象的集合,然后我希望将其拆分为单个对象,将这些对象转换为ReportDetails
对象,并将转换后的对象持久保存到数据库中的另一个表中。流程定义如下:
@Bean
public IntegrationFlow processMachineLine() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManager)
.entityClass(MachineLine.class)
.jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
e -> e.poller(Pollers.fixedDelay(5000)))
.split()
.transform(MachineLine.class, this::transformMachineLineToReportDetails)
.handle(Jpa.outboundAdapter(this.entityManager)
.entityClass(ReportDetails.class),
ConsumerEndpointSpec::transactional)
.get();
}
上面的定义可以正常工作,但是很慢。该transformMachineLineToReportDetails
方法将HTTP请求发送到另一个服务,该服务需要花费几秒钟的时间进行响应。使用当前流定义,每个MachineLine
对象在对之前的对象进行转换之前都将等待对其进行转换和持久化。
因此,理想的解决方案是异步执行此转换和持久性。一种可能的解决方案是在.split()
和之间插入以下行.transform(...)
:
.channel(new ExecutorChannel(Executors.newCachedThreadPool()))
但是,这允许JPA入站适配器在处理和保留上次轮询的结果之前再次轮询数据库。这意味着上MachineLine
一次数据库轮询返回的,在下一次轮询之前未进行转换和保留的任何实体都将再次检索,并尝试进行第二次转换和保留。显然,这会导致不必要的资源成本,并且当多个ReportDetails
具有相同ID的对象尝试保存到数据库时,也会产生错误。
有没有一种方法可以异步转换MachineLine
对象,但要确保在上一次轮询的结果完成流中的访问(即所有MachineLine
对象都已转换并持久化)之前,不再次轮询数据库?
我通过AbstractMessageSourceAdvice
针对某个AtomicBoolean
标志(也可以是Bean)的自定义项查看它的唯一方法是检入beforeReceive()
。由于您使用的是Pollers.fixedDelay(5000)
轮询策略,因此仍然是单线程的。因此,当不允许时,最好不要让同一线程对JPA执行轮询AbstractMessageSourceAdvice
。布尔标志应该true
在乞讨中,您将其更改为false
前面提到的split()
。您可以将a publishSubscribeChannel()
作为两个订阅者使用。甚至在AbstractMessageSourceAdvice
实现中执行该操作- compareAndSet(true, false)
在该beforeReceive()
实现中还可以。
然后,如您提到的使用,在拆分后进行拆分并保持不变ExecutorChannel
。
在流程的最后,您需要放置publishSubscribeChannel()
两个订户-1 handle(Jpa.outboundAdapter(this.entityManager)
;2. aggregate()
等待所有拆分项目完成。之后,aggregate()
您放置一个简单的handle(m -> pollingFlagBean().set(true))
。
就是这样:仅当所有项目都已处理并汇总到组中时,才会进行新的轮询。只有在那之后,您才允许轮询器再次使用它AtomicBoolean
。
您也可以考虑将此标志逻辑与组合使用,SimpleActiveIdleMessageSourceAdvice
以在主动模式和被动模式之间更改轮询周期,以避免在等待聚合时出现大量空闲。
任何其他异步解决方案仍然对您不起作用,因为切换到其他线程将立即释放轮询过程以使其再次旋转。
感谢您的答复,Artem!这是一个非常有用的答案。另外,感谢您和团队其他成员在Spring Integration方面所做的出色工作。辉煌的项目!