我需要从 AWS 存储中快速(亚秒级)读取 10,000 个大小约为 40KB 的小对象。读取是通过在与存储相同的区域 (us-east-1) 中运行的 Lambda 函数完成的。我之前曾发布过关于当 S3 用作存储时我尝试使用多线程来加快速度的文章。在无法扩展到超过 20 个线程之后,我现在尝试使用 AWS DynamoDB。 我对代码进行了最小的更改,使其能够与 DynamoDB 配合使用,并利用 DynamoDB 的功能批量读取最多 100 个键:
import boto3
import numpy as np
import threading
import math
import json
# For reading and writing NumPy arrays of np.byte type
class DynamoDB:
def __init__(self, table_name):
self.client = boto3.client('dynamodb', region_name="us-east-1")
self.table_name = table_name
def write(self, key, arr):
self.client.put_item(
TableName=self.table_name,
Item={"content_id": key, "vector": arr.tobytes()})
# Batch-read
def read(self, keys):
batch_keys = {
self.table_name: {'Keys': [{'content_id': {'S': id}} for id in keys]}}
response = self.client.batch_get_item(RequestItems=batch_keys)
result = list(response.items())[0][1][self.table_name]
for r in result:
r['vector'] = np.frombuffer(bytes(r['vector']['B']), dtype = np.byte)
return result
dynamodb = DynamoDB("mytable")
def read_vectors(keys):
batch_size = 100
n_batches = math.ceil(len(keys)/batch_size)
for i in range(n_batches):
my_keys = keys[i * batch_size : (i + 1) * batch_size]
dynamodb.read(my_keys)
def concurrent_reads(keys, n_threads):
threads = []
keys_per_thread = math.ceil(len(keys)/n_threads)
for i in range(n_threads):
my_keys = keys[i * keys_per_thread : (i + 1) * keys_per_thread]
thread = threading.Thread(target=read_vectors, args=(my_keys,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
def lambda_handler(event, context):
keys = [f"vectors/{content_id}" for content_id in range(10000)]
concurrent_reads(keys, 10) # 10 threads
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
以下是时间测量:
Threads Time, sec
1 10.5
2 7.1
3 5.5
4 5.2
5 5.4
因此,虽然我使用 S3 的代码没有扩展到超过 20 个线程,但使用 DynamoDB 的代码也没有扩展到超过几个线程。和以前一样,我为 Lambda 提供了 10240 MB RAM,这应该为其提供 25 GiB/s 的网络带宽。鉴于所有对象加起来为 400MB,我绝对不会使带宽饱和。
此外,我使用 2048 MB RAM 获得了相同的性能,因此瓶颈可能不是 Lambda 的资源。
那么,为什么它不能扩展?