public static void main(String[] args) throws InterruptedException {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(Main.class);
ctx.refresh();
DirectChannel channel1 = ctx.getBean("channel1", DirectChannel.class);
ctx.getBean("channel2", PublishSubscribeChannel.class).subscribe(message ->
System.out.println("Output: " + message));
channel1.send(MessageBuilder.withPayload("p1")
.setHeader(CORRELATION_ID, 1)
.setHeader(SEQUENCE_SIZE,2)
.setHeader(SEQUENCE_NUMBER,1)
.setHeader("a", 1)
.build());
channel1.send(MessageBuilder.withPayload("p2")
.setHeader(CORRELATION_ID, 1)
.setHeader(SEQUENCE_SIZE,2)
.setHeader(SEQUENCE_NUMBER,2)
.setHeader("a", 2)
.build());
}
@Bean
public MessageChannel channel1() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel channel2() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow flow1() {
return IntegrationFlows
.from("channel1")
.aggregate(a -> a
.releaseStrategy(new SequenceSizeReleaseStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true))
.channel("channel2")
.get();
}
输出:GenericMessage [有效载荷= [p1,p2],标头= {sequenceNumber = 2,a = 2,correlationId = 1,id = b5e51041-c967-1bb4-1601-7e468ae28527,sequenceSize = 2,时间戳= 1580475773518}]
标头“ a”和“ sequenceNumber”被覆盖。如何聚合具有相同标题的消息?
一定是这样
输出:GenericMessage [有效载荷= [p1,p2],标头= {sequenceNumber = [1,2],a = [1,2],correlationId = 1,id = b5e51041-c967-1bb4-1601-7e468ae28527,sequenceSize = 2 ,timestamp = 1580475773518}]
见AbstractAggregatingMessageGroupProcessor
:
/**
* Specify a {@link Function} to map {@link MessageGroup} into composed headers for output message.
* @param headersFunction the {@link Function} to use.
* @since 5.2
*/
public void setHeadersFunction(Function<MessageGroup, Map<String, Object>> headersFunction) {
并且:
/**
* The {@link Function} implementation for a default headers merging in the aggregator
* component. It takes all the unique headers from all the messages in group and removes
* those which are conflicted: have different values from different messages.
*
* @author Artem Bilan
*
* @since 5.2
*
* @see AbstractAggregatingMessageGroupProcessor
*/
public class DefaultAggregateHeadersFunction implements Function<MessageGroup, Map<String, Object>> {
或只是长期存在:
/**
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
* more advanced conflict-resolution strategies if necessary.
* @param group The message group.
* @return The aggregated headers.
*/
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
因此,您在aggregate()
配置中需要的是一个outputProcessor(MessageGroupProcessor outputProcessor)
选项。
请参阅文档以获取更多信息:https : //docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/message-routing.html#aggregatingmessagehandler