如何仅删除已消费的消息以及如何在kafka主题中显示未消费的消息?

问题描述 投票:0回答:1

我们将一个项目从ActiveMQ迁移到Kafka。 过去我们向很多队列写入了太多的消息,消费完之后,ActiveMQ会自动删除消费的消息。只有未消费的消息仍在队列中。 我们如何在 Kafka 中实现这种行为。我们不再需要消耗数百万条消息。我们只需要查看未消费的消息。

我使用属性 log.retention.hours=1 配置 Kafka,以尽快删除所有已使用的消息。

  1. 我有一个主题 crs.reporting.intern,我首先向其发送了 2 条消息并收到它们 -> 好的。
C:\Java\apache-kafka_2.13-3.3.1\bin\windows>kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group groupid.crs.reporting.intern
GROUP                        TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                   HOST            CLIENT-ID
groupid.crs.reporting.intern crs.reporting.intern 0          2               2               0               consumer-groupid.crs.reporting.intern-12-8fc98e12-d884-4c59-b41c-768721f98e51 /172.31.0.1     consumer-groupid.crs.reporting.intern-12

  1. 我停止了消费者,正如我们所看到的,没有消费者收听该主题。
C:\Java\apache-kafka_2.13-3.3.1\bin\windows>kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group groupid.crs.reporting.intern
Consumer group 'groupid.crs.reporting.intern' has no active members.
GROUP                        TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
groupid.crs.reporting.intern crs.reporting.intern 0          2               2               0               -               -               -

  1. 我向该主题发送了第三条消息,但没有活跃的消费者成员。 我仍然看到与 #2 相同的结果
C:\Java\apache-kafka_2.13-3.3.1\bin\windows>kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group groupid.crs.reporting.intern
Consumer group 'groupid.crs.reporting.intern' has no active members.
GROUP                        TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
groupid.crs.reporting.intern crs.reporting.intern 0          2               2               0               -               -               -

  1. 等了一个多小时后,我启动了消费者。然后我看到 3 条消息全部被删除(2 条已消费,一条尚未消费)
C:\Java\apache-kafka_2.13-3.3.1\bin\windows>kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group groupid.crs.reporting.intern
GROUP                        TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                   HOST            CLIENT-ID
groupid.crs.reporting.intern crs.reporting.intern 0          2               2               0               consumer-groupid.crs.reporting.intern-60-60a4ffb9-7dea-44c6-a34e-7fb52ef61223 /172.31.0.1     consumer-groupid.crs.reporting.intern-60

问题#3:我们如何在删除第三条未使用的消息之前显示它?

问题#4:有没有办法防止删除未使用的消息? 在这种情况下,我预计只有前 2 条消费消息将被删除,而不是第三条!

虽然第三条消息仍在Kafka-Broker中,但已被删除: [数据]-发送->[Kafka生产者]-发送->[Kafka Broker]<-Read-[Kafka Consumer]-Write->[数据存储]

最诚挚的问候, 阿姆贾德

apache-kafka
1个回答
0
投票

kafka 中删除日志段(批量消息),而不是单个消息

保留配置特定于关闭主题的日志段,默认情况下,段大小为 1GB 数据(3 个事件将少于该值),

Segment删除不知道数据是否已被消耗,这就是为什么所有数据都会被删除,并且在不修改整个主题保留设置的情况下无法阻止它

© www.soinside.com 2019 - 2024. All rights reserved.