如何在加入Kafka Streams时访问原始记录

问题描述 投票:1回答:1

我有一个工作的Kafka Streams应用程序,目前正在从两个不同的主题创建两个KStream。那部分工作正常。

现在,我想加入它们,并获得第一个值的“聚合记录”和第二个值。键是简单的Java字符串,值是avro编码的GenericRecords。

根据文档,我应该能够做到这样的事情:

    KStream<String, GenericAvroSerde> joined =
        inputTopicStartKStream.leftJoin(inputTopicEndKStream,
        (left, right) -> { ??? }
        JoinWindows.of(Duration.ofHours(24)),
        Joined.with(
            stringSerde,
            genericAvroSerde,
            genericAvroSerde)
    );

但是,我在网上找到的文档或教程中并不清楚我在上面的{ ??? }部分可以做些什么。我尝试了以上的多种变体,没有运气。如果重要的话,我正在使用Kakfa Streams 2.2.0版本。

我只想拥有一个<key, merge value1 + value2>的输出流,用于使用相同的密钥在两个流上下来的记录。我可以手动合并值,但是不清楚如何访问lambda右侧的值。

java apache-kafka apache-kafka-streams
1个回答
3
投票

在ValueJoiner (left, right) -> { ??? }中,left表示来自左流的值,right表示来自右流的值

您所要做的就是在ValueJoiner中添加您的代码,如下所示:

import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;

KStream<String, GenericAvroSerde> joined =
    inputTopicStartKStream.leftJoin(inputTopicEndKStream,
    (left, right) -> {
             // You can get access to the generic Avro record by
             // casting both left and right values 
             Record leftRecord = (Record) left;
             Record rightRecord = (Record) right;

             // For the original question, you can simply create a new GenericRecord 
             // with the contents of left and right records
             GenericRecord record = new GenericData.Record(schema);
             record.put("left", left);
             record.put("right", right);
    }
    JoinWindows.of(Duration.ofHours(24)),
    Joined.with(
        stringSerde,
        genericAvroSerde,
        genericAvroSerde)
);
© www.soinside.com 2019 - 2024. All rights reserved.