我目前正在使用 Athena 以及
Kinesis Firehose
、Glue Crawler
。 Kinesis Firehose
正在将 JSON 保存到单行文件,如下所示
{"name": "Jone Doe"}{"name": "Jane Doe"}{"name": "Jack Doe"}
但是我注意到雅典娜查询
select count(*) from db.names
返回1而不是3。在搜索问题之后。我找到了以下文件。
文章说 JSON 文件应该用换行符存储。
{"name": "Jone Doe"}
{"name": "Jane Doe"}
{"name": "Jack Doe"}
有一些聪明的技巧可以在单行 JSON 文件上运行 athena 查询吗?
感谢@Constantine,AWS Athena 正在执行分布式处理。由于单行JSON文件没有分隔符,因此无法进行分布式处理。因此,您必须在保存之前转换文件。
Kinesis Firehose 提供使用 Lambda 的转换,我添加了以下转换,以便从 AWS Athena 查询数据。
const addNewLine = (data) => {
const parsedData = JSON.parse(new Buffer.from(data,'base64').toString('utf8'));
return new Buffer.from(JSON.stringify(parsedData) + '\n').toString('base64')
}
exports.handler = async (event, context) => {
const output = event.records.map((record) => ({
recordId: record.recordId,
result: 'Ok',
data: addNewLine(record.data),
}));
return { records: output };
};
我通过以下链接想出了这段代码 AWS Firehose 换行符
我相信无法正确处理具有此类 JSON 的文件,因为需要分隔符才能分发工作。文档中没有关于如何提供自定义分隔符的明确信息,并且很可能在支持的 JSON SerDe 库中不可能实现。除此之外,给定的 JSON 对象之间没有 JSON 本身未使用的明显分隔符。事实上,根本没有分隔符。
但是,可以使用 Firehose 数据转换来缓冲传入数据并异步调用每个缓冲区的 Lambda 函数。有预定义的 Lambda 蓝图,在这种情况下可以使用
Kinesis Firehose Processing
在 JSON 对象之间添加换行符。
每个转换后的记录应该包含
recordId
、result
和 Base64 编码的 data
以及转换后的有效负载。
这种 Lambda 函数有多个示例,例如这个 python 示例位于 GitHub 上的 Amazon AWS 示例存储库中。
我们这里不需要任何处理:
创建物联网主题时。
aws iot create-topic-rule --rule-name aws_iot_streaming --topic-rule-payload file://rule.json
将此用作rule.json 文件:
{
"sql": "SELECT * FROM 'aws-iot-streaming'",
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23",
"actions": [{
"firehose": {
"roleArn": "arn:aws:iam::XXXXXXXXXXXX:role/iot-rule",
"deliveryStreamName": "PUT-S3-fggfn",
"separator": "\n"
}
}]
}
使用分隔符作为“ “