我正在使用 NodeJS 作为语言运行 AWS Lambda 函数。这个 lambda 接收一些 JSON 输入,我需要在将其写入 S3 之前将其转换为 Parquet 格式。
目前,我正在使用 parquetjs 库将 JSON 数据转换为 Parquet。但是parquet.ParquetWriter.openFile函数创建了一个本地文件,我需要直接写入S3。
理想情况下,我想在内存中将JSON数据转换为Parquet,然后直接发送到S3。由于此 Lambda 函数将被大量使用,因此我需要针对高负载对其进行优化并避免写入本地磁盘。
实现这一目标的最佳实践是什么?
提前感谢您的帮助!
使用开箱即用的依赖项需要将 JSON 到 Parquet 的转换写入本地文件。然后,您可以流式读取文件并上传到 S3。
AWS Lambda 为您的代码包含一个 512 MB 的临时文件系统 (
/tmp
),并且不会造成任何性能影响。根据您的负载大小,您可能需要增加它,高达 10 GB。
伪代码(1):
const fs = require("fs");
const bodyRequest = {
id: 1,
payload: [
{
payloadid: 1,
name: "name-1",
value: "value-1",
},
{
payloadid: 2,
name: "name-2",
value: "value-2",
},
],
};
const schema = new parquet.ParquetSchema({
id: { type: "UTF8" },
payload: {
repeated: true,
fields: {
payloadid: { type: "UTF8" },
name: { type: "UTF8" },
value: { type: "UTF8" },
},
},
});
const writer = await parquet.ParquetWriter.openFile(
schema,
"/tmp/example.parquet"
);
await writer.appendRow({
id: bodyRequest["id"],
payload: bodyRequest["payload"],
});
await writer.close();
const fileStream = fs.createReadStream("/tmp/example.parquet");
const s3Key = "2022/07/07/example.parquet";
try {
const params = { Bucket: "bucket", Key: s3Key, Body: fileStream };
const result = await s3.putObject(params).promise();
fs.unlink("/tmp/example.parquet", function (err) {
if (err) {
console.error(err);
}
console.log("File has been Deleted");
});
console.log(result);
} catch (e) {
console.error(e);
}
根据请求的吞吐量,您可能需要服务之间的SQS来执行批量转换。例如:
Request -> Lambda -> S3/json -> S3 Notification -> SQS
和batch 50 messages -> Lambda transformation -> S3/parquet
另一个解决方案是使用 AWS Glue 将 S3 对象从 JSON 转换为 Parquet:https://hkdemircan.medium.com/how-can-we-json-css-files-transform-to-parquet-through-aws- glue-465773b43dad
流程将是:
Request -> Lambda -> S3/json
和S3/json <- Glue Crawler -> S3/parquet
。您可以通过计划(每 X 分钟)或通过 S3 事件触发它。
可以尝试使用DuckDB库将JSON转成parquet,然后将parquet直接写入S3。所有这些都在一个语句中完成。
var duckdb = require('duckdb');
var db = new duckdb.Database(':memory:');
// mysource.json is the json file
var query = "INSTALL httpfs;LOAD httpfs;SET s3_region='ap-south-1';SET s3_access_key_id='***';SET s3_secret_access_key='***';COPY 'mysource.json' TO 's3://parq-write-bucket/my_data.parq' (FORMAT 'parquet');"
db.all(query, function(err, res) {
if (err) throw err
console.log(res)
});
这里的
mysource.json
是lambda磁盘上的json文件。如果您想完全避免写入 lambda 磁盘,那么您可以先将 json 插入到 duckdb 表中,然后将该表作为镶木地板导出到 s3。
一些有用的链接:JSON with duckdb, duckdb s3 export, duckdb to s3.