在 NodeJS lambda 中将 JSON 转换为 Parquet 以写入 S3

问题描述 投票:0回答:2

我正在使用 NodeJS 作为语言运行 AWS Lambda 函数。这个 lambda 接收一些 JSON 输入,我需要在将其写入 S3 之前将其转换为 Parquet 格式。

目前,我正在使用 parquetjs 库将 JSON 数据转换为 Parquet。但是parquet.ParquetWriter.openFile函数创建了一个本地文件,我需要直接写入S3。

理想情况下,我想在内存中将JSON数据转换为Parquet,然后直接发送到S3。由于此 Lambda 函数将被大量使用,因此我需要针对高负载对其进行优化并避免写入本地磁盘。

实现这一目标的最佳实践是什么?

提前感谢您的帮助!

node.js amazon-s3 aws-lambda parquet parquetjs
2个回答
2
投票

使用开箱即用的依赖项需要将 JSON 到 Parquet 的转换写入本地文件。然后,您可以流式读取文件并上传到 S3。

AWS Lambda 为您的代码包含一个 512 MB 的临时文件系统 (

/tmp
),并且不会造成任何性能影响。根据您的负载大小,您可能需要增加它,高达 10 GB

伪代码(1):

const fs = require("fs");
const bodyRequest = {
  id: 1,
  payload: [
    {
      payloadid: 1,
      name: "name-1",
      value: "value-1",
    },
    {
      payloadid: 2,
      name: "name-2",
      value: "value-2",
    },
  ],
};
const schema = new parquet.ParquetSchema({
  id: { type: "UTF8" },
  payload: {
    repeated: true,
    fields: {
      payloadid: { type: "UTF8" },
      name: { type: "UTF8" },
      value: { type: "UTF8" },
    },
  },
});
const writer = await parquet.ParquetWriter.openFile(
  schema,
  "/tmp/example.parquet"
);

await writer.appendRow({
  id: bodyRequest["id"],
  payload: bodyRequest["payload"],
});

await writer.close();

const fileStream = fs.createReadStream("/tmp/example.parquet");
const s3Key = "2022/07/07/example.parquet";
try {
  const params = { Bucket: "bucket", Key: s3Key, Body: fileStream };
  const result = await s3.putObject(params).promise();
  fs.unlink("/tmp/example.parquet", function (err) {
    if (err) {
      console.error(err);
    }
    console.log("File has been Deleted");
  });
  console.log(result);
} catch (e) {
  console.error(e);
}

根据请求的吞吐量,您可能需要服务之间的SQS来执行批量转换。例如:

Request -> Lambda -> S3/json -> S3 Notification -> SQS
batch 50 messages -> Lambda transformation -> S3/parquet


另一个解决方案是使用 AWS Glue 将 S3 对象从 JSON 转换为 Parquet:https://hkdemircan.medium.com/how-can-we-json-css-files-transform-to-parquet-through-aws- glue-465773b43dad

流程将是:

Request -> Lambda -> S3/json
S3/json <- Glue Crawler -> S3/parquet
。您可以通过计划(每 X 分钟)或通过 S3 事件触发它。


0
投票

可以尝试使用DuckDB库将JSON转成parquet,然后将parquet直接写入S3。所有这些都在一个语句中完成。

var duckdb = require('duckdb');
var db = new duckdb.Database(':memory:');

// mysource.json is the json file   
var query = "INSTALL httpfs;LOAD httpfs;SET s3_region='ap-south-1';SET s3_access_key_id='***';SET s3_secret_access_key='***';COPY 'mysource.json' TO 's3://parq-write-bucket/my_data.parq' (FORMAT 'parquet');"
db.all(query, function(err, res) {
  if (err) throw err
  console.log(res)
});

这里的

mysource.json
是lambda磁盘上的json文件。如果您想完全避免写入 lambda 磁盘,那么您可以先将 json 插入到 duckdb 表中,然后将该表作为镶木地板导出到 s3。


一些有用的链接:JSON with duckdb, duckdb s3 export, duckdb to s3.

© www.soinside.com 2019 - 2024. All rights reserved.