我正在尝试并行执行一些异步任务,但同时运行的任务的最大数量受到限制。
有一个我想要实现的目标的例子:
目前该任务正在陆续运行中。它是这样实现的:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
//... nestedArgs assignment logic ...
for (const id of dataItem.identifiers) {
yield* idHandler(dataItem, id, args, nestedArgs);
}
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
async function* idHandler(edsItem, researchId, args, nestedArgs) {
...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
...
// this function mutates some external data, making API calls and returns void
}
不幸的是,我无法对
cadesplugin.*
函数进行任何更改,但我可以在代码中使用任何外部库(或内置 Promise
)。
我在async库中找到了一些可能对我有用的方法(eachLimit和parallelLimit),并且答案展示了如何处理它。
但是还有两个问题我无法解决:
有一个指向 cadesplugin.* 源代码的链接,您可以在其中找到我的代码中使用的 async_spawn(和另一个
cadesplugin.*
)函数。
这是我尝试过但没有运气的代码:
await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) {
//... nested function code
});
它会导致对象不是异步可迭代的错误。
另一次尝试:
let functionArray = [];
dataItem.identifiers.forEach(researchId => {
functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);
它什么也没做。
С我能以某种方式解决这个问题吗,或者生成器函数不允许我这样做?
方钉,圆孔
您不能使用异步迭代来解决此问题。
串联是
for await .. of
的本质。 await
阻塞,循环不会继续,直到等待的 Promise 完成。您需要更精确的控制级别,以便执行这些特定要求。
首先,我们有一个模拟
myJob
来模拟长计算。这很可能是对您应用程序中某些 API 的网络请求 -
// any asynchronous task
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
使用
此问答中定义的
Pool
,我们实例化Pool(size=4)
,其中size
是要运行的并发线程数-
const pool = new Pool(4)
为了人体工程学,我在
run
类中添加了 Pool
方法,使包装和运行作业变得更容易 -
class Pool {
constructor (size) ...
open () ...
deferNow () ...
deferStacked () ...
// added method
async run (t) {
const close = await this.open()
return t().then(close)
}
}
现在我们需要编写一个使用
pool
来运行 myJob
的效果。在这里您还将决定如何处理结果。请注意,promise must 必须包含在 thunk 中,否则池无法控制它何时开始 -
async function myEffect(x) {
// run the job with the pool
const r = await pool.run(_ => myJob(x))
// do something with the result
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
// return a value, if you want
return r
}
现在通过将
myEffect
映射到输入列表来运行所有内容。在我们的示例中 myEffect
我们 return r
这意味着在获取 all 结果之后结果也可用。这是可选的,但演示了程序如何知道一切何时完成 -
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
完整程序演示
在下面的功能演示中,我浓缩了定义,以便我们可以立即看到它们。运行程序在您自己的浏览器中验证结果 -
class Pool {
constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
async run (t) { const close = await this.open(); return t().then(close) }
deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const myJob = x =>
sleep(rand(5000)).then(_ => x * 10)
async function myEffect(x) {
const r = await pool.run(_ => myJob(x))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
const pool = new Pool(4)
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
.then(JSON.stringify)
.then(console.log, console.error)
放慢速度
上面的Pool
会尽快运行并发作业。您可能还对 throttle
感兴趣,这也在原帖中介绍过。我们可以使用 Pool
来包装我们的作业,而不是让 throttle
变得更复杂,让调用者控制作业应该花费的最短时间 -
const throttle = (p, ms) =>
Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
我们可以在
throttle
中添加一个myEffect
。现在,如果 myJob
运行得非常快,则在运行下一个作业之前至少会经过 5 秒 -
async function myEffect(x) {
const r = await pool.run(_ => throttle(myJob(x), 5000))
const s = document.createTextNode(`${r}\n`)
document.body.appendChild(s)
return r
}
一般来说,申请应该比较好@木兰解答。
但是,如果您也沉迷于
cadesplugin.*
generator 函数并且并不真正关心重量级外部库,那么这个答案也可能会有所帮助。
(如果您担心重量级外部库,您仍然可以将此答案与@Mulan的答案混合)
异步任务运行可以简单地使用
bluebird库中的
Promise.map
函数和双重使用cadesplugin.async_spawn
函数来解决。
代码如下所示:
export function signData(dataItem) {
cadesplugin.async_spawn(async function* (args) {
// some extra logic before all of the tasks
await Promise.map(dataItem.identifiers,
(id) => cadesplugin.async_spawn(async function* (args) {
// ...
let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
yield oDocumentNameAttr.propset_Value("Document Name");
// ...
// this function mutates some external data and making API calls
}),
{
concurrency: 5 //Parallel tasks count
});
// some extra logic after all tasks were finished
}, firstArg, secondArg);
}
魔法来自
async_spawn
函数,其定义为:
function async_spawn(generatorFunction) {
async function continuer(verb, arg) {
let result;
try {
result = await generator[verb](arg);
} catch (err) {
return Promise.reject(err);
}
if (result.done) {
return result.value;
} else {
return Promise.resolve(result.value).then(onFulfilled, onRejected);
}
}
let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
let onFulfilled = continuer.bind(continuer, "next");
let onRejected = continuer.bind(continuer, "throw");
return onFulfilled();
}
它可以暂停yield表达式上的internal生成器函数的执行,而不暂停整个生成器函数。
这是一个非常短的普通 JS 函数
mapParallel
,它将为 fn(v, i)
的每个元素调用 values
,同时确保最多 max
承诺在每个时刻都在运行:
const mapParallel = async (values, fn, max=10) => {
const promises = new Set();
for (const i in values) {
while (promises.size >= max)
await Promise.race(promises.values());
let promise = fn(values[i], i).finally(() => promises.delete(promise));
promises.add(promise);
}
return Promise.all(promises.values());
};
通过将异步函数与常规 for 循环一起使用,我们可以使用
await
来阻止循环,直到在开始每个新调用之前有一个槽空闲。
使用示例,一次运行两个延迟:
mapParallel(
[5000,2000,1000,3000,4000],
async (t, i) => {
console.log('starting', i);
await new Promise(res => setTimeout(res, t));
console.log('finished', i);
},
2
);
打印:
starting 0
starting 1
finished 1
starting 2
finished 2
starting 3
finished 0
starting 4
finished 3
finished 4