我有一个 Node.js 应用程序,它可以在本地获取文件列表并将它们上传到服务器。该列表可能包含数千个文件。
// Kick off an upload for every file — note this starts ALL of them at once.
for (const file of files) {
  upload_file(file);
}
如果我对数千个文件执行此操作,upload_file 将同时被调用数千次,并且很可能会死掉(或者至少会挣扎)。在同步世界中,我们创建一个线程池并将其限制为一定数量的线程。有没有一种简单的方法来限制一次执行的异步调用数量?
像往常一样,我推荐 Caolan McMahon 的 async 模块。
让您的 upload_file 函数接受一个回调作为其第二个参数:
const async = require("async");

// Worker: uploads one file, then signals completion through `callback`.
function upload_file(file, callback) {
  // Do funky stuff with file
  callback();
}

// Run ten simultaneous uploads
const queue = async.queue(upload_file, 10);

// Fired once the last queued item has finished.
queue.drain = function () {
  console.log("All files are uploaded");
};

// Queue your files for upload
queue.push(files);

// Increase to twenty simultaneous uploads
queue.concurrency = 20;
上面推荐 NPM 上 async 模块的答案是最好的答案,但如果您想了解有关控制流的更多信息:
您应该研究控制流模式。Mixu 的《Node Book》第 7 章有关于控制流模式的精彩讨论。具体来说,我会参考 7.2.3 中的示例:有限并行——一个异步的、并行的、并发数受限的 for 循环。
我改编了他的例子:
// Bounded-concurrency uploader, adapted from Mixu's Node Book (section 7.2.3:
// limited parallelism). At most `limit` uploads run at any moment.

// Placeholder worker: perform the file read & upload here, then signal
// completion through `done`. setTimeout keeps completion asynchronous,
// which the original placeholder (which never called back) did not do —
// without it `running` is never decremented and the batch never finishes.
function doUpload(file, done) {
  // perform file read & upload here...
  setTimeout(done, 0);
}

var files = ["a.txt", "b.txt", "c.txt"]; // files still waiting to upload (demo data)
var limit = 10; // concurrent read / upload limit
var running = 0; // number of running async file operations

// Starts uploads until the concurrency limit is reached or the list is empty.
function uploader() {
  while (running < limit && files.length > 0) {
    var file = files.shift();
    // Count the operation BEFORE starting it: if the worker ever invoked its
    // callback synchronously, decrementing before this increment ran would
    // briefly under-count and allow more than `limit` concurrent uploads.
    running++;
    doUpload(file, function () {
      running--;
      if (files.length > 0) {
        uploader();
      }
    });
  }
}

uploader();
你应该尝试把上传排成队列。我假设 upload_file() 在完成时会触发回调。像下面这样的代码应该可以解决问题(未经测试):
// Uploads every entry of `files` by calling upload_file(file, done), keeping
// at most `maxSimultaneousUploads` uploads in flight at once. `callback` is
// invoked exactly once, after the final upload has finished.
function upload_files(files, maxSimultaneousUploads, callback) {
    var runningUploads = 0,
        startedUploads = 0,
        finishedUploads = 0;

    // Edge case: with no files, queue() below starts nothing, so next() would
    // never run and `callback` would never fire. Report completion right away
    // (asynchronously, to keep the callback contract consistently async).
    if (files.length === 0) {
        setTimeout(callback, 0);
        return;
    }

    // Completion handler shared by every upload.
    function next() {
        runningUploads--;
        finishedUploads++;

        if (finishedUploads == files.length) {
            callback();
        } else {
            // Make sure that we are running at the maximum capacity.
            queue();
        }
    }

    // Starts uploads until the concurrency limit (or the end of the list)
    // is reached.
    function queue() {
        // Run as many uploads as possible while not exceeding the given limit.
        while (startedUploads < files.length && runningUploads < maxSimultaneousUploads) {
            runningUploads++;
            upload_file(files[startedUploads++], next);
        }
    }

    // Start the upload!
    queue();
}
其他答案似乎已经过时了。使用 async 模块中的 parallelLimit 可以轻松解决这个问题。下面介绍如何使用它。我没测试过。
// parallelLimit lives on the async module — as originally written, the bare
// `parallelLimit(...)` call is a ReferenceError.
var async = require("async");

// Wrap each file in a task that takes only a callback, the shape
// parallelLimit expects.
var tasks = files.map(function (f) {
  return function (callback) {
    upload_file(f, callback);
  };
});

// Run at most 10 tasks at a time. The final callback receives (err, results);
// don't swallow the error silently.
async.parallelLimit(tasks, 10, function (err) {
  if (err) {
    console.error(err);
  }
});
没有外部库。只是简单的 JS。
可以使用递归来解决。
我们的想法是,最初我们立即启动允许的最大上传数量,并且每个请求都应在完成后递归地启动新的上传。
在此示例中,我将成功的响应与错误一起填充,并执行所有请求,但如果您想在第一次失败时终止批量上传,可以稍微修改算法。
// Uploads `files` with at most `limit` concurrent uploadFile() calls.
// Resolves with an array of responses (index-aligned with `files`) when every
// upload succeeded; rejects with the same index-aligned array of mixed
// responses/errors when at least one failed. Every upload is attempted before
// the promise settles.
async function batchUpload(files, limit) {
  // Edge case: with no files, no recursiveUpload() call would ever reach the
  // completion check below, so the promise would never settle. Resolve now.
  if (files.length === 0) {
    return [];
  }

  limit = Math.min(files.length, limit);
  return new Promise((resolve, reject) => {
    const responsesOrErrors = new Array(files.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;

    // Uploads the next unstarted file; when it settles, either finishes the
    // whole batch or chains into the next pending file.
    function recursiveUpload() {
      let index = startedCount++;

      uploadFile(files[index])
        .then(res => {
          responsesOrErrors[index] = res;
        })
        .catch(error => {
          // Record the failure but keep going; rejection happens at the end.
          responsesOrErrors[index] = error;
          hasErrors = true;
        })
        .finally(() => {
          finishedCount++;
          if (finishedCount === files.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < files.length) {
            recursiveUpload();
          }
        });
    }

    // Kick off the initial wave of at most `limit` concurrent uploads.
    for (let i = 0; i < limit; i++) {
      recursiveUpload();
    }
  });
}
// Demo worker: logs its progress and settles after a random delay of up to
// 1.5 s. Delays of 1000 ms or less succeed; longer ones fail.
async function uploadFile(file) {
  console.log(`${file} started`);
  const delay = Math.floor(Math.random() * 1500);
  await new Promise((resolve) => setTimeout(resolve, delay));
  if (delay <= 1000) {
    console.log(`${file} finished successfully`);
    return `${file} success`;
  }
  console.log(`${file} finished with error`);
  // Matches the original demo, which rejects with a plain string.
  throw `${file} error`;
}
// Build file_1 … file_10 and upload at most three of them at a time.
const files = Array.from({ length: 10 }, (_, index) => `file_${index + 1}`);

batchUpload(files, 3)
  .then((responses) => console.log('All successfull', responses))
  .catch((responsesWithErrors) => console.log('All with several failed', responsesWithErrors));
在现代 Javascript 中,这是使用生成器或异步生成器为您提供隐式背压的经典案例(例如,及时完成作业)。
我编写了
@watchable/nevermore
包来帮助解决这个问题。例如,如果您想将速率限制为每秒 10 个,将请求的并发数限制为一次 10 个,并通过重试最多 3 次以静默方式处理失败,您可以使用这样的生成器来编写原始示例...
import { createSettlementSequence } from "@watchable/nevermore";

// At most 10 concurrent uploads, at most one launch per 100 ms (10/sec),
// retrying each failed upload up to 3 times.
const settlementSequence = createSettlementSequence(
  { concurrency: 10, intervalMs: 100, retries: 3 },
  function* () {
    // Yield a thunk per file; nevermore decides when each one actually runs.
    for (const file of files) {
      yield () => upload_file(file);
    }
  }
);

for await (const settlement of settlementSequence) {
  console.log(`Settlement status was ${settlement.status}`);
}
更好的是使用 AsyncGenerator,这样就不需要事先把所有文件都列在一个数组中。为此,您可以把上面的同步生成器替换为……
// Async-generator variant: files stream in (e.g. from a directory walk)
// instead of being materialised in an array first.
// Fix: the original `async function* {` is a syntax error — a generator
// function needs a parameter list (and, as a statement, a name).
async function* taskSequence() {
  for await (const file of createFileSequence()) {
    yield () => upload_file(file);
  }
}
请参阅 https://www.npmjs.com/package/@watchable/nevermore API 文档位于 https://watchable.dev/api/modules/_watchable_nevermore.html