Warm tip: This article is reproduced from serverfault.com, please click

spring-integration-kafka message-driven-channel-adapter XML config

发布于 2020-12-08 16:31:35

I am trying to write an app to consume messages from Kafka topic and then enrich the inbound message to add new headers. The versions in use are:

spring-integration-core  -- 5.3.4.RELEASE
spring-integration-kafka -- 3.3.1.RELEASE
spring-boot              -- 2.3.5.RELEASE

When in my integration XML, int-kafka:message-driven-channel-adapter doesn't allow me to supply an output channel or channel property. I am not sure how to tie this to next channel in the flow which can be a router, transformer, enricher, or anything else for that matter.

This is a sample app that will just receive a message from Kafka topic, do some transformation, enrichment, etc. and then persist the parts of message in the database.

Can you please give some XML config template for a multi-channel integration?

For the sake of completion, here's what I am trying but not able to proceed further:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
    http://www.springframework.org/schema/integration/xml http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd
    http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
    http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
    <int-kafka:message-driven-channel-adapter
        listener-container="messageListenerContainer"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        error-channel="errorChannel" />
</beans>
Questioner
Rajnish Jain
Viewed
0
Gary Russell 2020-12-09 01:11:14

Are you having a runtime problem or an IDE problem?

There's a test case that has a channel property.

https://github.com/spring-projects/spring-integration/blob/15c2d16866a72470b35c65a34e869998f5823f81/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaMessageDrivenChannelAdapterParserTests-context.xml#L12-L23

I am seeing some false errors in eclipse right now; investigating...

EDIT

If you are using Spring Tool Suite 4, XML namespace support is disabled by default.

Enabling it solved the problem for me, but I had to restart the IDE.

enter image description here