我正在使用AWS kinesis和lambda在实时数据管道上工作,我试图弄清楚如何才能保证相同数据生产者的记录由相同的分片,最终由相同的lambda函数实例处理。
我的方法是使用分区键来确保来自相同生产者的记录由相同的分片处理。但是,我无法通过相同的lambda函数实例来处理来自同一分片的记录。
基本设置如下:
看起来像:
您可以在图中看到,有三个lambda函数实例被调用进行处理;每个分片一个。在此管道中,由相同的lambda函数实例处理来自相同数据源的记录非常重要。根据我阅读的内容,可以确保来自同一源的所有记录都使用相同的分区键,以便由相同的分片进行处理,从而可以保证这一点。
分区键
分区键用于在一个分区内按碎片对数据进行分组流。 Kinesis Data Streams服务隔离数据记录使用分区键将流属于多个分片与每个数据记录相关联以确定给定数据的分片记录属于。分区键是最大长度为Unicode的字符串长度限制为256个字节。 MD5哈希函数用于映射将键分区为128位整数值并映射相关数据记录到碎片。当应用程序将数据放入流中时,它会必须指定分区键。
来源:https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
这有效。因此具有相同分区键的记录由相同的分片处理
。但是,它们由不同的lambda函数实例处理。因此,每个分片都会调用一个lambda函数实例,但它不仅处理来自一个分片的记录,还处理来自多个分片的记录。这里似乎没有任何模式可以将记录移交给lambda。这是我的测试设置:我向流中发送了一堆测试数据,并在lambda函数中打印了记录。这是三个功能实例的输出(检查每行末尾的分区键。每个键应仅出现在三个日志之一中,而不应出现在多个日志中):
Lambda实例1:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'} {'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'} {'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'} {'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'} {'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
Lambda实例2:
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'} {'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'} {'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'} {'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'} {'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
Lambda实例3:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'} {'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'} {'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'} {'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
这是我将数据插入流中的方式(如您所见,分区键设置为源ID):
processed_records = [] for r in records: processed_records.append({ 'PartitionKey': str(r['source']), 'Data': json.dumps(r), }) kinesis.put_records( StreamName=stream, Records=processed_records, )
所以我的问题是:
谢谢!
我正在使用AWS kinesis和lambda在实时数据管道上工作,我试图弄清楚如何保证来自相同数据生产者的记录由相同的分片和...处理]。 [