我建立了一个新的Kinesis数据流,并向其中写入了一些数据。然后我意识到我还没有设置Kinesis Firehose,所以我也进行了设置。我希望Firehose收集以前已写入流中的数据并将其转储到S3中,但是我发现Firehose仅转储在附加流后写入流中的数据。
没有连接任何使用者时,写入Kinesis数据流的数据会怎样?该流的保留期配置为24小时,因此我假设数据仍在某处(至少现在)。它还在吗?我有什么办法去了解它吗?
是,数据保留在Kinesis流中,直到保留期到期。
[不幸的是,Kinesis Firehose无法访问它。当Firehose初始化其分片迭代器时,它将在开始运行时使用模式LATEST
开始拾取数据。似乎没有任何方法可以配置Firehose来拾取早先发送到Kinesis的记录。
但是!使用可以在TRIM_HORIZON
或AT_TIMESTAMP
模式下初始化分片迭代器的任何过程,完全有可能从Kinesis流中提取数据。可以用awscli
来完成,但是我最终用boto3编写了一个Python脚本来捕获分片中的数据。它看起来像这样。
response = client.get_shard_iterator(
StreamName=StreamName,
ShardId=ShardId,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=datetime(2020, 4, 7, 1, 0, 0),
)
while True:
response = client.get_records(
ShardIterator=ShardIterator,
Limit=150
)
if response['MillisBehindLatest'] == 0:
break
for record in response['Records']:
sys.stdout.buffer.write(record['Data'])
ShardIterator = response['NextShardIterator']
[还请注意:如果迭代器在没有可用数据的时候启动,则get_records()
可能会为许多迭代返回一组空的零记录[]
,直到迭代器赶上有数据的时间为止。每当我使用TRIM_HORIZON
时,这种情况就发生了,这就是为什么我切换到使用AT_TIMESTAMP
的原因,所以我可以直接跳到数据开始的位置。