Kafka:如何使用 Java API 从主题中删除记录?

问题描述 投票:0回答:6

我正在寻找一种从 Kafka 主题中删除(完全删除)已使用记录的方法。我知道有多种方法可以做到这一点,例如更改主题的保留时间或删除 Kafka-logs 文件夹。但我正在寻找一种使用 Java API 删除某个主题的一定数量记录的方法(如果可能的话)。

我尝试测试 AdminClient API,特别是 adminclient.deleteRecords(recordsToDelete) 方法。但如果我没记错的话,该方法仅更改主题中的偏移量,而不是实际从硬盘驱动器中删除所述记录。

是否有 Java API 可以真正从硬盘驱动器中删除记录?

java apache-kafka
6个回答
7
投票

我可以删除。如果 linux 位于计算机上,则会将其从硬盘中删除。当我从网上搜索时,我发现windows有一个bug。但是我在windows下找不到这个bug的解决方案。 如果 kafka 在 Linux 机器上运行,则此代码可以工作。

Windows Bug 链接https://issues.apache.org/jira/browse/KAFKA-1194

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
       TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
       Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
       deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
       kafkaAdminClient.deleteRecords(deleteMap);
}

5
投票

一开始这也让我有点困惑,为什么包含的 bin/kafka-delete-records.sh 能够删除,但我无法使用 Java API

缺少的部分是您需要调用 KafkaFuture.get() 因为 deleteRecords 返回 Futures 的映射

这是代码

在此代码中,您需要调用

entry.getValue().get().lowWatermark()

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

1
投票

Kafka 主题是不可变的,这意味着您只能向其中添加新消息。本身并没有删除。

但是,为了避免“磁盘耗尽”,Kafka 提供了两个降低主题大小的概念:保留策略和压缩。

保留 如果您的主题不需要永远保留数据,则只需设置需要保留数据的保留策略即可,即 72 小时。然后 Kafka 会自动为您删除超过 72 小时的消息。

压实 如果您确实需要数据永远保留,或者至少保留很长一段时间,但您只需要 latest 值,那么您可以将主题设置为压缩。一旦使用已存在的密钥添加新消息,这将自动删除旧消息。

规划 Kafka 架构的核心部分是思考如何将数据存储在主题中。例如,如果您将更新推送到 kafka 主题中的客户记录,假设客户的上次登录日期(非常人为的示例......),那么您只对最后一个条目感兴趣(因为所有先前的条目都不是)更长的“最后”登录)。如果此分区键是客户 ID,并且启用了日志压缩,那么一旦用户登录并且 kafka 主题收到此事件,具有相同分区键(客户 ID)的任何其他先前消息将被自动删除从主题来看。


1
投票

我在 Red Hat 7.6 上使用 Kafka 2.1.1,对

AdminClient.deleteRecords()
的调用确实有效地从 /tmp/kafka-logs 中的相应文件夹中删除了文件。剩下的唯一文件是leader-epoch-checkpoint,里面有关于最后一个记录偏移量的信息:在我的例子中是96。

请注意,在调用

AdminClient.deleteRecords()
时,您不应传递大于分区现有高水位线的偏移量。如果这样做,调用将失败并显示
"org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server."
,但您不会知道这一点,直到您尝试通过
Future.get()
检查结果 - 有关详细信息,请参阅 Trix 的答案。


-3
投票

Kafka 不提供删除主题中特定偏移量的功能,并且没有可用的 API。


-4
投票

Kafka不支持从主题中删除记录。因此 Kafka 中的客户端基本上处于“只读”模式,无法更改缓冲区。考虑这样的情况:多个不同的客户端(不同的客户端组)读取同一主题并且每个客户端都保存自己的偏移量。如果有人开始从共享缓冲区中删除消息会发生什么?这是不可能的。

消息将根据保留策略从主题中删除。这将确保消息也按顺序删除。

© www.soinside.com 2019 - 2024. All rights reserved.