I have records coming into Kafka with multiple non-unique fields; let's call them Field1 ... FieldN.
I want to write a query that returns all of the records where fieldX = some value. Let's take the following simple example. Imagine that orders come into the system and one of the fields in an order is customerId. A basic operation would be to get all of the orders for a specific customer. How do I do this with Kafka Streams?
I already have a KTable and a materialized view of all the records, so I could just iterate through all of the records in the view and pick out the ones I want, but this seems like it would be inefficient and costly.
I would really like to create a materialized view that contains the records grouped by fieldX, but I don't see any way to do this. It looks like you can only use groupBy together with an aggregation (count, reduce, etc.).
Any ideas on how to do this?
You should group your order stream on customerId and aggregate all the orders into a list. The resulting KTable will hold records of type <CustomerId, List<Order>>.
Using interactive queries, you can then query the state store:
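StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String, ArrayList<Order>> orderTable = orderStream
    // Re-keying triggers a repartition, so groupBy needs serdes for the new key and the value.
    // orderSerde is a Serde<Order> you supply; releases before 2.1 use Serialized.with instead.
    .groupBy((key, value) -> value.get("customerId"), Grouped.with(Serdes.String(), orderSerde))
    .aggregate(
        ArrayList::new,
        // List.add returns a boolean, so return the list itself as the new aggregate.
        (key, val, agg) -> { agg.add(val); return agg; },
        // ArrayListSerde() stands for a custom Serde<ArrayList<Order>> that you must provide.
        Materialized.<String, ArrayList<Order>, KeyValueStore<Bytes, byte[]>>as("customer-orders")
            .withValueSerde(ArrayListSerde())
    );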
This creates a materialized view (state store) named "customer-orders", which you can query through a REST endpoint.
The following link explains how to expose KTables as REST endpoints:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html
The problem with this approach might be that the list of orders grows too large and exceeds the maximum message size.
Agreed. To handle that problem, the OP needs to raise max.message.bytes or change the keys so that each list stays small. I had a similar issue and had to use more detailed keys to reduce the list size.
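To illustrate the "more detailed keys" idea from the comment above, here is a minimal sketch in plain Java. The comment does not say which extra detail was added to the key, so bucketing by customer and UTC day is purely an assumption, and `BucketedKeySketch` and `bucketedKey` are hypothetical names. The returned string would be used as the grouping key in place of the bare customerId.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class BucketedKeySketch {

    // Hypothetical helper: builds a key such as "alice|1970-01-01" so that
    // each aggregated list holds one customer's orders for a single UTC day.
    static String bucketedKey(String customerId, long orderTimestampMillis) {
        LocalDate day = Instant.ofEpochMilli(orderTimestampMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        return customerId + "|" + day; // LocalDate prints as ISO yyyy-MM-dd
    }

    public static void main(String[] args) {
        System.out.println(bucketedKey("alice", 0L)); // prints alice|1970-01-01
    }
}
```

The trade-off is that fetching every order for one customer now means reading several keys rather than one, so the bucket size should match how the data is usually queried.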
I had to change the code in the answer a bit to make it work. 1. I had to add Serialized.with to the groupBy so that it would de/serialize the orders correctly. 2. ()-> new ArrayList<Order>() didn't work. What worked was .aggregate(ArrayList::new, (newKey,val,agg) -> { agg.add(val); return agg; },
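The point about the aggregator in the comment above can be shown without a Kafka cluster. The sketch below is plain Java that only mimics the initializer/aggregator contract; it is not the Kafka Streams API. The `Order` class, the `Agg` interface, and `groupByCustomer` are hypothetical stand-ins. It shows why the aggregator has to return the list: `List.add` returns a boolean, while the aggregate type is the list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregateSketch {

    // Hypothetical stand-in for the Order type used in the thread.
    static final class Order {
        final String orderId;
        final String customerId;

        Order(String orderId, String customerId) {
            this.orderId = orderId;
            this.customerId = customerId;
        }
    }

    // Same shape as an aggregator: (key, value, aggregate) -> new aggregate.
    interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    static Map<String, ArrayList<Order>> groupByCustomer(List<Order> orders) {
        Agg<String, Order, ArrayList<Order>> adder = (key, val, agg) -> {
            agg.add(val); // add() returns boolean, so it cannot be the lambda's result
            return agg;   // the updated list is the new aggregate
        };
        Map<String, ArrayList<Order>> view = new HashMap<>();
        for (Order o : orders) {
            // The "initializer" step: start from an empty list for an unseen key.
            ArrayList<Order> current = view.getOrDefault(o.customerId, new ArrayList<>());
            view.put(o.customerId, adder.apply(o.customerId, o, current));
        }
        return view;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("o1", "alice"), new Order("o2", "bob"), new Order("o3", "alice"));
        System.out.println(groupByCustomer(orders).get("alice").size()); // prints 2
    }
}
```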