Spring Kafka - offsetsForTimes 方法对某些分区返回 null

问题描述 投票:0回答:2

我使用 spring-boot 2.2.11、spring-kafka 2.4.11 和 apache kafka-clients 2.4.1

我有我的消费者

implements ConsumerAwareRebalanceListener
,我试图通过调用
onPartitionsAssigned
来寻求在
offsetsForTimes
方法内的某个时间戳之后的偏移量。

我发现方法的这种奇怪行为

offsetsForTimes

当我寻找更早的时间戳时

1607922415534L
(GMT 2020 年 12 月 14 日上午 5:06:55.534 AM),如下所示:

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    // calling assignment just to ensure my consumer is actually assigned the partitions
    Set<TopicPartition> tps = consumer.assignment();
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
    offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
        .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
}

通过设置断点,我可以看到我得到了下面的地图:

{TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
{TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
{TopicPartition@5498} "My.Data.Topic-5" -> null
{TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
{TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)"
{TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)"
{TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)"
{TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)"
{TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)"
{TopicPartition@5518} "My.Data.Topic-6" -> null

但是,如果我寻找更新的时间戳,如

1607941818423L
(GMT 2020 年 12 月 14 日 10:30:18.423 AM),我会得到所有分区的偏移量:

{TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
{TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
{TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
{TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
{TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
{TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
{TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
{TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
{TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
{TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"

所以我很困惑,当我第二次尝试时确实存在具有较晚时间戳的消息时,为什么寻求较旧的时间戳会给我空值?这个调用有搜索范围限制还是我做错了什么?

非常感谢!

java apache-kafka spring-kafka
2个回答
0
投票

这只是意味着没有小于或等于该时间戳的提交偏移量;在这种情况下你可以使用零。


0
投票

根据文档,如果指定时间戳中没有消息,它可以为分区返回 null。所以,如果你想调用

consumer#seek
进行分区,你需要检查它是否为空,如果是,则调用
consumer#seekToEnd
来代替。

        if (offsetAndTimestamp != null) {
          consumer.seek(partition, offsetAndTimestamp.offset)
        } else {
          consumer.seekToEnd(Collections.singletonList(partition))
        }
© www.soinside.com 2019 - 2024. All rights reserved.