如何使rxjs暂停/恢复?

问题描述 投票:1回答:2

现在有一个数组,该数组的值是图像链接,例如:

const imageList = [
  'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
  'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
  // more...
];

我想使用rxjs依次下载它们(我是电子应用,因此可以下载)

第一张图片的下载完成后,再下载第二张图片。当下载第三张图片时,用户单击暂停按钮并等待第三张图片的下载完成。然后,不再下载。当用户单击继续按钮时,下载从第四张图片开始。

我指的是本文:Buffering (lossless)中的https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd部分。本文中的代码是:

merge(
  source$.pipe( bufferToggle(off$, ()=>on$)  ),
  source$.pipe( windowToggle(on$, ()=>off$) )
).pipe(
  // then flatten buffer arrays and window Observables
  flatMap(x => x)
)

演示URL为:https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd

但是此代码中有两个问题无法满足我的需求。我不知道如何修改它。

  1. 我使用redux-observable,所以我想通过两个动作来触发它们:(this.props.start() / this.props.pause()
  2. 恢复后,数据仍是一个接一个地传输,而不是一次发布

当前代码如下:

export const epicDownloadResources = (
  action$: ActionsObservable<AnyAction>,
  store$: StateObservable<RootState>,
) => {
  return action$.pipe(
    ofType(appActions.other.start()),
    of([
      'https://cdn.pixabay.com/photo/2020/02/16/20/29/new-york-4854718_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/16/04/mallorca-4848741_960_720.jpg',
      'https://cdn.pixabay.com/photo/2020/02/14/04/20/old-city-4847469_960_720.jpg',
    ]),
    mergeMap(() => {
      // code
    }),
    mergeMap((url: string) => {
      // downloading
    })
}

在实际产品中,它将下载公司的内部图片资源,而不是其他非版权图片。

reactjs redux rxjs rxjs6 redux-observable
2个回答
1
投票

这是我的尝试:

const urlArr = Array.from({ length: 10 }, (_, idx) => 'url/' + idx);
let idx = 0;

const urlEmitter = new Subject();
const url$ = urlEmitter.asObservable();
const stopEmitter = new Subject();
const stopValues$ = stopEmitter.asObservable();

const start$ = fromEvent(start, 'click');
start$.pipe(take(1)).subscribe(() => (stopEmitter.next(false), urlEmitter.next(urlArr[idx++]))); // Start emitting valeus

const stopSubscription = fromEvent(stop, 'click').pipe(mapTo(true)).subscribe(stopEmitter);

const shouldContinue$ = stopValues$.pipe(map(shouldStop => !shouldStop));

const subsequentStartClicks$ = start$.pipe(
  skip(1), // Skip the first time the `start` button is clicked
  observeOn(asyncScheduler), // Make sure it emits after the buffer has been initialized
  tap(() => stopEmitter.next(false)), // shouldContinue$ will emit `true`
);

const downloadedUrls$ = url$.pipe(
  mergeMap(url => of(url).pipe(delay(idx * 500))), // Simulate a file downloading
  combineLatest(shouldContinue$), // Make sure it acts according to `shouldContinue$`
  filter(([_, shouldContinue]) => shouldContinue),
  map(([v]) => v),
  tap((v) => console.warn(v)), // Debugging purposes...

  // Because of `combineLatest`
  // If you click `start` and wait some time, then you click `stop`
  // then you click again `start`, you might get the last value added to the array
  // this is because `shouldContinue$` emitted a new value
  // So you want to make sure you won't get the same value multiple times
  distinctUntilChanged(), 

  tap(() => urlEmitter.next(urlArr[idx++])),

  bufferToggle(
    start$,
    () => stopValues$.pipe(filter(v => !!v)),
  )
);

merge(
  subsequentStartClicks$.pipe(mapTo(false)), // Might not be interested in click events 
  downloadedUrls$
)
  .pipe(filter(v => !!v))
  .subscribe(console.log);

[我受到bufferToggle's图的启发。

我的想法是采用相同的方法,不同的是,仅当发出bufferToggle流时才发出值,而当start$发出时才应该停止值。

stop$

每次按下----X--X----------------------------------> urls$ -Y----------------------------------------> start$ -----------Z------------------------------> end$ -----------[X, X]-------------------------------> urls$ 按钮时,都会将stop值推入true流中。 stopValues$确定shouldContinue$流是否应继续推入值,具体取决于url$

stopValues$


0
投票

我采用了完全不同的方法。

如果我的理解正确,您想在用户恢复后继续进行操作。实际上,这意味着进行窗口或缓冲是没有意义的。

我认为,concatMap嵌套的一些简单使用就足够了。

StackBlitz

这将暂停开始新请求,直到resume $发出新值。我相信这是根据您的情况想要的。

我不确定在用户暂停请求后是否要在您的情况下完成第三个请求。我认为您可以,但是如果没有,您可以在请求后使用另一个concatMap来使用过滤器pauseResume $。

const pause$ = fromEvent(pauseButton, "click").pipe( mapTo(false), ); const resume$ = fromEvent(resumeButton, "click").pipe( mapTo(true), ); const pauseResume$ = merge(pause$,resume$).pipe( startWith(true), shareReplay(1), ) const source = of(...imageList).pipe( concatMap((url, i) => pauseResume$.pipe( tap(v => console.log(`should resume ${v}`)), filter(v => v), // Only resume if true take(1), concatMap(() => from(fetch(url)).pipe( delay(1000), // Simulate slow request mapTo(i) // just for logging which request we just completed ) ) ) ) ); source.subscribe(x => console.log(x));

© www.soinside.com 2019 - 2024. All rights reserved.