RxJs - 将Observables数组转换为发射值数组

问题描述 投票:1回答:2

抱歉标题,我想不出更好的一个。 我有这段代码,基本上:

  1. 过滤有效(非空)cron epressions'数组
  2. 将每个cron表达式映射到对服务的调用

this.formGroup.valueChanges.pipe(
    op.filter(v => !!v.cronExpressions),
    op.map((v): string[] => v.cronExpressions),
    op.map((v: string[]) => v.map(cron =>
            this.cronService.getReadableForm(cron).pipe(
                op.map(this.toDescription),
                op.map((description): CronExpressionModel => ({ cron, description }))
            )
        )
    ),
    // What now?
).subscribe((cronExpressions: CronExpressionModel[]) => ...) // Expected result

我想在subscribe()上获得从所有服务电话中返回的CronExpressionModel阵列。

我无法绕过这个。


目前的解决方案,根据马丁答案:

filter(v => !!v.cronExpressions),
map(v => v.cronExpressions),
map(cronExprs => cronExprs.map(c => this.invokeCronService(c))),
mergeMap(serviceCalls => forkJoin(serviceCalls).pipe(defaultIfEmpty([])))
rxjs
2个回答
2
投票

要将流转换为数组,可以使用toArray运算符。

这是一个建议:

this.formGroup.valueChanges.pipe(
    filter(v => !!v.cronExpressions),
    // transform [item1, item2...] into a stream ----item1----item2----> 
    concatMap((v): Observable<string> => from(v.cronExpressions).pipe(
        // for each of the items, make a request and wait for it to respond
        concatMap((cron: string) => this.cronService.getReadableForm(cron)),
        map(this.toDescription),
        map((description): CronExpressionModel => ({ cron, description })),
        // wait for observables to complete. When all the requests are made, 
        // return an array containing all responses
        toArray()
      )
    ).subscribe((cronExpressions: CronExpressions[]) => ...) // Expected result

注意 :

您可以使用mergeMap而不是concatMap来并行化请求。但是你需要知道你在做什么;)


1
投票

如果你不介意并行运行所有请求,你可以添加forkJoin

switchMap(observables => forkJoin(...observables))

或者,如果您想按顺序运行所有这些:

switchMap(observables => concat(...observables).pipe(toArray()))

而不是switchMap你可能想要使用concatMapmergeMap取决于你想要的行为。

© www.soinside.com 2019 - 2024. All rights reserved.