Nodejs Stream 在 AWS lambda 函数上只触发一次

问题描述 投票:0回答:0

我正在尝试运行一个 AWS Lambda 函数,该函数将从 S3 读取 JSON 文件,将其转换为可读流并将该数据保存到数据库。不确定传入数据量,因此使用流。

下面的代码应该做什么的基本概述是:

  1. 接收一个 S3 事件,可以根据该事件从文件创建可读流
  2. 使用库
    stream-json
    解析传入的 JSON 数据并使用 StreamArray 函数对其进行转换
  3. 创建指定数量的批量大小以进一步将数据处理为数组
  4. 可写流在 MySQL 中创建一个临时表并添加任何传入的数据批次
  5. Writable 流上的“完成”事件在添加所有块后触发。这会触发一个存储过程,该过程从创建的临时表传输到相应的原始表
  6. 所有流媒体功能都与库链接
    stream-chain

问题是,lambda 只触发一次。我有一个事件桥,每 x 分钟用 S3 中的一个新文件触发这个 lambda。第二次触发 lambda 时,它不会进入流管道并且绝对不会记录任何错误。

下面给出了一个最小的代码示例:

let s3 = new S3(S3_BUCKET_NAME)

export const eventFromSQS = async (eventString: string, callback?: any) => {
    try {
        const event = JSON.parse(eventString)
        console.log(`Received event from SQS ::`, { event })
        const obj = JSON.parse(JSON.parse(event.Records[0].body))
        const s3Key = decodeURIComponent(obj.s3.object.key)
        console.log(`Received s3Key from event object :: ${s3Key}`)
        let result = await transformAndSaveData(s3Key)
    } catch (err) {
        console.log(err)
        return createInternalErrorResponse(`Failed sub-function eventFromSQS :: ${err}`)
    }
}

const receiveEvent = async (s3Key: string) => {
    try {
        console.log(`Received s3Key in event :: ${s3Key}`)
        const splittedKey = s3Key.split("/")
        const actualFileName = splittedKey[splittedKey.length - 1]
        
        const getObjectResponse = await s3.s3GetObject(AWS_BUCKET_PROCESSING, actualFileName),
        
    
        console.log(`Received event body :: ${getObjectResponse.Body}`)
        return getObjectResponse.Body
    } catch (err) {
        console.log(err)
        return createInternalErrorResponse(`Failed sub-function receiveEvent :: ${err}`)
    }
}


const getData = (streamData) => {
    const cleanData = streamData.map((item) => {
        return item.value
    })
    return cleanData
}

const saveDataInDB = new Writable({
    objectMode: true,
    writev: (record, callback) => {
        // Insert query to enter record in temporary table
        console.log(record)
        callback()
    }
})

const onFinish = () => {
    console.log("Pipeline finish")
    //add code to transfer data from temporary table to tables
}

function transformAndSaveData(s3Key) {
    return new Promise(async (resolve, reject) => {
        try {
            const source: any = await receiveEvent(s3Key)
            const dataParser = parser()
            const dataFilter = streamArray({
                objectFilter: (data) => {
                const value = data.current // the value we are working on
                // the value can be incomplete, check if we have necessary properties
                if (value) {
                 return value
                }
              }
            })
            const dataBatchSize = new Batch({ batchSize: 25 })
            let count = 0
            new chain([
                source,
                dataParser,
                dataFilter,
                dataBatchSize,
                (streamData) => {
                    getData(streamData)
                    count++
                },

                saveDataInDB.on("finish", () => {
                    onFinish()
                    //the count variable should be logged here every time the lambda runs
                    console.log("count:", count)
                    resolve(`Data accessed successfully: ${count}`)
                })
            ]).on("error", (err) => {
                console.log(err)
                reject(createInternalErrorResponse(`Pipeline Failed :: ${err}`))
            })
        } catch (err) {
            console.log(err)
            // return createInternalErrorResponse(`Pipeline Failed :: ${err}`)
        }
    })
}

我什至尝试使用 lambda 处理程序提供的回调函数,但无济于事。

我在网上找不到太多资源来调试这个问题。请给我一个在 lambda 函数上处理 nodejs 流的好习惯

node.js typescript lambda node-streams stream-json
© www.soinside.com 2019 - 2024. All rights reserved.