在Kinesis中使用分区键来确保具有相同键的记录由相同的记录处理器(lambda)处理]]

问题描述 投票:8回答:1

我正在使用AWS kinesis和lambda在实时数据管道上工作,我试图弄清楚如何才能保证相同数据生产者的记录由相同的分片,最终由相同的lambda函数实例处理。

我的方法是使用分区键来确保来自相同生产者的记录由相同的分片处理。但是,我无法通过相同的lambda函数实例来处理来自同一分片的记录。

基本设置如下:

  • 有多个数据源将数据发送到运动流。
  • 该流具有多个分片来处理负载。
  • 有一个通过事件源映射(批量大小为500)连接到尖叫的lambda函数。
  • lambda函数正在处理记录,进行一些数据转换和其他一些操作,然后将所有内容放入firehose。
  • 稍后会有更多事情发生,但这与问题无关。
  • 看起来像:

enter image description here

您可以在图中看到,有三个lambda函数实例被调用进行处理;每个分片一个。在此管道中,由相同的lambda函数实例处理来自相同数据源的记录非常重要。根据我阅读的内容,可以确保来自同一源的所有记录都使用相同的分区键,以便由相同的分片进行处理,从而可以保证这一点。

分区键

分区键用于在一个分区内按碎片对数据进行分组流。 Kinesis Data Streams服务隔离数据记录使用分区键将流属于多个分片与每个数据记录相关联以确定给定数据的分片记录属于。分区键是最大长度为Unicode的字符串长度限制为256个字节。 MD5哈希函数用于映射将键分区为128位整数值并映射相关数据记录到碎片。当应用程序将数据放入流中时,它会必须指定分区键。

来源:https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key

这有效。因此具有相同分区键的记录由相同的分片处理

。但是,它们由不同的lambda函数实例处理。因此,每个分片都会调用一个lambda函数实例,但它不仅处理来自一个分片的记录,还处理来自多个分片的记录。这里似乎没有任何模式可以将记录移交给lambda。

这是我的测试设置:我向流中发送了一堆测试数据,并在lambda函数中打印了记录。这是三个功能实例的输出(检查每行末尾的分区键。每个键应仅出现在三个日志之一中,而不应出现在多个日志中):

Lambda实例1:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}

Lambda实例2:

{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}

Lambda实例3:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}

这是我将数据插入流中的方式(如您所见,分区键设置为源ID):

processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)

所以我的问题是:

  • 为什么每个lambda函数不只处理一个分片的记录?
  • 如何完成?
  • 谢谢!

我正在使用AWS kinesis和lambda在实时数据管道上工作,我试图弄清楚如何保证来自相同数据生产者的记录由相同的分片和...处理]。 [

您为什么要关心哪个Lambda实例处理一个分片? Lambda实例无论如何都没有状态,因此哪个实例读取哪个分片无关紧要。更重要的是,Lambda实例在任何时候都只能从一个分片读取。完成调用后,它可能会从另一个分片读取。
amazon-web-services aws-lambda amazon-kinesis
1个回答
1
投票
您为什么要关心哪个Lambda实例处理一个分片? Lambda实例无论如何都没有状态,因此哪个实例读取哪个分片无关紧要。更重要的是,Lambda实例在任何时候都只能从一个分片读取。完成调用后,它可能会从另一个分片读取。
© www.soinside.com 2019 - 2024. All rights reserved.