使用KAFKA和非事务数据库进行自动扩展

问题描述 投票:0回答:3

比如,我有一个从KAFKA读取一批数据的应用程序,它使用传入消息的密钥并对HBase进行查询(从HBase读取当前数据以获取这些密钥),进行一些计算并将数据写回HBase对于同一组键。对于例如

{K1,V1},{K2,V2},{K3,V3}(来自KAFKA的传入消息) - >我的应用程序(从HBase读取K1,K2和K3的当前值,使用输入值V1,V2和V3做了一些计算,并在处理完成后将K1(V1 + x),K2(V2 + y)和K3(V3 + z)的新值写回HBase。

现在,假设我有一个分区用于KAFKA主题和一个消费者。我的应用程序有一个处理数据的消费者线程。

问题是说HBase失效,此时我的应用程序停止处理消息,并且KAFKA存在巨大的延迟。即使我有能力增加分区数量和相应的消费者数量,但由于HBase中的RACE条件,我无法增加其中任何一个。 HBase不支持行级锁定,所以现在如果我增加分区数量,相同的密钥可能会转到两个不同的分区,相应的是两个不同的消费者可能最终处于RACE状态,而最后写入的是胜利者。我必须等到所有消息都得到处理才能增加分区数量。

对于例如

HBase关闭 - >最初我有一个分区用于主题,并且在分区0中有未处理的消息 - > {K3,V3} - >现在我增加了分区的数量,而现在存在密钥K3的消息让我们说在分区0和1 - >然后消费者从分区0消费而另一个消费者从分区1消费将最终竞争写入HBase。

这个问题有解决方案吗?当然,处理消息的消费者锁定密钥K3不是解决方案,因为我们正在处理大数据。

apache-kafka kafka-consumer-api
3个回答
1
投票

增加多个分区时,只有新消息才会进入新添加的分区。 Kafka负责处理一条消息一次


1
投票

消息将仅出现在一个且仅一个kafka分区中。它在消息上使用散列函数模数分区数。我相信这个保证解决了你的问题。

但请记住,如果更改分区数,则可以将相同的消息密钥分配给不同的分区。如果您关心每个分区仅保证的消息排序,这可能很重要。如果您关心消息的排序,则重新分区(例如,增加分区数量)不是一种选择。


0
投票

正如Vassilis所说,Kafka保证单个密钥只在一个分区中。有different strategies如何在分区上分配密钥。 当您增加分区号或更改分区策略时,可能会发生可能影响正在工作的消费者的重新平衡过程。如果您停止消费者一段时间,您可以避免两个消费者处理相同密钥的可能性。

© www.soinside.com 2019 - 2024. All rights reserved.