RxJS 6-取消/结束管道

问题描述 投票:5回答:3

特别是使用RxJS 6的新版本,特别是管道运算符。当前正在使用管道来获取API调用的结果,并将其传递给一系列其他任务。

一切正常,但是在遇到问题时似乎无法找到取消或终止管道的方法。例如,我正在使用tap运算符检查该值是否为null。然后,我抛出一个错误,但是管道仍然似乎移至下一个任务,在本例中为concatmap。

因此,您如何过早终止或取消管道?预先感谢。

getData(id: String): Observable<any[]> {
return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
   tap(evt => {
    if (evt == null) {
      return throwError(new Error("No data found..."));
    }
  }),
concatMap(
  evt =>
     <Observable<any[]>>(
        this.http.get<any[]>(
    `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).map(res =>( {"response1":evt, "response2":res}) )
 )
),
retry(3),
catchError(this.handleError("getData", []))
);}
angular rxjs pipe tap
3个回答
5
投票

我尝试了这种堆叠闪电战,从中获得了基本的概念,并且奏效了。它取消了剩余的操作。请参阅下面的链接。

https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts

我在您的代码和我的代码之间看到的区别是,我使用的是throw而不是throwError(这是您写的东西吗?而我只是抛出错误...不返回抛出的错误。

这里是参考代码:

import { Component } from '@angular/core';
import { of, from } from 'rxjs';
import { map, catchError, tap, retry} from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
    of('a', 'b', 'c', 'd')
      .pipe(
       map(x => {
        if (x === 'c') {
          throw 'An error has occurred';
        }
        return x;
       }),
       tap(x => console.log('In tap: ', x)),
       retry(3),
       catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));
  }
}

1
投票

RXJS会抛出错误并停止执行。您在管道中应用的操作员是错误的选择。

您正在使用的'tap'运算符仅用于产生副作用,即对DOM,console.log进行更改,或更改组件上具有的某些变量的值(即this.counter = someValue)。 tap运算符并不是要更改RXJS“流”,它只会返回与接收到的相同的可观察值。 https://rxjs-dev.firebaseapp.com/api/operators/tap

另一方面,在流上起作用的运算符,例如'map',可能会引发错误。看到这个stackblitz

总而言之,代码是

getData(): Observable<any[]> {
    return this.httpGet('basePath').pipe(
      map(evt => {
        if (evt === null) {
          return throwError(new Error("No data found..."));
        } else {
          return evt;
        }
      }),
      concatMap(evt => {
        return this.httpGet(`childPath/${evt.serverResult}`); 
      }),
      map(res => {
        return {
          "response2": res.serverResult
        };
      }),
      retry(3),
      catchError(error => {
        console.log('error handled!');
        return of(null);
      })
    )
  }

  private httpGet(someString: string): Observable<{ serverResult: number }> {
    return timer(1000).pipe( // fake a server call that takes 1 second
      map(_ => { // fake a server result
      // Comment out the valid return and uncomment the null return to see it in action
        //return null;
        return { 
          serverResult: 1 
        }
      }) 
    );
  }

如果您不想将错误转换为流中的有效值,请在订阅函数的错误回调中进行处理。


-1
投票

可观察的函数总是内部包装在try / catch块中。流中抛出的任何错误都将终止流,并向订阅者或操作员调用任何错误回调。

这里的问题是throwError()。我不知道该函数是什么,也无法将其识别为Observable运算符,但看起来它正在被用作一个(并且从未订阅过)。

tap通常仅用于副作用,因为它完全无法影响流中的值。但是,正如我之前在try / catch块中提到的那样,您应该只能够throw一个新的Error,然后流将处理其余的内容。

getData(id: String): Observable<any[]> {
return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
   tap(evt => {
    if (evt == null) {
      throw new Error("No data found...");
    }
  }),
concatMap(
  evt =>
     <Observable<any[]>>(
        this.http.get<any[]>(
    `${this.baseUrl}/path/relatedby/${evt.child_id}`
      ).map(res =>( {"response1":evt, "response2":res}) )
 )
),
retry(3),
catchError(this.handleError("getData", []))
);}
© www.soinside.com 2019 - 2024. All rights reserved.