我已经尝试诊断这个错误有一段时间了,但无法弄清楚为什么我的completed()函数在我所有的异步函数完成之前执行。我正在使用异步库:
async.forEach(data.DBInstances, function (dbInstance, fcallback) {
let dbtype = dbInstance.Engine;
let logFilename = log[dbtype].log();
let instanceId = dbInstance.DBInstanceIdentifier;
if (tagFilter) {
let arn = dbInstance.DBInstanceArn;
checkRDSTag(arn, tagFilter, function (err, found) {
if (!err) {
//tag was found, continue processing and check other filters...
if (found) {
if (noFilter || (instanceTypes && instanceTypes.indexOf(dbtype))) {
//console.log('db type is: ' + dbtype);
processOrCreateLog(instanceId, dbType, function (err, data) {
if (!err) {
console.log("Data: " + JSON.stringify(data));
completed.push(data);
fcallback(null);
} else {
cb(err, null);
}
});
}
} else {
//tag wasn't found but was specified, don't process anything...
console.log("tag specified was not found on instance: " + instanceId);
}
} else {
console.log("Error checking RDS Tag");
cb(err, null);
}
});
}
//only process filtered types...
else if (noFilter || (instanceTypes && instanceTypes.indexOf(dbtype))) {
console.log('db type is: ' + dbtype);
processOrCreateLog(instanceId, dbtype, fcallback, function (err, data, fcallback) {
if (!err) {
console.log("Data: " + JSON.stringify(data));
completed.push(data);
fcallback(null);
} else {
cb(err, null);
}
});
}
}, testme(completed));
我的异步函数运行正确,并且每个函数都正确完成,但我的 testme(completed) 在我的任何异步函数完成之前立即运行。不知道为什么..
我的测试(已完成)很简单:
function testme(completed) {
console.log("Completed: " + JSON.stringify(completed));
}
需要注意的是,我在每个元素上执行的函数本身都有异步函数(checkRDSTag()、processOrCreateLog() 等)。我猜它与异步期望/跟踪不适当的执行的回调()有关?不太确定..
我的问题最终出现在我的迭代器中的另一个异步调用(processOrCreateLog())中。我的异步调用中存在流控制逻辑,没有回调,因此 fcallback() 从未运行。
还要澄清一下,async 是异步node.js 库:https://caolan.github.io/async/docs.html#each
只要您在迭代器上为每个元素执行回调,并返回错误或 null,它就可以跟踪所有执行,然后正确运行最终回调。
仅在最后一项迭代时返回回调:
var index = 0;
async.forEach(data.DBInstances, function (dbInstance, fcallback) {
let dbtype = dbInstance.Engine;
let logFilename = log[dbtype].log();
let instanceId = dbInstance.DBInstanceIdentifier;
if (tagFilter) {
let arn = dbInstance.DBInstanceArn;
checkRDSTag(arn, tagFilter, function (err, found) {
if (!err) {
// increment index here
index++;
//tag was found, continue processing and check other filters...
if (found) {
if (noFilter || (instanceTypes && instanceTypes.indexOf(dbtype))) {
//console.log('db type is: ' + dbtype);
processOrCreateLog(instanceId, dbType, function (err, data) {
if (!err) {
console.log("Data: " + JSON.stringify(data));
completed.push(data);
//check if last item running
if (index===data.DBInstances.length) {
return fcallback(null);
} else {
fcallback()
}
} else {
cb(err, null);
}
});
}
} else {
//tag wasn't found but was specified, don't process anything...
console.log("tag specified was not found on instance: " + instanceId);
}
} else {
console.log("Error checking RDS Tag");
cb(err, null);
}
});
}
//only process filtered types...
else if (noFilter || (instanceTypes && instanceTypes.indexOf(dbtype))) {
console.log('db type is: ' + dbtype);
processOrCreateLog(instanceId, dbtype, fcallback, function (err, data, fcallback) {
if (!err) {
console.log("Data: " + JSON.stringify(data));
completed.push(data);
//check if last item running
if(index===data.DBInstances.length){
return fcallback(null);
}else{
fcallback()
}
} else {
cb(err, null);
}
});
}
}, testme(completed));