我有一个工作的Kafka Streams应用程序,目前正在从两个不同的主题创建两个KStream。那部分工作正常。
现在,我想加入它们,并获得第一个值的“聚合记录”和第二个值。键是简单的Java字符串,值是avro编码的GenericRecords。
根据文档,我应该能够做到这样的事情:
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> { ??? }
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);
但是,我在网上找到的文档或教程中并不清楚我在上面的{ ??? }
部分可以做些什么。我尝试了以上的多种变体,没有运气。如果重要的话,我正在使用Kakfa Streams 2.2.0版本。
我只想拥有一个<key, merge value1 + value2>
的输出流,用于使用相同的密钥在两个流上下来的记录。我可以手动合并值,但是不清楚如何访问lambda右侧的值。
在ValueJoiner (left, right) -> { ??? }
中,left表示来自左流的值,right表示来自右流的值
您所要做的就是在ValueJoiner中添加您的代码,如下所示:
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> {
// You can get access to the generic Avro record by
// casting both left and right values
Record leftRecord = (Record) left;
Record rightRecord = (Record) right;
// For the original question, you can simply create a new GenericRecord
// with the contents of left and right records
GenericRecord record = new GenericData.Record(schema);
record.put("left", left);
record.put("right", right);
}
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);