Angular 订阅多个部分相互依赖的异步 http 调用并返回数组

问题描述 投票:0回答:1

我对 Angular 和 JavaScript 相当陌生,正在构建我的第一个 Angular 仪表板应用程序,该应用程序查询 Azure DevOps api 以获取测试结果:

  1. 查询 API 以获取发布定义的过滤列表(约 100 个)
  2. 对于每个版本定义,获取最新版本
  3. 对于每个版本,获取该版本中所有测试运行的集合。
  4. 收到第一组测试结果后,立即显示所有结果的表格(每个发布定义都是一行,其中测试结果作为可扩展表格)。

我设法通过可观察对象的嵌套订阅来实现这一点(见下文),但我知道应该避免这种情况,最好使用

mergeMap
/
switchMap
和/或
forkJoin
之类的东西来完成。已经为此苦苦挣扎了好几天,但还没有运气。

然后是挑战 #2:应将第二个数据流添加到此:管道。遵循相同的方法:获取管道列表,对于每个最新的管道运行,对于每个所有的测试运行。两个数据流都可以/应该单独且异步地获取,并且一旦其中一个获取了第一组测试运行,它就可以显示在仪表板上。

如何实现这个?

我的仅使用嵌套订阅的发布定义的工作解决方案:

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');
      this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      .pipe(takeUntil(this.ngUnsubscribe))
      .subscribe((releaseDefinitions: any)=> {
        if (releaseDefinitions.length === 0) {
          this.isLoading = false
        }
        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
              if (info) {
                this.apiService.getTestRunsByRelease(info.releaseId)
                .pipe(takeUntil(this.ngUnsubscribe))
                .subscribe((testruns: any) => {    
                  this.isLoading = false;              
                  this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
                  this.dataSource.data = this.results;
                });
              }              
            });
          }
        });            
      });
    });
  }   

首先尝试使用

forkJoin
,但不知道如何继续。也不确定这是否正确,因为
forkJoin
似乎要等到两个可观察量都完成,但一旦其中一个有结果,它就应该继续循环结果并执行其余的调用。

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');      
      
      let releaseDefQuery =  this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      let pipelineDefQuery = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)

      forkJoin([releaseDefQuery, pipelineDefQuery]).subscribe(definitions => {
        let releaseDefinitions = definitions[0];
        let pipelineDefinitions = definitions[1];

        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
...
javascript angular rxjs observable
1个回答
0
投票

像任何类型的

Stream
(例如
Promise
s)一样,当您看到嵌套在
Observable
s 中时,您可能会想后退一步看看它是否真的有必要。

让我们一点一点地检查你的解决方案。

我们的出发点是:

this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))

然后你订阅,但在订阅中你对给定的数据进行可观察的操作,这强烈建议你应该

pipe
进行操作,然后订阅最终结果。

在这种情况下,您想要

map
将参数传递给一些可观察的值。您还可能受益于
switchMap
提供的“提前中断”行为。否则,如果您不想“提前打断”,也可以使用
mergeMap
作为一个潜在的选择(它过去更合适地命名为
flatMap
)。

我们将添加

filter
map
以进行良好测量,以确保我们拥有
team
参数,并将其拔出(因为我们不需要其余的)。

this.router.paramMap.pipe(
    takeUntil(this.ngUnsubscribe),
    filter(params => params.has("team"))
    map(params => params.get("team"))
    switchMap(team => {
      this.teamToFilterOn = team as string;
      // We'll dissect the rest
    })
) // [...]

然后是您想与该团队一起做什么的部分。

您有多个依赖于相同输入的“任务”,并且您希望同时执行这两个任务,因此选择

forkJoin
是一个不错的选择。但也有
combineLatest
可以做类似的事情,但是“一步一步”组合结果。

您对这两项任务都使用“最新”一词,因此我们确实会使用

combineLatest
来代替:

const releaseDef$ = // [...]
const pipelineDef$ = // [...]
return combineLatest([releaseDef$, pipelineDef$]);

现在我们来剖析这两个操作。

据我所知,您只对具有

lastRelease
的版本感兴趣。当有新的进来时,您也不想“切换”,您想要全部,让我们对其进行编码:

const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
    filter(releaseDef => releaseDef.lastRelease),
    mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
    filter(info => !!info)
    mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId))
    tap(testruns => {
      this.isLoading = false;              
      this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
      this.dataSource.data = this.results;
    })
)

这里我将使用

tap
,假设您的操作独立于
releaseDef$
pipelineDef$
的结果。如果不这样做,请跳过
tap
并将回调传递给最终的
subscribe

为了总结这一点,让我们把它们放在一起:

ngOnInit(): void {
  this.router.paramMap.pipe(
    takeUntil(this.ngUnsubscribe),
    filter(params => params.has("team"))
    map(params => params.get("team"))
    switchMap(team => {
      this.teamToFilterOn = team as string;
      
      const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
        filter(releaseDef => releaseDef.lastRelease),
        mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
        filter(info => !!info)
        mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId))
        tap(testruns => {
          this.isLoading = false;              
          this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
          this.dataSource.data = this.results;
        })
      );

      const pipelineDef$ = // You didn't present code for this, but you get the idea
      return combineLatest([releaseDef$, pipelineDef$]);
    })
  ).subscribe(/* add your callback here if you don't use `tap`s */);
}

您会注意到我只在

takeUntil(this.ngUnsubscribe)
上使用,因为“主”可观察链将随之停止,这意味着操作也将停止。

如果您不确定或遇到问题,您仍然可以将它们作为每个

.pipe
的第一个参数。

© www.soinside.com 2019 - 2024. All rights reserved.