温馨提示:本文翻译自stackoverflow.com,查看原文请点击:java - Kafka streams return all records where fieldx = some value
apache-kafka-streams java spring-kafka

java - Kafka流返回所有记录,其中fieldx =某个值

发布于 2020-03-27 11:57:27

我有多个非唯一字段进入 Kafka 的记录,我们称它们为Field1 ... Field n。

我想编写一个查询以返回其中fieldx =某个值的所有记录。让我们举一个简单的例子。想象一下订单进入系统,订单中的一个字段是customerId。基本操作是获取特定客户的所有订单。如何使用Kafka Streams做到这一点?

我已经有一个KTable和所有记录的实例化视图,因此我可以遍历视图中的所有记录并挑选出我想要的记录,但这似乎效率低下且成本高昂。

我真的很想创建一个物化视图,其中该视图包含由fieldx分组的记录,但是我看不到任何实现此目的的方法。看来您只能在聚合,计数,减少等条件下使用groupby。

有关如何执行此操作的任何想法?

查看更多

查看更多

提问者
mbluke
被浏览
24
Nishu Tayal 2019-07-05 21:41

您应该在“ customerID”上对订单流进行分组,并将所有订单汇总到一个列表中。结果KTable将具有<CustomerId, [List of Order]>类型事件。

使用交互式查询,您可以查询状态存储,

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String,ArrayList<Order>> orderTable = orderStream
      .groupBy((key,value)-> value .get("customerId"))
      .aggregate(()-> new ArrayList<Order>(),
                 (key,val,agg)-> agg.add(val),
                  Materialized.as("customer-orders")
                  .withValueSerde(ArrayListSerde())          
       ); 

它将创建一个物化视图“客户订单”,您可以通过其余端点进行查询。

您可以点击以下链接将KTables公开为休息端点:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html