Limiting asynchronous calls in Node.js


I have a Node.js app that gets a list of files locally and uploads them to a server. The list could contain thousands of files.

for (var i = 0; i < files.length; i++) {
   upload_file(files[i]);
}

If I do this for thousands of files, upload_file will be invoked thousands of times simultaneously, and will most likely die (or at least struggle). In the synchronous world we'd create a thread pool and cap it at a certain number of threads. Is there a simple way to limit how many asynchronous calls are in flight at once?

node.js asynchronous
6 Answers
77 votes

As usual, I recommend Caolan McMahon's async module.

Make your upload_file function take a callback as its second argument:

var async = require("async");

function upload_file(file, callback) {
    // Do funky stuff with file
    callback();
}

var queue = async.queue(upload_file, 10); // Run ten simultaneous uploads

queue.drain = function() {
    console.log("All files are uploaded");
};

// Queue your files for upload
queue.push(files);

queue.concurrency = 20; // Increase to twenty simultaneous uploads
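
A side note on library versions: the snippet above targets async v1/v2, where drain is an assignable property. In async v3 and later, drain is a method you call with a handler instead; an untested sketch of the equivalent:

queue.drain(function() {
    console.log("All files are uploaded");
});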

26 votes

The answer above (re: async on NPM) is the best one, but if you'd like to learn more about control flow:


You should look into control flow patterns. There is a great discussion of control flow patterns in Chapter 7 of Mixu's Node Book. Namely, I'd look at the example in 7.2.3: Limited parallel - an asynchronous, parallel, concurrency-limited for loop.

I've adapted his example:

function doUpload(file, callback) {
    // perform the file read & upload here, then signal completion...
    callback();
}

var files   = [...];
var limit   = 10;       // concurrent read / upload limit
var running = 0;        // number of running async file operations

function uploader() {
    while(running < limit && files.length > 0) {
        var file = files.shift();
        doUpload(file, function() {
            running--;
            if(files.length > 0)
                uploader();
        });
        running++;
    }
}

uploader();
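
One limitation of the adapted example above is that nothing tells you when the whole batch has finished. A small variation that adds a completion callback (the onAllDone parameter is a name I'm introducing for illustration):

function uploader(onAllDone) {
    while (running < limit && files.length > 0) {
        var file = files.shift();
        running++;
        doUpload(file, function() {
            running--;
            if (files.length > 0) {
                uploader(onAllDone);
            } else if (running === 0) {
                onAllDone(); // fires once, after the final upload completes
            }
        });
    }
}

uploader(function() {
    console.log("All uploads finished");
});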

8 votes

You should try queueing. I assume a callback is fired when upload_file() finishes. Something like this should do the trick (untested):

function upload_files(files, maxSimultaneousUploads, callback) {
    var runningUploads = 0,
        startedUploads = 0,
        finishedUploads = 0;

    function next() {
        runningUploads--;
        finishedUploads++;

        if (finishedUploads == files.length) {
            callback();
        } else {
            // Make sure that we are running at the maximum capacity.
            queue();
        }
    }

    function queue() {
        // Run as many uploads as possible while not exceeding the given limit.
        while (startedUploads < files.length && runningUploads < maxSimultaneousUploads) {
            runningUploads++;
            upload_file(files[startedUploads++], next);
        }
    }

    // Start the upload!
    queue();
}
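
For completeness, a call site for this helper might look like the following (my own sketch; upload errors aren't surfaced, matching the answer's code):

upload_files(files, 10, function() {
    console.log("All files are uploaded");
});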

3 votes

The other answers seem to be outdated. This can be solved easily using parallelLimit from async. Below is how to use it. I haven't tested it.

var async = require("async");

var tasks = files.map(function(f) {
    return function(callback) {
        upload_file(f, callback);
    };
});

async.parallelLimit(tasks, 10, function(err, results) {
    // All uploads have finished (or err is set if one failed).
});
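
As a side note, in async v3+ the control-flow functions also return a promise when the final callback is omitted, so (inside an async function or ES module) the same call can be awaited; an untested sketch:

const results = await async.parallelLimit(tasks, 10);
// results is in the same order as tasks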

0 votes

No external libraries. Just plain JS.

It can be solved using recursion.

The idea is that initially we immediately start the maximum allowed number of uploads, and then each request recursively starts a new upload on its completion.

In this example I collect successful responses together with errors and run all the requests, but it's possible to modify the algorithm slightly if you want to terminate the batch upload on the first failure.

async function batchUpload(files, limit) {
  limit = Math.min(files.length, limit);

  return new Promise((resolve, reject) => {
    const responsesOrErrors = new Array(files.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;

    function recursiveUpload() {
      let index = startedCount++;

      uploadFile(files[index])
        .then(res => {
          responsesOrErrors[index] = res;
        })
        .catch(error => {
          responsesOrErrors[index] = error;
          hasErrors = true;
        })
        .finally(() => {
          finishedCount++;
          if (finishedCount === files.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < files.length) {
            recursiveUpload();
          }
        });
    }

    for (let i = 0; i < limit; i++) {
      recursiveUpload();
    }
  });
}

async function uploadFile(file) {
  console.log(`${file} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${file} finished successfully`);
        resolve(`${file} success`);
      } else {
        console.log(`${file} finished with error`);
        reject(`${file} error`);
      }
    }, delay);
  });
}

const files = new Array(10).fill('file').map((file, index) => `${file}_${index + 1}`);

batchUpload(files, 3)
  .then(responses => console.log('All successful', responses))
  .catch(responsesWithErrors => console.log('Finished, but some uploads failed', responsesWithErrors));
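
For comparison, the same fixed-concurrency idea can be written more compactly with async/await worker loops. A minimal sketch (batchUploadAwait is my own illustrative name; unlike the answer above, it lets the first rejection propagate instead of collecting errors):

async function batchUploadAwait(files, limit) {
  let index = 0;

  // Each worker repeatedly claims the next file until none remain.
  async function worker() {
    while (index < files.length) {
      const i = index++; // claimed synchronously, so no two workers share a file
      await uploadFile(files[i]);
    }
  }

  // Start `limit` workers and wait for all of them to drain the list.
  await Promise.all(
    Array.from({ length: Math.min(limit, files.length) }, worker)
  );
}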


0 votes

In modern Javascript this is a classic case for generators or async generators, which give you implicit backpressure (i.e. jobs are completed just in time).

I wrote the @watchable/nevermore package to help with this problem. For example, if you want to limit the rate to 10 per second, limit request concurrency to 10 at a time, and handle failures silently by retrying up to 3 times, you could write the original example using a generator like this...

import { createSettlementSequence } from "@watchable/nevermore";

const settlementSequence = createSettlementSequence(
  {
    concurrency: 10,
    intervalMs: 100,
    retries: 3,
  },
  function* () {
    for (let i = 0; i < files.length; i++) {
      const file = files[i];
      yield () => upload_file(file);
    }
  }
);

for await (const settlement of settlementSequence) {
  console.log(`Settlement status was ${settlement.status}`);
}

Better still, use an AsyncGenerator so that the files don't all need to be listed in an array up front. To do this, you can replace the synchronous generator with...

async function* () {
  for await (const file of createFileSequence()) {
    yield () => upload_file(file);
  }
}

See https://www.npmjs.com/package/@watchable/nevermore. The API docs are at https://watchable.dev/api/modules/_watchable_nevermore.html
