如何与生成器函数中的 limit 并行执行一些异步任务?

问题描述 投票:0回答:3

我正在尝试并行执行一些异步任务,但同时运行的任务的最大数量受到限制。

有一个我想要实现的目标的例子:

目前该任务正在陆续运行中。它是这样实现的:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    //... nestedArgs assignment logic ...

    for (const id of dataItem.identifiers) {
      yield* idHandler(dataItem, id, args, nestedArgs);
    }
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

async function* idHandler(edsItem, researchId, args, nestedArgs) {
  ...
  let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
  yield oDocumentNameAttr.propset_Value("Document Name");
  ...
  // this function mutates some external data, making API calls and returns void
}

不幸的是,我无法对

cadesplugin.*
函数进行任何更改,但我可以在代码中使用任何外部库(或内置
Promise
)。

我在async库中找到了一些可能对我有用的方法(eachLimitparallelLimit),并且答案展示了如何处理它。

但是还有两个问题我无法解决:

  1. 如何将主要参数传递到嵌套函数中?
  2. Main 函数是一个 生成器函数,所以我仍然需要在主函数和嵌套函数中使用 yield 表达式

有一个指向 cadesplugin.* 源代码的链接,您可以在其中找到我的代码中使用的 async_spawn(和另一个

cadesplugin.*
)函数。

这是我尝试过但没有运气的代码:

await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) { 
  //... nested function code 
});

它会导致对象不是异步可迭代的错误。

另一次尝试:

let functionArray = [];
dataItem.identifiers.forEach(researchId => {
  functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);

它什么也没做。

С我能以某种方式解决这个问题吗,或者生成器函数不允许我这样做?

javascript asynchronous async-await parallel-processing async.js
3个回答
1
投票

方钉,圆孔

您不能使用异步迭代来解决此问题。

串联
for await .. of的本质。
await
阻塞,循环不会继续,直到等待的 Promise 完成。您需要更精确的控制级别,以便执行这些特定要求。

首先,我们有一个模拟

myJob
来模拟长计算。这很可能是对您应用程序中某些 API 的网络请求 -

// any asynchronous task
const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

使用

此问答
中定义的Pool,我们实例化
Pool(size=4)
,其中
size
是要运行的并发线程数-

const pool = new Pool(4)

为了人体工程学,我在

run
类中添加了
Pool
方法,使包装和运行作业变得更容易 -

class Pool {
  constructor (size) ...
  open () ...
  deferNow () ...
  deferStacked () ...

  // added method
  async run (t) {
    const close = await this.open()
    return t().then(close)
  }
}

现在我们需要编写一个使用

pool
来运行
myJob
的效果。在这里您还将决定如何处理结果。请注意,promise must 必须包含在 thunk 中,否则池无法控制它何时开始 -

async function myEffect(x) {
  // run the job with the pool
  const r = await pool.run(_ => myJob(x))

  // do something with the result
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)

  // return a value, if you want
  return r
}

现在通过将

myEffect
映射到输入列表来运行所有内容。在我们的示例中
myEffect
我们
return r
这意味着在获取 all 结果之后结果也可用。这是可选的,但演示了程序如何知道一切何时完成 -

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

完整程序演示

在下面的功能演示中,我浓缩了定义,以便我们可以立即看到它们。运行程序在您自己的浏览器中验证结果 -

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  async run (t) { const close = await this.open(); return t().then(close) }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))

const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)

async function myEffect(x) {
  const r = await pool.run(_ => myJob(x))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
  
const pool = new Pool(4)

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)

放慢速度

上面的

Pool
会尽快运行并发作业。您可能还对
throttle
感兴趣,这也在原帖中介绍过。我们可以使用
Pool
来包装我们的作业,而不是让
throttle
变得更复杂,让调用者控制作业应该花费的最短时间 -

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

我们可以在

throttle
中添加一个
myEffect
。现在,如果
myJob
运行得非常快,则在运行下一个作业之前至少会经过 5 秒 -

async function myEffect(x) {
  const r = await pool.run(_ => throttle(myJob(x), 5000))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}

0
投票

一般来说,申请应该比较好@木兰解答

但是,如果您也沉迷于

cadesplugin.*
generator 函数并且并不真正关心重量级外部库,那么这个答案也可能会有所帮助。

(如果您担心重量级外部库,您仍然可以将此答案与@Mulan的答案混合)

异步任务运行可以简单地使用

bluebird库
中的Promise.map函数和双重使用
cadesplugin.async_spawn
函数来解决。

代码如下所示:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    // some extra logic before all of the tasks

    await Promise.map(dataItem.identifiers,
      (id) => cadesplugin.async_spawn(async function* (args) {
        // ...
        let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
        yield oDocumentNameAttr.propset_Value("Document Name");
        // ...
        // this function mutates some external data and making API calls
      }),
      {
        concurrency: 5 //Parallel tasks count
      });
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

魔法来自

async_spawn
函数,其定义为:

function async_spawn(generatorFunction) {
  async function continuer(verb, arg) {
    let result;
    try {
      result = await generator[verb](arg);
    } catch (err) {
      return Promise.reject(err);
    }
    if (result.done) {
      return result.value;
    } else {
      return Promise.resolve(result.value).then(onFulfilled, onRejected);
    }
  }

  let generator = generatorFunction(Array.prototype.slice.call(arguments, 1));
  let onFulfilled = continuer.bind(continuer, "next");
  let onRejected = continuer.bind(continuer, "throw");
  return onFulfilled();
}

它可以暂停yield表达式上的internal生成器函数的执行,而不暂停整个生成器函数。


0
投票

这是一个非常短的普通 JS 函数

mapParallel
,它将为
fn(v, i)
的每个元素调用
values
,同时确保最多
max
承诺在每个时刻都在运行:

const mapParallel = async (values, fn, max=10) => {
  const promises = new Set();

  for (const i in values) {
    while (promises.size >= max)
      await Promise.race(promises.values());

    let promise = fn(values[i], i).finally(() => promises.delete(promise));
    promises.add(promise);
  }

  return Promise.all(promises.values());
};

通过将异步函数与常规 for 循环一起使用,我们可以使用

await
来阻止循环,直到在开始每个新调用之前有一个槽空闲。

使用示例,一次运行两个延迟:

mapParallel(
  [5000,2000,1000,3000,4000],
  async (t, i) => {
    console.log('starting', i);
    await new Promise(res => setTimeout(res, t));
    console.log('finished', i);
  },
  2
);

打印:

starting 0
starting 1
finished 1
starting 2
finished 2
starting 3
finished 0
starting 4
finished 3
finished 4
© www.soinside.com 2019 - 2024. All rights reserved.