Note: This article is reproduced from stackoverflow.com.
asynchronous java jpa spring-integration spring-integration-dsl

Spring Integration complete asynchronous transformation before the next

Published on 2020-04-18 10:02:12

I have an integration flow that regularly polls a database for MachineLine entities that have not yet been processed and processes them. The flow retrieves a collection of MachineLine objects, which I would like to split into individual objects, transform into ReportDetails objects, and persist to another table in the database. The flow is defined as follows:

@Bean
public IntegrationFlow processMachineLine() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManager)
                            .entityClass(MachineLine.class)
                            .jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
                    e -> e.poller(Pollers.fixedDelay(5000)))
            .split()
            .transform(MachineLine.class, this::transformMachineLineToReportDetails)
            .handle(Jpa.outboundAdapter(this.entityManager)
                            .entityClass(ReportDetails.class),
                    ConsumerEndpointSpec::transactional)
            .get();
}

The above definition works, but it is slow. The transformMachineLineToReportDetails method sends an HTTP request to another service that takes several seconds to respond. With the current flow definition, each MachineLine object waits for the previous one to be transformed and persisted before it is processed itself.

So, the ideal solution would be to perform this transformation and persistence asynchronously. A possible solution would be to insert the following line between .split() and .transform(...):

.channel(new ExecutorChannel(Executors.newCachedThreadPool()))

However, this allows the JPA inbound adapter to poll the database again before the results of the last poll have been processed and persisted. Any MachineLine entities from the previous poll that have not been transformed and persisted by the time of the next poll are retrieved again, and the flow attempts to transform and persist them a second time. This wastes resources and also produces an error when multiple ReportDetails objects with the same ID are persisted to the database.

Is there a way I can asynchronously transform the MachineLine objects but make sure the database is not polled again until the results of the previous poll have completed their journey through the flow (i.e. all the MachineLine objects are transformed and persisted)?

Questioner: Rafe
Artem Bilan 2020-02-05 03:26

The only way I see is a custom AbstractMessageSourceAdvice that checks an AtomicBoolean flag (which could be a bean, too) in beforeReceive(). Since you use Pollers.fixedDelay(5000), your polling policy is still single-threaded, so the AbstractMessageSourceAdvice can reliably stop that thread from polling JPA while polling is not allowed. The boolean flag should be true at the beginning, and you change it to false before the mentioned split(). You can do that using a publishSubscribeChannel() with two subscribers, or even in the AbstractMessageSourceAdvice implementation itself, with something like compareAndSet(true, false) in beforeReceive().
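To make the compareAndSet(true, false) idea concrete, here is a minimal plain-Java sketch of the gate logic only. It does not extend AbstractMessageSourceAdvice or use any Spring Integration type; PollGate and release() are hypothetical names, and beforeReceive() and afterReceive() here only mirror the roles of the advice callbacks. It also assumes that a poll which finds nothing produces no message, in which case the gate has to be reopened right away or polling would never resume.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java model of the polling gate; hypothetical, not a Spring Integration class. */
final class PollGate {

    // true = the poller may query the database; the gate starts open.
    private final AtomicBoolean pollingAllowed = new AtomicBoolean(true);

    /** Role of beforeReceive(): admit one poll and close the gate in a single atomic step. */
    boolean beforeReceive() {
        return pollingAllowed.compareAndSet(true, false);
    }

    /** Role of afterReceive(): an empty poll starts no work, so reopen the gate at once. */
    <T> T afterReceive(T result) {
        if (result == null) {
            pollingAllowed.set(true);
        }
        return result;
    }

    /** Called from the end of the flow once every item of the batch has been handled. */
    void release() {
        pollingAllowed.set(true);
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate();
        System.out.println("first poll admitted:  " + gate.beforeReceive());
        System.out.println("second poll admitted: " + gate.beforeReceive()); // batch still in flight
        gate.release();
        System.out.println("after release:        " + gate.beforeReceive());
    }
}
```

In a real advice the same two methods would delegate to the shared AtomicBoolean bean, so that the end of the flow can flip it back to true.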

Then you split, transform, and persist using an ExecutorChannel, as you mentioned.

At the end of your flow, you need to place a publishSubscribeChannel() with two subscribers: 1. handle(Jpa.outboundAdapter(this.entityManager)); 2. aggregate(), to wait for all the split items to be completed. After that aggregate(), you place a simple handle(m -> pollingFlagBean().set(true)).

That's all: a new poll will happen only when all the items are processed and aggregated back into the group. Only after that do you let your poller go again, using that AtomicBoolean.
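The whole sequence can be modelled outside Spring Integration to see why a second poll is skipped while a batch is in flight. In this hedged sketch, GatedBatchDemo, poll() and slowStep are hypothetical names: runAsync on a cached thread pool stands in for the ExecutorChannel, CompletableFuture.allOf() stands in for aggregate(), and the whenComplete callback stands in for the final step that sets the flag back to true. The sketch reopens the gate even when an item fails; that is a choice made for the model, not a statement about how the real flow behaves on errors.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Plain-Java model of "poll, fan out, wait for all items, then allow the next poll". */
final class GatedBatchDemo {

    private final AtomicBoolean pollingAllowed = new AtomicBoolean(true);
    private final ExecutorService workers = Executors.newCachedThreadPool();

    /**
     * One poller tick. Returns null when the gate is closed (poll skipped);
     * otherwise a future that completes once the whole batch is done.
     */
    CompletableFuture<Void> poll(List<Integer> ids, Consumer<Integer> slowStep) {
        if (!pollingAllowed.compareAndSet(true, false)) {
            return null; // previous batch still in flight
        }
        // Fan out: each item is transformed and persisted on a worker thread.
        CompletableFuture<?>[] items = ids.stream()
                .map(id -> CompletableFuture.runAsync(() -> slowStep.accept(id), workers))
                .toArray(CompletableFuture[]::new);
        // Fan in: reopen the gate only after every item has finished.
        return CompletableFuture.allOf(items)
                .whenComplete((ignored, error) -> pollingAllowed.set(true));
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) {
        GatedBatchDemo demo = new GatedBatchDemo();
        CountDownLatch serviceResponds = new CountDownLatch(1);
        Set<Integer> persisted = ConcurrentHashMap.newKeySet();
        Consumer<Integer> slowStep = id -> {
            try {
                serviceResponds.await(); // simulates the slow HTTP call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            persisted.add(id);
        };

        CompletableFuture<Void> first = demo.poll(List.of(1, 2, 3), slowStep);
        CompletableFuture<Void> second = demo.poll(List.of(1, 2, 3), slowStep);
        System.out.println("second poll skipped: " + (second == null));

        serviceResponds.countDown(); // the remote service finally answers
        first.join();                // batch done, gate reopened
        System.out.println("items persisted: " + persisted.size());
        System.out.println("next poll admitted: " + (demo.poll(List.of(), slowStep) != null));
        demo.shutdown();
    }
}
```

Because the gate is checked on the single poller thread and released from whichever worker finishes last, no item from the first batch can be picked up a second time.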

You may also consider combining this flag logic with a SimpleActiveIdleMessageSourceAdvice to switch the polling period between active and passive modes, to avoid a long idle period while you wait for an aggregation.
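One possible reading of the active/passive suggestion, as a plain-Java sketch: poll on a short period while a batch is in flight or the last poll found rows, and fall back to a long period when there is nothing to do. PollPeriodPolicy, its parameters and the 500 ms value are hypothetical; this is not the SimpleActiveIdleMessageSourceAdvice API, only the decision such a combination would have to make.

```java
import java.time.Duration;

/** Hypothetical helper that picks the delay before the next poll attempt. */
final class PollPeriodPolicy {

    private final Duration activePeriod; // short: retry soon
    private final Duration idlePeriod;   // long: nothing is happening

    PollPeriodPolicy(Duration activePeriod, Duration idlePeriod) {
        this.activePeriod = activePeriod;
        this.idlePeriod = idlePeriod;
    }

    /**
     * @param batchInFlight     true while the gate is closed and items are still being processed
     * @param lastPollFoundRows true if the most recent admitted poll returned data
     */
    Duration nextPeriod(boolean batchInFlight, boolean lastPollFoundRows) {
        return (batchInFlight || lastPollFoundRows) ? activePeriod : idlePeriod;
    }

    public static void main(String[] args) {
        PollPeriodPolicy policy =
                new PollPeriodPolicy(Duration.ofMillis(500), Duration.ofSeconds(5));
        System.out.println("while waiting for aggregation: " + policy.nextPeriod(true, false));
        System.out.println("when the table is drained:     " + policy.nextPeriod(false, false));
    }
}
```

With a short active period, the poller notices soon after the flag is set back to true instead of waiting out a full idle interval.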

Any other async solution still won't work for you, because switching to another thread releases the polling process immediately and lets it spin again.