我正在尝试读取一些大型CSV文件并处理这些数据,因此在处理中存在速率限制,因此我想在每个请求之间增加1mnt的延迟。我尝试了set超时,但是最后,知道settimeout有一个限制,并得到以下错误。我不确定有任何其他方法可以处理这种情况,CSV文件中的记录超过1M。我在这里做错什么吗?
错误
超时时间设置为1。(节点:41)TimeoutOverflowWarning: 2241362000不适合32位带符号整数。
代码:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options) {
return new Promise((resolve, reject) => {
console.log('process csv started', new Date());
let filePath = config.api.basePath + fileName;
stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
reject(error);
})
.pipe(csv())
.on('data', async (row) => {
ctr++
increment(row, ctr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
resolve(filePath);
})
});
}
async function increment(raw, counter) {
setTimeout(async function(){
console.log('say i am inside a function', counter, new Date());
domainQueue.add(data, options); // Add jobs to queue - Here i Need a delay say 1mnt, if i
// add jobs without delay it will hit ratelimit
}, 60000 * counter);
}
function queueWorkerProcess(value) { // Process jobs in queue and save in text file
console.log('value', value, new Date());
return new Promise(resolve => {
resolve();
});
}
这是一个基本概念。您需要跟踪正在处理的项目数,以限制使用的内存量并控制将结果存储在任何资源上的负载。
[当达到飞行中的某个限制时,您将暂停流。当您回到限制以下时,您将恢复流。您可以在.add()
上增加一个计数器,并在completed
消息上减少一个计数器以跟踪情况。您可以在此处暂停或恢复视频流。
FYI,仅将setTimeout()
插入某处将无济于事。为了控制内存使用,一旦有太多项目在处理中,就必须暂停流中的数据流。然后,当物品退回阈值以下时,您可以继续流。
这里是看起来的轮廓:
const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
redis: connectRedis(),
});
// counter that keeps track of how many items in the queue
let queueCntr = 0;
// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;
function processCSV(name, fileName, options) {
return new Promise((resolve, reject) => {
let paused = false;
console.log('process csv started', new Date());
const filePath = config.api.basePath + fileName;
const stream = fs.createReadStream(filePath)
.on('error', (error) => {
// handle error
console.log('error processing csv');
domainQueue.off('completed', completed);
reject(error);
}).pipe(csv())
.on('data', async (row) => {
increment(row, ctr);
if (queueCntr)
})
.on('end', () => {
console.log('stream processCSV end', fileName, new Date());
domainQueue.off('completed', completed);
resolve(filePath);
});
function completed() {
--queueCntr;
// see if queue got small enough we now resume the stream
if (paused && queueCntr < queueMax) {
stream.resume();
paused = false;
}
}
domainQueue.on('completed', completed);
function increment(raw, counter) {
++queueCntr;
domainQueue.add(data, options);
if (!paused && queueCntr > queueMax) {
stream.pause();
paused = true;
}
}
});
}
而且,如果要使用不同的文件多次调用processCSV()
,则应对它们进行排序,以便在第一个完成之前不要调用第二个,在第二个完成之前不要调用第三个。完成,等等...您不会显示该代码,因此我们无法对此提出具体建议。