
Confluent Cloud

Posted on 2020-11-19 14:18:41

I am trying to build a Java Spring Boot application that posts messages to and reads messages from Kafka on Confluent Cloud.

I followed an article on publishing a Kafka message to Confluent Cloud, and it works.

Below is the implementation:

KafkaController.java

package com.seroter.confluentboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;
import com.seroter.confluentboot.engine.Producer;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    private final com.seroter.confluentboot.engine.Consumer consumer;

    @Autowired
    KafkaController(Producer producer, com.seroter.confluentboot.engine.Consumer consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }

    @PostMapping(value = "/publishJson")
    public ResponseEntity<Product> publishJsonMessage(@RequestBody Product product) {
        producer.sendJsonMessage(product);
        return new ResponseEntity<>(product, HttpStatus.CREATED);
    }

}

Product.java

package com.seroter.confluentboot.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonPropertyOrder(value = {"product_id","product_name","quantity","price"})
public class Product {

    @JsonProperty(value = "product_id")
    private int productId;
    @JsonProperty(value="product_name")
    private String productName;
    
    private int quantity;
    
    private double price;
    
}

Producer.java

package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;

import com.seroter.confluentboot.dto.Product;

@Service
@EnableBinding(Source.class)
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private Source source;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.source.output().send(new GenericMessage<>(message));
    }
    
    
    public void sendJsonMessage(Product product) {
        logger.info(String.format("#### -> Producing message -> %s", product));
        this.source.output().send(new GenericMessage<>(product));
    }
    
}

ConfluentBootApplication.java

package com.seroter.confluentboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
@RequestMapping(value = "/confluent")
public class ConfluentBootApplication {

    @Autowired
    private com.seroter.confluentboot.engine.Consumer consumer;

    public static void main(String[] args) {
        SpringApplication.run(ConfluentBootApplication.class, args);
    }

}

application.properties

spring.cloud.stream.kafka.binder.brokers=pkc-epwny.eastus.azure.confluent.cloud:9092
spring.cloud.stream.bindings.output.destination=test
  
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

server.port=9000

It works, and I could verify the message in Confluent Cloud. (Screenshots omitted.)

Now I want to build a Spring Boot consumer REST endpoint. How do I do it?

Update:

ConfluentConsumer.java

package com.seroter.confluentboot.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.seroter.confluentboot.dto.Product;

//@RestController
@EnableBinding(Sink.class)
public class ConfluentConsumer {
    
    @StreamListener(Sink.INPUT)
    public void consumeMessage(Product product)
    {
        System.out.println("******************************");
        System.out.println("============= "+product.getProductId()+" ================");
        System.out.println("******************************");
    }

}
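Note: for the Sink binding above to actually receive messages, the consumer side also needs an input binding in application.properties pointing at the topic the producer publishes to. A minimal sketch, assuming the same test topic used for publishing (the group name is illustrative):

spring.cloud.stream.bindings.input.destination=test
spring.cloud.stream.bindings.input.group=confluent-boot-consumer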

Consumer.java

package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;

@Service
@PropertySource("classpath:application.properties")
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);
}
Questioner: Karthikeyan Vijayakumar

Answer by Govinda Sakhare, 2020-11-29 17:17:45

I believe what you are trying to do here is pick the latest message from the Kafka consumer via a REST endpoint, i.e. you want to manually poll the Kafka topic. Publishing a message via a REST endpoint is logical, but consuming messages through an endpoint doesn't sound like a good idea. If you want queue behavior, you should use RabbitMQ instead of Kafka.

Still, if you want to use Kafka and poll messages manually, you can use one of the two approaches below.

Approach 1: Create a ConsumerFactory, get a Consumer from it, and poll Kafka with that Consumer.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
class KafkaConsumerConfig {

    private static final String TOPIC_NAME = "test";
    private final String userName = "username";
    private final String password = "password";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-epwny.eastus.azure.confluent.cloud:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-gp-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // The JAAS config is a single string: the credentials must be quoted
        // and the value must end with a semicolon.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + userName
                        + "\" password=\"" + password + "\";");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public Consumer<String, String> createConsumer(ConsumerFactory<String, String> consumerFactory) {
        Consumer<String, String> consumer = consumerFactory.createConsumer("consumer-group-1", "client-1");
        consumer.subscribe(List.of(TOPIC_NAME));
        return consumer;
    }
}

You can read the topic name, group id, bootstrap servers, SSL configs, etc. from application.properties instead of hard-coding them.
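For example, a minimal sketch of externalizing those values with @Value (the property names below are illustrative, not from the original post):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
class KafkaConsumerConfig {

    // Resolved from application.properties at startup.
    @Value("${app.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${app.kafka.topic}")
    private String topicName;

    @Value("${app.kafka.username}")
    private String userName;

    @Value("${app.kafka.password}")
    private String password;

    // ... build the ConsumerFactory from these fields as shown above
}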

Now you can consume messages by injecting the Consumer bean into a RestController:

import java.time.Duration;
import java.util.Iterator;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class ConsumerController {

    private final Consumer<String, String> consumer;

    @Autowired
    ConsumerController(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @GetMapping("retrieveMessage")
    public String getMessage() {
        // poll() may return more than one record; this returns only the first,
        // but commitSync() commits offsets for the whole polled batch.
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
        if (!consumerRecords.isEmpty()) {
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            String value = iterator.next().value();
            consumer.commitSync();
            return value;
        } else {
            return "no message";
        }
    }
}
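Note that poll() can return a whole batch of records. If you want to return everything it picked up rather than only the first record, here is a sketch of a variant that could be added to the same controller (the retrieveAllMessages path is illustrative, not from the original answer; it additionally needs java.util.List and java.util.ArrayList imports):

@GetMapping("retrieveAllMessages")
public List<String> getMessages() {
    List<String> values = new ArrayList<>();
    // Drain the entire polled batch so the commit below does not skip records.
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1000))) {
        values.add(record.value());
    }
    consumer.commitSync();
    return values;
}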

Approach 2: store the messages in an in-memory queue and then poll that queue. First bind the Sink input to the topic:

spring.cloud.stream.bindings.input.destination=test

Then store the messages in a queue as they arrive and retrieve them via a REST endpoint:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Sink.class)
class ConsumerController {

    private final Queue<String> queue;

    ConsumerController() {
        this.queue = new ConcurrentLinkedQueue<>();
    }

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        this.queue.add(message);
    }

    @GetMapping("getMessage")
    public String retrieveMessage() {
        return this.queue.poll();
    }
}

Cons: you'll lose all in-memory messages if your application restarts. Storing the messages in a distributed cache such as Redis would therefore be a better solution.
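A minimal sketch of that Redis idea using Spring Data Redis (assuming a StringRedisTemplate is configured; the kafka-messages key name is illustrative):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Sink.class)
class RedisBackedConsumerController {

    private static final String KEY = "kafka-messages"; // illustrative key name

    private final StringRedisTemplate redisTemplate;

    RedisBackedConsumerController(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        // Push onto a Redis list so messages survive an application restart.
        redisTemplate.opsForList().leftPush(KEY, message);
    }

    @GetMapping("getMessage")
    public String retrieveMessage() {
        // Pop the oldest message (FIFO: push left, pop right).
        return redisTemplate.opsForList().rightPop(KEY);
    }
}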