在nodejs中使用RXJS进行批量数据调用

问题描述 投票:0回答:1

我想发送大量数据,并在完成数据处理后,移至下一个数据。例如:我有这个功能:

async function test() {
  await sample.sampleStructure()
  await sample.sampleDataAdd()
  await sample.sampleDataGet()
}

我在为每个呼叫调用3个函数的地方。但是我想发送例如await sample.sampleDataAdd()的200个数据,如果前200个数据响应为“成功”,则它将发送其余200个数据。完成1000个数据后,我想继续进行下一个函数调用。所以我看了一下,我认为RXJS可以提供​​解决方案。但是我不使用是否有可能在这种情况下使用过滤器和下一步。

async function test() {
  await sample.sampleStructure()
  // let observable = Observable.range(1,1000)
  let observable = Observable.create()
  observable
  .filter(aysnc function () {
    await sample.sampleDataAdd()
   })
  .subscribe({
    .next: async function () {
       // await sample.sampleDataAdd()
    },
    .error: function (error) {
      console.log(error)
     }
   })

我是rxjs的新手,所以也有很多错误。请谁能帮我解决这个问题?

javascript node.js rxjs rxjs6 rxjs-observables
1个回答
0
投票

您可以使用Promise.all并行运行作业

const job1Result = await job1()
const job2 = sample.sampleStructure2()
const job3 = sample.sampleStructure3()
const job4 = sample.sampleStructure4()
const [job2Result, job3Result, job4Result] = await Promise.all([job2, job3, job4])

const job5 = await job5()


然后您可以创建一次遍历10次的循环。

如果您要介绍rxjs,还有很多东西要学习。处理rxjs订阅和错误处理可能会相当艰巨。因此,在介绍它之前,请尝试解决不使用它的问题。

© www.soinside.com 2019 - 2024. All rights reserved.