在每个事件类型上创建单个可观察对象并在上次取消订阅时销毁

问题描述 投票:3回答:1

想象一下消息流,每个消息都有相关的用户ID。对于每条消息,请获取关联的用户信息('user-fetch'可观察)。这些用户获取的可观察对象将保持活动状态,并监视目标用户的任何未来更改。

问题:

  1. 如何防止为给定的user-id创建重复的'user-fetch'可观察对象(并重用可能已经创建的observable)?
  2. 如何正确清理所有用户获取的observable以取消订阅和/或完成?

我在哪里:

  1. 我无法确定现有的运算符或方法来防止重复的observable,所以我写了一个类似于switchMap的运算符。我不喜欢它。这在实践中是如何完成的?
  2. 如果我能解决1,我相信正确的清理和重用的解决方案是refCount()
rxjs reactive-programming
1个回答
1
投票

如果我正确理解了问题,你有一个流发出id-s并基于该流事件,另一个流从远程位置(服务器)接收与id相关的一些数据。

我建议的解决方案是创建某种store来保存缓存数据,并在收到来自id流的消息时检查它并返回来自新请求或缓存数据的响应。

/** 
 * callBack end mocks an http request
 */
let callBackEnd$ = id => {
  customLog("___________________");
  customLog("Calling the server for " + id);
  customLog("___________________");

  return of({ id: id, data: `Some data about ${id}` });
};

/**
 * idStream$ mock the stream of id-s to be called trough http request
 */
let idStream$ = from([1, 2, 2, 3, 1, 5, 3, 4, 5]);

/**
 * We use reqStore$ to cache the already retrieved data
 */
let reqStore$ = new BehaviorSubject([]);

/**
 *  1. We subscribe to the message stream ( the stream that will tell us what to load )
 *
 *  2. With `latestFrom` we take the current store and check for any cached data, and return
 *  the cached data or the response of the new request
 *
 *  3. If the response of the `switchMap` doesn't exist in our store we add it.
 */
idStream$
  .pipe(
    tap(message => customLog(`Receiving command to retrieve : ${message}`)),
    withLatestFrom(reqStore$),
    switchMap(([e, store]) => {
      let elementSaved = store.find(x => x.id === e);
      return elementSaved ? of(elementSaved) : callBackEnd$(e);
    }),
    withLatestFrom(reqStore$),
    tap(([response, store]) => {
      if (!store.find(x => x.id === response.id)) {
        reqStore$.next([...store, response]);
      }
    })
  )
  .subscribe(([currentResponse, currentStore]) => {
    customLog("Receiving response for " + currentResponse.data);
  });

这是Codesandbox的现场演示我希望能帮助你:)

© www.soinside.com 2019 - 2024. All rights reserved.