putRecords 后 AWS Kinesis 上没有数据

问题描述 投票:0回答:1

免责声明:这是我第一次使用 AWS Kinesis,因此我的预期可能不正确。

我有一个非常简单的 AWS Lambda 函数,可以将数据插入到 Kinesis 中。 API响应表明没有错误,也没有抛出异常; Kinesis 只是确认一切顺利。然而,当我进入AWS控制台并尝试查询数据时,什么也没有!

const records = [
  { partitionKey: '1', data: 'Record 1' },
  { partitionKey: '2', data: 'Record 2' },
  { partitionKey: '3', data: 'Record 3' },
]

const params = {
  Records: records.map((record) => ({
    Data: record.data,
    PartitionKey: record.partitionKey,
  })),
  StreamName: streamName,
}

const response = await kinesis.putRecords(params).promise()

const suscess = response.FailedRecordCount === 0

suscess
是真的。

因此,我尝试使用同一 Lambda 函数中的代码检索数据。将数据插入 Kinesis 后,我添加了以下代码:

const params2 = {
  ShardIteratorType: 'LATEST',
  ShardId: 'shardId-000000000000',
  StreamName: streamName,
}

const response2 = await kinesis.getShardIterator(params2).promise()
const shardIterator = response2.ShardIterator

const records2 = await kinesis.getRecords({ ShardIterator: shardIterator! }).promise()

records2
是一个空数组。

令我惊讶的是,没有返回任何记录;响应为空,没有检索到任何有用的信息。此外,没有引发任何异常。

我检查了shardId,它确实存在。

所以我的问题是,我做错了什么?

为什么 AWS Kinesis 显示已插入数据,但似乎没有任何反应?在 AWS 控制台中,我可以从使用图表中看到 put 和 get 操作中的活动。

javascript node.js amazon-web-services amazon-kinesis
1个回答
0
投票

由于您最近开始使用 Kinesis Data Streams,我建议您阅读架构和术语文档。它将让您深入了解数据如何在系统内部传输。

根据 AWS 文档:

Kinesis 数据流是一组分片。每个分片都有一系列数据记录。每个数据记录都有一个由 Kinesis Data Streams 分配的序列号。

基于此定义,Kinesis 数据流在内部使用分片来传输数据。如果您使用“按需”容量模式,它将最初分配至少 4 个分片,并根据应用程序的吞吐量增加分片数量。但是,如果您使用 Provisioned 模式创建 Kinesis Data Streams,则必须至少使用 2 个分片。因此,Kinesis 数据流始终有多个分片来处理数据流。 关于您在消费者端使用的代码,您将分片 ID 值显式硬编码为

shardId-000000000000

,但您无法确定您发布的数据是否已发送到该特定分片 ID。因此,除了检查发布者端的

FailedRecords
计数之外,还记录
putRecords
方法的完整响应,它将向您显示哪个
shardId
用于传输数据。
为了简化操作,我使用 AWS CLI 命令来发布数据,您可以轻松地解释/转换为 Node.js 以实现相同的结果。

放置记录命令:

aws kinesis put-records \ --stream-name <ENTER_STREAM_NAME_HERE> \ --records Data="Record 1",PartitionKey="1" Data="Record 2",PartitionKey="2" Data="Record 3",PartitionKey="3" --cli-binary-format raw-in-base64-out

PutRecords 响应:

{ "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49643231014796346134140772956682328171122645057423802450", "ShardId": "shardId-000000000005" }, { "SequenceNumber": "49643231014796346134140772956683537096942259686598508626", "ShardId": "shardId-000000000005" }, { "SequenceNumber": "49643231014796346134140772956684746022761874315773214802", "ShardId": "shardId-000000000005" } ] }

如果您观察上面的响应,当我执行
put-records

命令时,数据被发送到

shardId-000000000005
假设我之前没有消耗过该分片中的任何记录,因此我将使用 

TRIM_HORIZON

作为

ShardIteratorType
来获取数据指针值。
GetShardIterator 命令:

aws kinesis get-shard-iterator \ --stream-name <ENTER_STREAM_NAME_HERE> \ --shard-id shardId-000000000005 \ --shard-iterator-type TRIM_HORIZON

GetShardIterator 响应:

{ "ShardIterator": "AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ==" }

上述响应包含从分片读取最旧(未修剪)数据记录的指针记录:
shardId-000000000005

。一旦我们有了迭代器值,我们就可以使用

get-records
方法来获取记录。
获取记录命令:

aws kinesis get-records \ --shard-iterator AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ==

获取记录响应:

{ "Records": [ { "SequenceNumber": "49643231014796346134140772956682328171122645057423802450", "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.035000+00:00", "Data": "UmVjb3JkIDE=", "PartitionKey": "1" }, { "SequenceNumber": "49643231014796346134140772956683537096942259686598508626", "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00", "Data": "UmVjb3JkIDI=", "PartitionKey": "2" }, { "SequenceNumber": "49643231014796346134140772956684746022761874315773214802", "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00", "Data": "UmVjb3JkIDM=", "PartitionKey": "3" } ], "NextShardIterator": "AAAAAAAAAAHIu30Hail1drAR8L9vok/zazMmRawSMqVACRymRKho+06rk6PHZ0G9JbYJLzIjUoo3UVT3XiqcfTL/QO6Dt1SJhY7p2P50V8Dhv2pkGavpNnh43114Mp4i3HAUSsYkwNRW8EJSIcJ/LZysNG1z0KLmbBp+Vau5UOj9mbZu4aU7H+97WqJkoHvK8/BC2AcMnVUlR03/xVHS8zy9fer8v6bRCjDgJMCU9CHyJamX5Douqg==", "MillisBehindLatest": 0 }

在代码中,您将 
ShardIteratorType

值用作

LATEST
,这会在分片中最后发布的记录之后创建一个指针。因此,如果您使用
LATEST
迭代器类型,请确保在发布数据之前先获取迭代器值。您还可以考虑使用其他迭代器类型,如
this
文档中所述。 我认为您现在已经确定了代码中的问题,可能存在于两个地方:

    ShardId
  1. => 发布者可能使用了与您在代码中指定的值不同的 shardId
  2. ShardIteratorType
  3. => 由于您在发布数据后执行了消费者代码,因此它会在最近发布的记录后面创建一个指针。因此,即使发布者向 shardId-0000000000000 发送消息,您也无法检索之前发布的值。
    
        
© www.soinside.com 2019 - 2024. All rights reserved.