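I am trying to run an AWS Lambda function that reads a JSON file from S3, converts it into a readable stream, and saves the data to a database. I am using streams because the volume of incoming data is not known in advance.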
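A basic overview of what the code below is supposed to do: parse the incoming JSON data with stream-json, transform it using the StreamArray function, and run the stages through stream-chain.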
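The problem is that the Lambda only works the first time it is triggered. I have EventBridge triggering this Lambda every x minutes with a new file in S3. The second time the Lambda is triggered, it does not enter the stream pipeline and logs absolutely no errors.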
A minimal code sample is given below:
// Assumed imports for the identifiers used below.
// S3, S3_BUCKET_NAME, AWS_BUCKET_PROCESSING and createInternalErrorResponse are project-specific and not shown.
import { Writable } from "stream"
import { chain } from "stream-chain"
import { parser } from "stream-json"
import { streamArray } from "stream-json/streamers/StreamArray"
import Batch from "stream-json/utils/Batch"
let s3 = new S3(S3_BUCKET_NAME)

export const eventFromSQS = async (eventString: string, callback?: any) => {
  try {
    const event = JSON.parse(eventString)
    console.log(`Received event from SQS ::`, { event })
    const obj = JSON.parse(JSON.parse(event.Records[0].body))
    const s3Key = decodeURIComponent(obj.s3.object.key)
    console.log(`Received s3Key from event object :: ${s3Key}`)
    let result = await transformAndSaveData(s3Key)
  } catch (err) {
    console.log(err)
    return createInternalErrorResponse(`Failed sub-function eventFromSQS :: ${err}`)
  }
}
const receiveEvent = async (s3Key: string) => {
  try {
    console.log(`Received s3Key in event :: ${s3Key}`)
    const splittedKey = s3Key.split("/")
    const actualFileName = splittedKey[splittedKey.length - 1]
    const getObjectResponse = await s3.s3GetObject(AWS_BUCKET_PROCESSING, actualFileName)
    console.log(`Received event body :: ${getObjectResponse.Body}`)
    return getObjectResponse.Body
  } catch (err) {
    console.log(err)
    return createInternalErrorResponse(`Failed sub-function receiveEvent :: ${err}`)
  }
}
const getData = (streamData) => {
  const cleanData = streamData.map((item) => {
    return item.value
  })
  return cleanData
}
const saveDataInDB = new Writable({
  objectMode: true,
  // writev receives an array of { chunk, encoding } entries
  writev: (record, callback) => {
    // Insert query to enter record in temporary table
    console.log(record)
    callback()
  }
})
const onFinish = () => {
  console.log("Pipeline finish")
  //add code to transfer data from temporary table to tables
}
function transformAndSaveData(s3Key) {
  return new Promise(async (resolve, reject) => {
    try {
      const source: any = await receiveEvent(s3Key)
      const dataParser = parser()
      const dataFilter = streamArray({
        objectFilter: (data) => {
          const value = data.current // the value we are working on
          // the value can be incomplete, check if we have necessary properties
          if (value) {
            return value
          }
        }
      })
      const dataBatchSize = new Batch({ batchSize: 25 })
      let count = 0
      new chain([
        source,
        dataParser,
        dataFilter,
        dataBatchSize,
        (streamData) => {
          getData(streamData)
          count++
        },
        saveDataInDB.on("finish", () => {
          onFinish()
          //the count variable should be logged here every time the lambda runs
          console.log("count:", count)
          resolve(`Data accessed successfully: ${count}`)
        })
      ]).on("error", (err) => {
        console.log(err)
        reject(createInternalErrorResponse(`Pipeline Failed :: ${err}`))
      })
    } catch (err) {
      console.log(err)
      // return createInternalErrorResponse(`Pipeline Failed :: ${err}`)
    }
  })
}
I even tried using the callback function provided by the Lambda handler, but to no avail.
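One possible explanation for the silent second run, offered as an assumption rather than a confirmed diagnosis: Lambda can reuse a warm execution environment, so module-scope objects such as saveDataInDB survive between invocations, and a Writable that has already finished does not accept further data. The sketch below uses only Node's built-in stream module; sharedSink, writeRecord, and demo are hypothetical names standing in for the real sink and handler.

```typescript
import { once } from "node:events";
import { Writable } from "node:stream";

// Hypothetical stand-in for a module-scope sink such as saveDataInDB.
const received: unknown[] = [];
const sharedSink = new Writable({
  objectMode: true,
  write(record, _encoding, callback) {
    received.push(record);
    callback();
  },
});

// Depending on the Node.js version, a late write may also emit 'error';
// handle it so the process is not terminated.
sharedSink.on("error", (err) => console.log("sink error:", err.message));

// Writes one record and reports either "written" or the error code.
function writeRecord(sink: Writable, record: unknown): Promise<string> {
  return new Promise((resolve) => {
    sink.write(record, (err) => {
      if (!err) {
        resolve("written");
      } else {
        resolve((err as NodeJS.ErrnoException).code ?? err.message);
      }
    });
  });
}

// Simulates two invocations that share the same module-scope sink.
async function demo(): Promise<string[]> {
  const first = await writeRecord(sharedSink, { id: 1 });
  sharedSink.end(); // the first pipeline ends the sink
  await once(sharedSink, "finish");
  const second = await writeRecord(sharedSink, { id: 2 }); // "second invocation"
  return [first, second];
}
```

In this sketch the second write never reaches the write implementation, so only the first record ends up in received. If the same thing happens in the real handler, the "finish" listener registered on the second run would never fire, which would match a pipeline that appears to do nothing.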
I could not find many resources online for debugging this issue. Please suggest a good practice for handling Node.js streams in a Lambda function.
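One commonly suggested practice, sketched here under assumptions, is to create every stream inside the invocation and to await pipeline from Node's built-in stream/promises module, so that each run gets fresh streams and any stream error rejects the awaited promise. Row, createSink, processRows, and demo are hypothetical names; a real sink would run the database insert instead of pushing into an array, and the source would be the S3 object body rather than Readable.from.

```typescript
import { Readable, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical record shape used only for this sketch.
type Row = { id: number };

// Builds a fresh sink per call; nothing is kept at module scope.
function createSink(saved: Row[]): Writable {
  return new Writable({
    objectMode: true,
    write(row: Row, _encoding, callback) {
      saved.push(row); // a real sink would insert the row into the database here
      callback();
    },
  });
}

// Hypothetical handler body: builds the streams, awaits completion, returns a count.
async function processRows(rows: Row[]): Promise<number> {
  const saved: Row[] = [];
  await pipeline(Readable.from(rows), createSink(saved));
  return saved.length;
}

// Simulates two consecutive invocations in the same execution environment.
async function demo(): Promise<number[]> {
  const first = await processRows([{ id: 1 }, { id: 2 }]);
  const second = await processRows([{ id: 3 }]);
  return [first, second];
}
```

Because the sink is built inside processRows, the second call does not depend on the state left behind by the first, and awaiting pipeline keeps the async handler alive until the last write has completed.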