如何使用 Python boto3 客户端正确读取和检查具有多个分片的 Kinesis 流?

问题描述 投票:0回答:1

AWS Kinesis 似乎是为与 Lambda 一起使用而设计的,其分片的概念与带宽和延迟相关 - 但我试图从非 Lambda 服务中读取它,这是一个长期运行的进程,最终会调用

get_records() 
循环。

但显然需要一些手动工作才能正确读取所有分片,包括处理重新分片和检查点,我认为这些工作记录不足。我的问题是:如何从单个长期运行的 Python boto3 客户端正确读取 Kinesis 流中的所有分片并处理所有特殊步骤?

amazon-web-services boto3 amazon-kinesis
1个回答
0
投票

AWS Kinesis 似乎是为与 Lambda 一起使用而设计的。

Kinesis 比 Lambda 早一年。

我的问题是:如何从单个长期运行的 Python boto3 客户端正确读取 Kinesis 流中的所有分片并处理所有特殊步骤?

简要大纲是这样的:

  1. 所有 Kinesis 事件都有一个名为
    SequenceNumber
    的属性。它是一个可以转换为十进制数的字符串。它保证在单个分片内是唯一的,但不能跨分片。
  2. 应用程序第一次启动时,您将检索流中存在的所有分片的列表。这些分片可以是父子关系:一些分片是两个父分片合并的产物;其他是单个分片分裂的产物。
  3. 每个分片都有起始序列号,也可能有结束序列号。
  4. 消息的顺序定义如下:
    1. 单个分片内的消息按序列号排序
    2. 后代分片中的消息晚于其所有祖先分片(父分片、祖父母分片等)中的消息
    3. 如果两个分片不具有可传递的子父关系,则它们之间的消息顺序是未定义的

考虑到所有这些限制,您的应用程序应该这样做:

  1. 通过调用

    ListShards
    获取流中现有分片的列表。

  2. 首先调用

    GetShardIterator
    ,然后按顺序循环调用
    GetRecords
    ,从分片中读取记录。每个
    GetRecords
    的结果是分片的一批消息、下一个迭代器令牌,以及可能的子分片列表,当当前分片耗尽时返回。

    1. 对于第一次调用
      GetShardIterator
      ,请使用
      AFTER_SEQUENCE_NUMBER
      并提供您上次处理的检查点序列号(见下文)。
    2. 如果您尚未在此分片中处理任何内容(没有检查点记录),请使用
      AT_SEQUENCE_NUMBER
      以及
      ListShards
      返回的开始 id。

    GetRecords
    可能不会返回任何记录,这是正常的。如果是这种情况,您可能需要在下一次通话之前引入延迟。

  3. 处理

    GetRecords
    返回的消息。如果某些消息无法处理,则由您的应用程序决定如何定义重试逻辑。 Kinesis 思想假设消息排序很重要,因此您可以逐条处理消息,即使它们是在一批中返回的。如果多次重试后仍无法处理某条消息,则可能需要放弃该消息并继续前进。

  4. 处理消息后,您应该检查您在分片中的位置。这是一种奇特的说法,您应该在某处写下分片 ID 和最后处理的消息的序列号。它可以是数据库、磁盘上的文件、键值存储或类似的东西。如果您的应用程序死机,下次重新启动时,它将仅从最后一个检查点位置开始处理分片。您可能希望更少地执行检查点,而不是每条消息执行一次,只要您可以在应用程序终止并重新启动的罕见情况下进行双重处理即可。

  5. 一旦分片耗尽(只有在拆分或合并时才会发生),这意味着您已经处理了它的最后一条记录,您应该将此事实记录在检查点存储中,以便可以将其从处理列表中删除。

  6. 只要您介意约束,如何并行化分片之间的负载取决于您的应用程序。如果你有两个分片1和2,合并生成分片3,那么1和2可以并行处理,但是3只能在1和2之后处理。这是这个业务中最难的部分。您可能想从这里获得一些实施它的灵感。

AWS 有一款专门为此设计的产品,称为 KCL(Kinesis 客户端库)。

不幸的是,它严格依赖 DynamoDB 作为检查点存储,并且只能与 Java 代码本机集成(尽管它有多种方法可以通过调用以任何语言编写的子进程来处理消息并通过标准进程流与它们进行通信)。

© www.soinside.com 2019 - 2024. All rights reserved.