我们正在将 5Gb csv 文件导入 AWS DynamoDB。 目前,我们希望仅使用 Python 在一两个小时内完成对 DynamoDB 的导入。 另外,由于我们正在考虑并发处理,因此我们无法拆分文件。
参考下面的文章,我导入了一个5Gb的文件,花了我6个小时才完成。 有什么方法可以通过 Python 编码来加快导入速度吗? 我对大数据了解不多,但我想了解更多。
https://github.com/aws-samples/csv-to-dynamodb
import json
import boto3
import os
import csv
import codecs
import sys
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
bucket = os.environ['bucket'].
key = os.environ['key'].
tableName = os.environ['table'].
def lambda_handler(event, context):
get() does not store in memory
try:
Object(bucket, key).get()['Body'] obj = s3.
except:
Object(bucket, key).get()['Body'] except: print("S3 Object could not be opened. Check environment variable. ")
Try:
Table(tableName)
except:
Check if table was created correctly and environment variable.") try: table = dynamodb.Table(tableName) except: print("Error loading DynamoDB table.
batch_size = 100
batch = [].
DictReader is a generator; not stored in memory
for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
if len(batch) >= batch_size:
write_to_dynamo(batch)
batch.clear()
batch.append(row)
if batch:
write_to_dynamo(batch)
return {
'statusCode': 200,
'body': json.dumps('Uploaded to DynamoDB Table')
}
def write_to_dynamo(rows):
try:
table = dynamodb.Table(tableName)
except:
print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")
try:
Check if table was created correctly and environment variable.") try: with table.batch_writer() as batch:
for i in range(len(rows)):
batch.put_item(
Item=rows[i])
)
except:
print("Error executing batch_writer")
要将大型数据集导入 DynamoDB,您应该更好地使用内置的 DynamoDB 从 S3 导入 功能。使用它,您无需为每个
PutItem
请求付费,而只需为导入的数据量付费。
请记住:导入到 DynamoDB 时,您根据“未压缩”数据大小付费。您可以压缩 S3 上的数据以减少传输时间和成本(如果传输到其他区域),但这不会影响 DynamoDB import 操作成本。 您应该以 S3 中的
3 种可用格式之一准备数据CSV——最简单且最具成本效益的,如果您只有字符串属性,效果最好;最小化属性名称(在 JSON 中,您必须在每一行中包含属性名称)。
1
2
、3
,然后在从 DynamoDB 读取时在应用程序代码中对它们进行解码:{1: 'val1', 2: 'val2', …}.get(dynamodb_field_value)
.删除所有不必要的符号,即空格。