读取具有settimeout最大值的流达到错误

问题描述 投票:-1回答:1

我正在尝试读取一些大型CSV文件并处理这些数据,因此在处理中存在速率限制,因此我想在每个请求之间增加1mnt的延迟。我尝试了set超时,但是最后,知道settimeout有一个限制,并得到以下错误。我不确定有任何其他方法可以处理这种情况,CSV文件中的记录超过1M。我在这里做错什么吗?

错误

超时时间设置为1。(节点:41)TimeoutOverflowWarning: 2241362000不适合32位带符号整数。

代码:

   const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
  redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options)  {
  return new Promise((resolve, reject) => {
    console.log('process csv started', new Date());
    let filePath = config.api.basePath + fileName;
    stream = fs.createReadStream(filePath)
        .on('error', (error) => {
          // handle error
          console.log('error processing csv');
          reject(error);
        })
        .pipe(csv())
        .on('data', async (row) => {
          ctr++
          increment(row, ctr)
        })
        .on('end', () => {
          console.log('stream processCSV end', fileName, new Date());
          resolve(filePath);
        })
  });

}

async function increment(raw, counter) {
  setTimeout(async function(){
    console.log('say i am inside a function', counter, new Date());
    domainQueue.add(data, options); // Add jobs to queue - Here i Need a delay say 1mnt, if i
    // add jobs without delay it will hit ratelimit 
  }, 60000 * counter);

}

function queueWorkerProcess(value) { // Process jobs in queue and save in text file 
  console.log('value', value, new Date());
  return new Promise(resolve => {
    resolve();
  });

}
javascript node.js loops settimeout delay
1个回答
1
投票

这是一个基本概念。您需要跟踪正在处理的项目数,以限制使用的内存量并控制将结果存储在任何资源上的负载。

[当达到飞行中的某个限制时,您将暂停流。当您回到限制以下时,您将恢复流。您可以在.add()上增加一个计数器,并在completed消息上减少一个计数器以跟踪情况。您可以在此处暂停或恢复视频流。

FYI,仅将setTimeout()插入某处将无济于事。为了控制内存使用,一旦有太多项目在处理中,就必须暂停流中的数据流。然后,当物品退回阈值以下时,您可以继续流。

这里是看起来的轮廓:

const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
    redis: connectRedis(),
});

// counter that keeps track of how many items in the queue
let queueCntr = 0;

// you tune this constant up or down to manage memory usage or tweak performance
// this is what keeps you from having too many requests going at once
const queueMax = 20;

function processCSV(name, fileName, options) {
    return new Promise((resolve, reject) => {
        let paused = false;

        console.log('process csv started', new Date());
        const filePath = config.api.basePath + fileName;

        const stream = fs.createReadStream(filePath)
            .on('error', (error) => {
                // handle error
                console.log('error processing csv');
                domainQueue.off('completed', completed);
                reject(error);
            }).pipe(csv())
            .on('data', async (row) => {
                increment(row, ctr);
                if (queueCntr)
            })
            .on('end', () => {
                console.log('stream processCSV end', fileName, new Date());
                domainQueue.off('completed', completed);
                resolve(filePath);
            });

        function completed() {
            --queueCntr;
            // see if queue got small enough we now resume the stream
            if (paused && queueCntr < queueMax) {
                stream.resume();
                paused = false;
            }
        }

        domainQueue.on('completed', completed);

        function increment(raw, counter) {
            ++queueCntr;
            domainQueue.add(data, options);
            if (!paused && queueCntr > queueMax) {
                stream.pause();
                paused = true;
            }
        }
    });
}

而且,如果要使用不同的文件多次调用processCSV(),则应对它们进行排序,以便在第一个完成之前不要调用第二个,在第二个完成之前不要调用第三个。完成,等等...您不会显示该代码,因此我们无法对此提出具体建议。

© www.soinside.com 2019 - 2024. All rights reserved.