当我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题
这个方法是否应该将左流中的所有记录保留24小时才能在右流中找到匹配的记录?
我注意到许多记录在被消耗到左流中几个小时后就被发出了:
这是我的代码:
public KStream<String, KafkaJobEvent> kStream(
@Value("${input-topic}") final String inputTopic,
final StreamsBuilder builder) {
final KStream<String, KafkaJobEvent> sourceStream = builder.stream(inputTopic,
Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())
.withTimestampExtractor(new DefaultTimestampExtractor(logReader)))
.selectKey((k, v) -> v.getID); // re-key the stream
final Map<String, KStream<String, KafkaJobEvent>> branches = sourceStream.split(Named.as(SUB_STREAM))
.branch((key, value) -> value.getTarget().contains("A"), Branched.as("A"))
.branch((key, value) -> value.getTarget().contains("B"), Branched.as("B"))
.defaultBranch();
final KStream<String, KafkaJobEvent> streamA = branches.get(SUB_STREAM + "A");
final KStream<String, KafkaJobEvent> streamB = branches.get(SUB_STREAM +"B");
final Duration windowDuration = Duration.ofHours(24);
return streamA
.leftJoin(
streamB,
new ValueJoiner<KafkaJobEvent, KafkaJobEvent, KafkaJobEventJoinedValue>() {
@Override
public KafkaJobEventJoinedValue apply(final KafkaJobEvent left, final KafkaJobEvent right) {
return KafkaJobEventJoinedValue.builder()
.left(left)
.right(right)
.build();
}
},
JoinWindows.ofTimeDifferenceWithNoGrace(windowDuration),
StreamJoined.with(
Serdes.String(), // Key serde
kafkaJobEventSerde, // Left value serde
kafkaJobEventSerde // Right value serde
))
.filter((key, value) -> value.getRight() == null)
.mapValues(KafkaJobEventJoinedValue::getLeft)
.process(() -> new ContextualProcessor<String, KafkaJobEvent, String, KafkaJobEvent>() {
@Override
public void process(final Record<String, KafkaJobEvent> record) {
try {
log.info("the timestamp:{} for record key:{}, record value:{}",
record.timestamp(), record.key(), record.value());
// do something ...
} catch (final Exception e) {
log.error("Exception found while handling message for record value:{}", record.value(), e);
}
}
});
}
这个方法是否应该将左流中的所有记录保留24小时才能在右流中找到匹配的记录?
是的,确实如此。不仅适用于左流,还适用于右流。
我注意到许多记录在被消耗到左流中几个小时后就被发出了:
好吧,如果左输入记录尚未加入,因为没有匹配的记录到达右侧,则不会立即发出任何内容。仅当左侧输入记录在 24 小时窗口内根本没有加入时,才会在 24 小时后发出
<key, <left,null>>
结果(您的加入窗口大小,假设您不使用宽限期)。
左侧结果无法立即发出,因为左侧记录可能仍会与未来的右侧记录连接,并且发出左连接加内连接结果将是不正确的。
请注意,24 小时是“时差”,因此该窗口可以查看过去和未来。如果您希望左侧记录仅与较旧的右侧记录连接,并在未找到匹配项时以正确的方式发出左连接结果,您可以使用
JoinWindows#before(...)
(或 #after(...)
)来定义“不对称”窗户。详情请参阅JoinWindows
的JavaDocs。
有关更多详细信息,请查看https://www.confluence.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/