如何在Apache Storm中处理kafka消息时确保一次语义

问题描述 投票:0回答:1

我需要在我的应用程序中交付一次。我浏览了kafka,意识到要使消息产生一次,必须在生产者配置中设置idempotence=true。这也会设置acks=all,使生产者重新发送消息,直到所有副本都提交为止。为了确保使用者不进行重复处理或不处理任何消息,建议在同一数据库事务中将处理输出和偏移量提交给外部数据库,以便两者都将被保留,或者不保留以避免重复和不进行处理。

在使用者中,如果使用者首先提交但在处理之前失败,则留给消息处理;如果使用者首先处理但在提交之前失败,则对消息进行多次处理。

Q1。现在,我正在猜测如何使用Apache Storm进行模仿。我想通过在idemptence=true中设置KafkaBolt可以确保一次消息的产生。我对吗?

我正在猜测如何确保在Storm中丢失和重复的消息处理。例如,this doc page表示如果我锚定一个元组(通过将其作为第一个参数传递给OutputCollector.emit()),然后将该元组传递给OutputCollector.ack()OutputCollector.fail(),Storm将确保数据丢失。这就是它的确切意思:

现在您已经了解了可靠性算法,让我们仔细研究所有失败案例,然后看看Storm在每种情况下如何避免数据丢失:

  • 一个元组未被确认,因为任务死了:在这种情况下,失败元组在树根处的喷嘴元组ID将超时并被重播。

  • Acker任务死亡:在这种情况下,acker跟踪的所有spout元组都将超时并被重播。

  • Spout任务死亡:在这种情况下,与Spout交谈的源负责重播消息。例如,当客户端断开连接时,诸如Kestrel和RabbitMQ的队列会将所有待处理的消息放回队列。

Q2。我猜这可以确保不对消息进行处理,但不能避免重复处理消息。我对此正确吗? Storm还提供其他任何方法来确保我所缺少的像kafka这样的语义吗?

apache-kafka apache-storm
1个回答
1
投票

关于Q1:是的,通过设置该属性,您可以从KafkaBolt获得相同的行为,KafkaBolt只需包装KafkaProducer

关于使用方的语义,Storm的选择与Kafka的选择相同。当您从Kafka阅读邮件时,可以选择在处理之前或之后进行提交(例如,写入数据库)。如果您以前这样做,并且程序崩溃,则将丢失该消息。我们称其为at-most-once processing。如果在此之后执行此操作,则如果程序在处理之后但在提交之前崩溃,则有两次处理相同消息的风险,称为at-least-once processing

因此,关于问题2:是的,使用锚定元组和确认将为您提供at-least-once语义。不使用锚定元组将给您at-most-once

是的,Storm还提供其他功能来确保一次称为Trident的语义,但是它要求您以不同的方式编写拓扑,并且您的数据存储必须适应它,这样才能进行重复数据删除。请参阅https://storm.apache.org/releases/2.0.0/Trident-tutorial.html中的文档。

也只是警告您:当Storm(或Kafka)的文档讨论一次语义时,会做出一些假设,说明您将执行哪种处理。例如,当Storm的Trident文档仅讨论一次时,就假设您将调整数据库,以便可以在给定消息时决定是否已存储该消息。当Kafka的文档只讨论一次时,我们的假设是您的处理过程将从Kafka读取,进行一些计算(很可能没有副作用)并写回Kafka。

这只是说,对于某些类型的处理,您可能仍需要在at-least-onceat-most-once之间进行选择。如果可以使处理成为幂等,则at-least-once是一个不错的选择。

最后,如果您的处理符合“从Kafka读取,进行计算,写入Kafka”模型,则您可能会从Kafka Streams中获得比Storm更好的语义,因为Storm无法提供Kafka可以提供的完全一次语义这种情况。

© www.soinside.com 2019 - 2024. All rights reserved.