我有一个一百万条记录的文件,其中我必须逐个传递一个记录以进行弹性搜索并将结果数据保存到数据库中。但是问题是,这样做需要花费很长时间,因为记录是逐一流式传输到Elasticsearch,然后将数据保存到PSQL数据库中。我需要一些建议,以期对此进行改进或应使用其他一些工具。
现在我正在将Nodejs与某些软件包一起使用:
我在nodejs应用程序中上传文件,并使用将其转换为json文件const csv=require('csvtojson')
我使用
const StreamArray = require('stream-json/streamers/StreamArray');
const {Writable} = require('stream');
用于读取json并通过流使用这些包对其进行解析,因为文件太大。我使用此代码
const fileStream = fs.createReadStream(this.fileName);
const jsonStream = StreamArray.withParser();
const incomingThis = this;
const processingStream = new Writable({
write({key, value}, encoding, callback) {
incomingThis.recordParser(value, (val, data) => { // pass the data to elasticsearch to get search data
incomingThis.processQueue(data); // save the data to the PSQL database
callback();
});
},
//Don't skip this, as we need to operate with objects, not buffers
objectMode: true
});
//Pipe the streams as follows
fileStream.pipe(jsonStream.input);
jsonStream.pipe(processingStream);
//So we're waiting for the 'finish' event when everything is done.
processingStream.on('finish', async () => {
console.log('stream end');
const statistics = new Statistics(jobId);
await statistics.update(); // update the job table for completion of data
});
[请提出建议,我要如何改进才能在几个小时而不是几天或更短的时间内解析一百万个记录文件。我愿意使用其他工具,例如redis,如果可以帮助我,请发火花。
谢谢。
而不是从流中一一按下。使用批处理方法(创建多个批处理)以获取[elastic][1]
中的数据并批量保存。