我有多个非唯一字段进入 Kafka 的记录,我们称它们为Field1 ... Field n。
我想编写一个查询以返回其中fieldx =某个值的所有记录。让我们举一个简单的例子。想象一下订单进入系统,订单中的一个字段是customerId。基本操作是获取特定客户的所有订单。如何使用Kafka Streams做到这一点?
我已经有一个KTable和所有记录的实例化视图,因此我可以遍历视图中的所有记录并挑选出我想要的记录,但这似乎效率低下且成本高昂。
我真的很想创建一个物化视图,其中该视图包含由fieldx分组的记录,但是我看不到任何实现此目的的方法。看来您只能在聚合,计数,减少等条件下使用groupby。
有关如何执行此操作的任何想法?
您应该在“ customerID”上对订单流进行分组,并将所有订单汇总到一个列表中。结果KTable将具有<CustomerId, [List of Order]>
类型事件。
使用交互式查询,您可以查询状态存储,
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String,ArrayList<Order>> orderTable = orderStream
.groupBy((key,value)-> value .get("customerId"))
.aggregate(()-> new ArrayList<Order>(),
(key,val,agg)-> agg.add(val),
Materialized.as("customer-orders")
.withValueSerde(ArrayListSerde())
);
它将创建一个物化视图“客户订单”,您可以通过其余端点进行查询。
您可以点击以下链接将KTables公开为休息端点:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html
这种方法的问题可能是
List of Orders
可能会变得太大而超过最大消息大小。同意,为了处理该问题,OP需要更改max.message.bytes或可能更改用于优化列表的键。我遇到了类似的问题,因此必须制作更多详细信息键以减小列表大小。
我必须稍微更改答案中的代码才能使其正常工作。1.我必须为groupby添加Serialized.with,以便它可以正确地反序列化订单。2.()-> new ArrayList <Order>()不起作用。起作用的是.aggregate(ArrayList :: new,(newKey,val,agg)-> {agg.add(val); return agg;},