处理 AWS Kinesis 流中的所有数据是否需要从多个分片读取?

问题描述 投票:0回答:1

我正在使用 Python 的 boto3 客户端处理来自 Kinesis 流的记录,并且我能够找到的每个文档都仅从第一个分片读取。代码通常如下所示:

response = self.kinesis_client.get_shard_iterator(
    StreamName=self.name,
    ShardId=self.details["Shards"][0]["ShardId"], # <-- this
    ShardIteratorType="LATEST",
)
shard_iter = response["ShardIterator"]
while True:
    response = self.kinesis_client.get_records(
        ShardIterator=shard_iter, Limit=10
    )
    process_records(response["Records"])
    try:
        shard_iter = response["NextShardIterator"]
    except KeyError:
        break

但就我而言,流中有 15 个分片,由其他人配置,目标是让该消费者读取流中的所有消息。当我只从第一个分片读取时,

get_records()
调用返回没有
NextShardIterator
的响应,所以我猜我必须从多个分片中提取。这个假设正确吗?

这里的最佳实践是什么?为每个分片创建一个线程并并行读取它们?处理速度足够快,可以处理该流中的所有内容。

amazon-web-services boto3 amazon-kinesis
1个回答
0
投票

处理 AWS Kinesis 流中的所有数据是否需要从多个分片读取?

TL;博士:是的。

Kinesis 是一种持久的事件流解决方案。其事件的状态(无论是否被处理)与其数据分离。数据是不可变的,并且状态根本不是 Kinesis 所关心的。这允许每个流有多个独立的订阅者。

一旦 Kinesis 摄取了您的事件,它就不知道(或关心)您是否已处理它们,或者您是否要再次处理它们。 Kinesis 流在流的保留期内存储记录(默认情况下为 24 小时,但最多可以延长一年)。有一个非常有力的保证,一旦您收到记录存在的回复,它就在那里(即最终可以在保留期内读取)。这些记录具有内在的顺序:如果记录 B 在 A 之后,则所有读者都会在 A 之后获得 B。您甚至可以通过将它们置于因果关系中,在客户端引入记录的强排序:在循环中,取前一条记录的响应中的序列号,并将其用作下一条记录的

SequenceNumberForOrdering

这些类型的保证是有代价的(货币和计算),这就是 Kinesis 流被分成碎片的原因。每个分片都有自己的吞吐量限制,您需要为每个分片单独付费。序列化保证也仅适用于单个分片内的记录。

为了平衡成本和吞吐量,Kinesis 允许动态更改单个流中的活动分片数量,而无需更改生产者设置。对于生产者来说,它只需要向特定的流发送

PutRecord
请求即可。流会自动将它们分配到分片中。

为了在不停止流的情况下无缝地完成此操作,同时保留顺序,Kinesis 支持重新分片:它通过执行称为分片拆分和分片合并的操作来更改分片数量。这两个操作都会使流停止写入旧分片,将其标记为已关闭,然后开始写入新分片。拆分或合并的事实也被持久化,子分片中的所有记录都被认为是在父分片中的所有记录之后发生的。

由于 Kinesis 不跟踪事件状态,因此它成为客户端的责任:您的处理器应跟踪每个分片的进度(“检查点”)以及拆分和合并。这不是太难,但也不是完全微不足道。

AWS 提供了一个名为 KCL(Kinesis 客户端库)的库,可以为您完成此操作。不幸的是,它仅在 Java 中可用,并且严格依赖 DynamoDB 作为处理状态的存储。它可能会调用用其他语言编写的子流程。

这里的最佳实践是什么?为每个分片创建一个线程并并行读取它们?处理速度足够快,可以处理该流中的所有内容。

这是一种方法,但是,您应该注意拆分和合并,并相应地重新平衡您的线程。

如果您使用 KCL,它将自动平衡分片和工作人员之间的映射(“租约”)。

© www.soinside.com 2019 - 2024. All rights reserved.