作为 Kinesis Firehose 工作流程的一部分,我有一个链接到我的 Kinesis 数据流的 Lambda 函数。 lambda 函数真正做的就是获取来自 DynamoDB 的序列化记录,并对它们进行反序列化(以及通常的 S3 编码/解码内容):
def lambda_handler(event, context):
deserializer = TypeDeserializer()
output = []
try:
for record in event["records"]:
# Everything stored in S3 is base64 encoded, so we must first decode, deserialize, and
# then encode the records again before we send them to S3
decoded_payload = json.loads(base64.b64decode(record["data"]).decode())
if decoded_payload["dynamodb"] and decoded_payload["dynamodb"]["NewImage"]:
updated_record = decoded_payload["dynamodb"]["NewImage"]
deserialized_record = {
k: deserializer.deserialize(v) for k, v in updated_record.items()
}
# Add newline after each record??? (otherwise Athena will only "see" the first?)
encoded_record_with_line_break = (json.dumps(deserialized_record, cls=DecimalEncoder) + "\n").encode()
output_record = {
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(encoded_record_with_line_break).decode(),
}
else:
print(
f"Dropping payload containing no updated DynamoDB image: {decoded_payload}"
)
output_record = {
"recordId": record["recordId"],
"result": "Dropped",
"data": record["data"],
}
output.append(output_record)
print(f"Successfully processed {len(event['records'])} records.")
except Exception as exc:
print(f"Unhandled exception raised during data transformation: {exc}")
finally:
return {"records": output}
您可能已经注意到,我添加了 ' ' 在我对其进行编码并将其附加到输出数组之前添加到记录中。
这是因为我遇到了一个问题,我的 Glue Crawlers 没有注册我的 lambda 输出到 S3 的所有记录更新,特别是当同时创建/更新多个记录时。
基本上,Kinesis Data Firehose 发送内联 JSON 记录;它们不以逗号或换行符分隔。在与 AWS 支持取得联系后,他们发现了一篇 AWS 博文是这样说的:
默认情况下,Kinesis Data Firehose 以内联方式发送 JSON 记录,这会导致 Athena 仅查询每个 S3 对象中的第一条记录。为了克服这个问题,我们使用 Lambda 函数通过添加行尾 (EOL) 字符来转换记录,然后再将它们发送到 Amazon S3。
所以我们添加了换行符,正如您在我上面的代码中看到的那样。不幸的是,这似乎并没有解决我的问题——通过添加换行符,我注意到我的 lambda Cloudwatch 日志似乎在记录重复项。对单个记录的每次更新都会导致创建 2 个文件,而不仅仅是 1 个,并且由于数据格式无效 (HIVE_BAD_DATA),在 Athena 中查询这些记录将失败。
处理这种情况的正确方法是什么?
我认为我的问题实际上源于我的原始函数中一些不正确的编码/解码逻辑。
你最终需要' ' 在每个处理过的记录之间,如我链接的文档中所述。
我还找到了一篇支持这个理论的文章,所以这是两个链接:
这是有效的 Python 代码:
decoded_payload = json.loads(base64.b64decode(record["data"]).decode())
if decoded_payload["dynamodb"] and decoded_payload["dynamodb"]["NewImage"]:
updated_record = decoded_payload["dynamodb"]["NewImage"]
deserialized_record = {
k: deserializer.deserialize(v) for k, v in updated_record.items()
}
# Add newline after each record, otherwise Athena will only "see" the first
record_with_line_break = json.dumps(deserialized_record, cls=DecimalEncoder) + "\n"
output_record = {
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(record_with_line_break.encode()),
}
output.append(output_record)