Note: This article is reproduced from stackoverflow.com.
apache-kafka-streams java spring-kafka

Kafka Streams: return all records where fieldx = some value

Published 2020-03-27 10:29:34

I have records coming into Kafka with multiple non-unique fields; let's call them Field1 ... FieldN.

I want to write a query that returns all of the records where fieldx = some value. Take the following simple example: orders come into the system, and one of the fields in an order is customerId. A basic operation would be to get all of the orders for a specific customer. How do I do this with Kafka Streams?

I already have a KTable and a materialized view of all the records, so I could iterate through every record in the view and pick out the ones I want, but that seems inefficient and costly.

I would really like to create a materialized view that contains the records grouped by fieldx, but I don't see any way to do this. It looks like groupBy can only be used together with an aggregation (count, reduce, aggregate, etc.).
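The trade-off described above can be sketched in plain Java, with no Kafka involved: scanning every stored record on each query versus maintaining a view keyed by customerId whose value is the list of that customer's orders. The second option is itself an aggregation (the initializer is "start with an empty list", the aggregator is "append the order"), which is why groupBy followed by aggregate can produce it. The Order record and all method names here are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model (no Kafka): full scan vs. a view grouped by customerId.
class GroupedViewSketch {
    // Hypothetical order shape; the real Order class is not shown in the question.
    record Order(String orderId, String customerId, double amount) {}

    // Full scan: every query touches every stored order, O(n) per lookup.
    static List<Order> scanForCustomer(List<Order> allOrders, String customerId) {
        return allOrders.stream()
                .filter(o -> o.customerId().equals(customerId))
                .collect(Collectors.toList());
    }

    // Grouped view: each incoming order is appended to its customer's list once.
    // computeIfAbsent plays the role of the initializer, add() the role of the aggregator.
    static void aggregate(Map<String, List<Order>> view, Order order) {
        view.computeIfAbsent(order.customerId(), k -> new ArrayList<>()).add(order);
    }

    // A query is then a single key access instead of a scan.
    static List<Order> lookup(Map<String, List<Order>> view, String customerId) {
        return view.getOrDefault(customerId, List.of());
    }

    public static void main(String[] args) {
        List<Order> incoming = List.of(
                new Order("o1", "c1", 10.0),
                new Order("o2", "c2", 5.0),
                new Order("o3", "c1", 7.5));

        Map<String, List<Order>> view = new HashMap<>();
        for (Order o : incoming) {
            aggregate(view, o);
        }

        // Both approaches return the same orders; only the cost per query differs.
        System.out.println(scanForCustomer(incoming, "c1"));
        System.out.println(lookup(view, "c1"));
    }
}
```

The grouped view pays the grouping cost once per incoming record, so each query no longer depends on the total number of stored orders.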

Any ideas on how to do this?

Questioner: mbluke
Viewed: 29

Answer by Nishu Tayal (2019-07-05 21:41)

You should group your order stream by customerId and aggregate all of the orders into a list. The resulting KTable contains one entry per customer, of the form <customerId, List<Order>>.

The aggregation is materialized in a state store, which you can then query with Interactive Queries. The topology looks like this:

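StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
// Re-key each order by customer ID, then collect that customer's orders into a list.
// Assumes Order has a getCustomerId() accessor, orderSerde is a Serde<Order>, and
// ArrayListSerde is a custom Serde<ArrayList<Order>> (it is not part of Kafka Streams).
KTable<String, ArrayList<Order>> orderTable = orderStream
      .groupBy((key, value) -> value.getCustomerId(),
               Grouped.with(Serdes.String(), orderSerde))
      .aggregate(() -> new ArrayList<Order>(),
                 (key, val, agg) -> {
                     agg.add(val);   // ArrayList.add returns a boolean, so return the list itself
                     return agg;
                 },
                 Materialized.<String, ArrayList<Order>, KeyValueStore<Bytes, byte[]>>as("customer-orders")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(new ArrayListSerde()));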

This creates a materialized view (a state store) named "customer-orders", which you can query by key, for example through a REST endpoint.
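A REST endpoint for this view only has to turn the customerId in the request into one key lookup. The sketch below uses the JDK's built-in com.sun.net.httpserver.HttpServer, with an in-memory Map standing in for the "customer-orders" store and order IDs standing in for full Order values. In a real application the lookup would go through the read-only store obtained from KafkaStreams#store (using QueryableStoreTypes.keyValueStore()), and with several application instances the key may live on another instance, which the Confluent interactive-queries guide covers. The /customers/{id}/orders path and the comma-separated response body are assumptions for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Sketch of a REST-style lookup; a Map stands in for the "customer-orders" state store.
class CustomerOrdersEndpoint {
    private static final String PREFIX = "/customers/";   // hypothetical path layout
    private static final String SUFFIX = "/orders";

    // Extracts the customerId from "/customers/{id}/orders", or returns null if the path does not match.
    static String customerIdFromPath(String path) {
        if (path.startsWith(PREFIX) && path.endsWith(SUFFIX)
                && path.length() > PREFIX.length() + SUFFIX.length()) {
            return path.substring(PREFIX.length(), path.length() - SUFFIX.length());
        }
        return null;
    }

    // One key lookup per request; no scan over all customers.
    static List<String> ordersFor(Map<String, List<String>> store, String customerId) {
        return store.getOrDefault(customerId, List.of());
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<String>> store = Map.of("c1", List.of("o1", "o3"), "c2", List.of("o2"));
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0); // port 0: any free port
        server.createContext(PREFIX, exchange -> {
            String id = customerIdFromPath(exchange.getRequestURI().getPath());
            int status = (id == null) ? 404 : 200;
            String body = (id == null) ? "not found" : String.join(",", ordersFor(store, id));
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(status, bytes.length == 0 ? -1 : bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
        try {
            // Call the endpoint once to show the round trip, then shut the server down.
            URI uri = URI.create("http://localhost:" + server.getAddress().getPort() + "/customers/c1/orders");
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(HttpRequest.newBuilder(uri).build(), HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } finally {
            server.stop(0);
        }
    }
}
```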

For details on exposing KTables through a REST endpoint, see:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html
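One caveat on grouping a KStream: every event is appended, so if the same order arrives again (an update), the list keeps both the old and the new version. The question starts from a KTable; grouping a KTable gives a KGroupedTable, whose aggregate takes both an adder and a subtractor, so on an update the old value is subtracted from its old group and the new value is added to its new group. The plain-Java sketch below models only that bookkeeping, without Kafka; the Order record, its orderId and status fields, and the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of regrouping a table keyed by orderId into per-customer lists
// using an adder and a subtractor, so updates replace orders instead of duplicating them.
class TableRegroupSketch {
    // Hypothetical order shape for illustration.
    record Order(String orderId, String customerId, String status) {}

    // Source table: latest version of each order, keyed by orderId.
    private final Map<String, Order> ordersById = new HashMap<>();
    // Derived view: customerId -> orders that currently belong to that customer.
    private final Map<String, List<Order>> ordersByCustomer = new HashMap<>();

    // Applies one upsert to the source table and keeps the grouped view consistent.
    void upsert(Order newValue) {
        Order oldValue = ordersById.put(newValue.orderId(), newValue);
        if (oldValue != null) {
            subtract(oldValue);   // remove the previous version from its old group
        }
        add(newValue);            // add the new version to its (possibly different) group
    }

    private void add(Order order) {
        ordersByCustomer.computeIfAbsent(order.customerId(), k -> new ArrayList<>()).add(order);
    }

    private void subtract(Order order) {
        List<Order> group = ordersByCustomer.get(order.customerId());
        if (group != null) {
            group.remove(order);  // List.remove(Object): drops the equal record
            if (group.isEmpty()) {
                ordersByCustomer.remove(order.customerId());
            }
        }
    }

    List<Order> ordersFor(String customerId) {
        return ordersByCustomer.getOrDefault(customerId, List.of());
    }

    public static void main(String[] args) {
        TableRegroupSketch view = new TableRegroupSketch();
        view.upsert(new Order("o1", "c1", "NEW"));
        view.upsert(new Order("o2", "c1", "NEW"));
        view.upsert(new Order("o1", "c1", "SHIPPED")); // update replaces o1 rather than duplicating it
        view.upsert(new Order("o2", "c2", "NEW"));     // o2 moves from customer c1 to c2
        System.out.println(view.ordersFor("c1"));
        System.out.println(view.ordersFor("c2"));
    }
}
```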