Kafka Streams:如何确保处理完成后提交偏移量

问题描述 投票:0回答:2

我想使用 Kafka 流处理 Kafka 主题中存在的消息。

处理的最后一步是将结果放入数据库表中。为了避免与数据库争用相关的问题(程序将 24*7 运行并处理数百万条消息),我将使用批处理进行 JDBC 调用。

但是在这种情况下,有可能消息丢失(在一个场景中,我从一个主题读取了 500 条消息,流会标记偏移量,现在程序失败。JDBC 批量更新中存在的消息丢失,但偏移量被标记对于这些消息)。

我想在数据库插入/更新完成后手动标记最后一条消息的偏移量,但根据以下问题这是不可能的:如何使用Kafka Stream手动提交?.

有人可以建议任何可能的解决方案吗

apache-kafka apache-kafka-streams
2个回答
4
投票

正如@sun007的答案中提到的,我宁愿稍微改变你的方法:

  • 使用Kafka Streams处理输入数据。让 Kafka Streams 应用程序将其输出写入 Kafka,而不是关系数据库。
  • 使用 Kafka Connect(例如,即用型 JDBC 连接器)将数据从 Kafka 提取到关系数据库。根据需要配置和调整连接器,例如用于批量插入数据库。

这种processing(Kafka Streams)和ingestion(Kafka Connect)的解耦通常是一种更可取的设计。例如,您不再将处理步骤与数据库的可用性结合起来:如果数据库关闭,为什么您的 KStreams 应用程序应该停止?这是一个操作问题,与处理逻辑无关,您当然不想处理超时、重试等问题。 (即使您使用 Kafka Streams 以外的工具进行处理,这种解耦仍然是一个更好的设置。)


3
投票

Kafka Stream不支持手动提交,同时也不支持批处理。 就您的用例而言,可能性很少:

  1. 使用Normal Consumer,实现批量处理并控制手动偏移。

  2. 使用 Spark Kafka 结构化流,如下所示 Kafka Spark 结构化流

  3. 尝试 Spring Kafka [Spring Kafka]2

  4. 在这种场景下,也可以考虑 JDBC Kafka Connector。 Kafka JDBC 连接器

© www.soinside.com 2019 - 2024. All rights reserved.