如何在 Angular RxJS 中替换订阅中的订阅?

问题描述 投票:0回答:2

我有一个关于 RxJS 的问题。 我正在创建一个网络应用程序来管理协会的成员。 我想创建一个按钮来“重置”站点的数据库。 步骤如下:

  • 发送电子邮件给所有会员重新注册
  • 删除数据
  • 刷新页面

这是我编写的代码,它可以工作,但我知道有一些问题。我是 RxJS 的新手,所以我不太明白所有原理......

newYear(){
    this.amicalisteService.getAmicalistesValides("").subscribe({
      // Envoyer un mail à l'ensemble des amicalistes valides
      next: amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          this.amicalisteService.sendMail(null, to, cc, subject, body).subscribe({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          });
        }
      },
      complete: () => {
        // Supprimer amicalistes, photos et cartes amicalistes
        this.amicalisteService.getAmicalistes().subscribe(amicalistes  => {
          for(const amicaliste of amicalistes){
            this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe();
          }
        });
        this.imageService.getImages("amicaliste").subscribe(images => {
          for(const image of images){
            this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe();
          }
        });
        this.imageService.getImages("pdf").subscribe(pdfs => {
          for(const pdf of pdfs){
            this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe();
          }
        })
      }
      //Refresh...
    })
  }

我听说在 subscribe() 中使用 subscribe() 不是一个好习惯,但我不知道如何以不同的方式做到这一点。 不过,我想在这段代码中保留几件事。总的来说,如果我没记错的话,3 个 subscribe() 是并行运行的。我想保留它。 否则,我知道使用 switchMap 可以帮助我,但我似乎无法实现它。有人可以给我一些建议吗?

非常感谢您!

angular rxjs observable subscribe switchmap
2个回答
1
投票

可观察量是事件流。

远程调用(例如调用服务器发送邮件或调用数据库来清理一些数据)被实现为事件流,仅通知一个事件(即远程调用的响应),然后

complete
或只是“错误” ○s.

使用 RxJs 运算符,您可以组合这些流。例如,您执行以下操作:

  • 您从一个流 Stream_A 开始,它发出 event_A 然后完成
  • 您可以拥有第二个流,Stream_B,它发出 event_B,然后完成
  • 然后将 Stream_AStream_B 组合起来创建第三个流,Stream_A_B,它首先触发执行 Stream_A 并发出 event_A,一旦 event_A 被通知,触发执行Stream_B并发出Stream_B通知的所有事件,在本例中只是event_B
  • 为了在 RxJ 中创建这个组合流,我们使用操作符
    concatMap
    (注意:人们经常使用
    switchMap
    来连接流 - 通常结果是相同的,但含义和潜在行为略有不同 - 序列为对远程服务的调用必须一个接一个地进行,
    concatMap
    通常是首选方法)

组合更多流以获得新流的另一个示例如下:

  • 有 2 个流,Stream_1 Stream_2Stream_3。这些流中的每一个都会发出一个值,然后完成。
  • 我们可以组合这 3 个流,等待所有 3 个流发出并完成,然后仅发出一个值,即流发出的所有值的数组,然后完成。
  • 使用 RxJs,可以通过函数
    forkJoin
  • 获得这样的新组合流

Havin 说,希望能够澄清 RxJ 和 Observables,这就是我在你的情况下会做的事情

newYear(){
    // assume getAmicalistesValides returns an Observable that emits the result
    // of a remote call
    this.amicalisteService.getAmicalistesValides("")
    // to combine Observables we need to "pipe" operators, i.e. to execute
    // operators one after the other
    .pipe(
      // first thing to do here seems to send an email for each amicaliste
      // assuming we want to send the emails all in parallel, we can first
      // create one Observable for each mail to be sent and then use forkJoin
      // to execute them all in parallel
      // But all this has to happen after the Observable returned by getAmicalistesValides
      // has emitted its value, hence we use concatMap
      concatMap(amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          // here we create the array of Observables
          const sendMailObs = this.amicalisteService.sendMail(null, to, cc, subject, body)
          // each of these Observables can print something or react to errors
          .pipe(tap({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }))
          });
          // now we trigger the concurrent execution of all sendMail observables
          return forkJoin(sendMailObs)
      }),
      // after having sent the mails you want to do more stuff: delete data, images
      // and so on - assume each of these operations is an Observable
      // you will have to use concatMap and within it create the new Observables
      // and trigger them in parallel using forkJoin, as above
      concatMap(mailSentResults => {
         const deleteDataObs = ....
         const deleteImagesObs = ...
         ...
         return forkJoin([deleteDataObs, deleteImagesObs, // maybe other Obsevables])
      })
    )
    // up to here you have created a new stream, composing various other streams
    // and now is the time to subscribe to this new stream, which is the only stream 
    // you want to explicitely subscribe
    .subscribe({
      next: res => ... // manage the value notified by upstream
      error: err => ... // manage error
      complete: () => ... // do something when all is completed, if required
    })
  }

我希望我已经理解了你的情况,这一切都有道理


1
投票
this.amicalisteService.getAmicalistesValides("").pipe(
  concatMap(amicaliste => {
    // [...]
    return this.amicalisteService.sendMail(/* [...] */);
  }),
  finalize(() => forkJoin([
    this.amicalisteService.getAmicalistes().subscribe(amicalistes => { for(const amicaliste of amicalistes){ this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe(); } }), this.imageService.getImages("amicaliste").subscribe(images => { for(const image of images){ this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe(); } }), this.imageService.getImages("pdf").subscribe(pdfs => { for(const pdf of pdfs){ this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe(); } }) 
  ]).subscribe()), 
);
© www.soinside.com 2019 - 2024. All rights reserved.