RxJs - 创建一个Observable数组的正确模式是什么?

问题描述 投票:0回答:1

我是RxJs的新手,我想知道创建一个Observable数组的正确模式是什么。

我想检索一个用户的帖子列表。帖子本身应该是Observable,我想把它们都保存在一个Observable数组中,这样当数组发生变化时,调用代码应该得到通知,并更新订阅到帖子 "列表 "中的任何内容。这很简单,但我也希望每个 "帖子 "都是Observable,所以如果我从中检索一个特定的帖子[i],我也应该能够订阅这些单独的对象。正确的方法是什么?

我使用的是Angular 9,我有。

public getPosts(): Observable<Array<Post>> {
    return new Promise((resolve, reject) = {
        let posts: Observable<Array<Post>> = new Observable<Array<Post>>();
        this.get<Array<Post>>('posts').subscribe(r => {
            posts = from(r);
            return resolve(posts);
        });
    });
}

这给了我一个 Observable<Array<Post>>但我应该如何创建一个 Observable<Array<Observable<Post>>>这是一个反模式吗?

javascript rxjs observable publish-subscribe
1个回答
1
投票

这都是为了方便,如果你的服务器为你提供帖子中变化的差异数据,那么就去创建 Observable<Observable<Post>[]>.


但是,在你的帖子中,存在多个问题。你不能把 ObservablesPromises. 该方法 getPosts 将只返回你从API获得的第一篇文章。


这是你要求的解决方案,但我不确定这是否是你真正想要的......

public getPosts(): Observable<Array<Observable<Post>>> {
  return this.get('posts').pipe(
    switchMap(posts => combineLatest(
      posts.map(post => this.get('post', post.id))
    )),
  );
}

1
投票

我不清楚你想达到什么目的,但你可能想要更多类似的东西。

@Injectable({providedIn:'root'})
export class PostService {
  // private replay subject will cache one value
  private postSource = new ReplaySubject<Post[]>(1)
  // public list of posts observable
  posts$ = this.postSource.asObservable();
  // function to select item by id out of list
  post$ = (id) => this.posts$.pipe(map(posts => posts.find(p => p.id === id)))

  getPosts() {
    // function to get remote posts
    return this.get<Post[]>('posts');
  }

  loadPosts() {
    // function to load posts and set the subject value
    this.getPosts().subscribe(posts => this.postSource.next(posts));
  }
}

你得先定义一下 get 函数和调用 loadPosts 每次你想更新列表时。


1
投票

给出的信息。

如果有任何语句错误,请告诉我,我会更新答案!

  1. get函数,返回一个由一个数组组成的观测值,数组中充满了帖子。
  2. 当职位发生变化时,get observable总是会发出信号。
  3. 观测值(Array>)内的值没有观测值,也不随时间变化。
this.get<Array<Post>>('posts')

可能的功能

  1. () => getPostById$
// This function returns you an observable with the post related to your id.
// If the id is not found the observable will not emit
// If the id is found the observable will only emit if the interface values have been changed
function getPostById$(id: string): Observable<Post> {
  // Returns you either the post or undefined if not found
  const findId = (id: string) => (posts: Array<Post>): Post | undefined =>
     posts.find(post => post.id === id);

  // Allows you only to emit, if id has been found
  const existingPost = (post: Post | undefined): boolean => post != null;

  // Allows you only to emit if your id has been changed
  const postComparator = (prevPost: Post, currPost: Post): boolean =>
    prevPost.value === currPost.value && prevPost.name === currPost.name;

  return this.get('posts').pipe(
    map(findId(id)),
    filter(existingPost),
    distinctUntilChanged(postComparator)
  );
}

  1. () => getPosts$
function getPosts$(): Observable<Array<Post>> {
  return this.get('posts');
}
  1. () => getStatePosts$
// This function allows to manage your own state
// 1. posts$: overwrites all posts
// 2. clear$: empties your posts$ observable
// 3. add$: adds one observable to the end of your posts
function statePosts$(posts$: Observable<Array<Posts>>, clear$: Observable<void>, add$: Observable<Post>): Observable<Array<Post>> {
  const updatePosts = (newPosts: Array<Posts>) => (oldPosts: Array<Posts>) => newPosts;

  const clearPosts = () => (oldPosts: Array<Posts>) => [];

  const addPost = (post: Post) => (oldPosts: Array<Posts>) => [...oldPosts, post];

  return merge(
    // You can add as much update functions as you need/want (eg: deleteId, addPostAtStart, sortPosts, ...)
    posts$.pipe(map(updatePosts)),
    clear$.pipe(map(clearPosts)),
    add$.pipe(map(addPost))
  ).pipe(
    // The fn in you scan is the (oldPosts: Array<Posts>) function from one of your three update functions (updatePosts, clearPosts and addPosts).
    // Whenever one of those three observables emits it first calls the left side of the function inside the map (post: Post) and returns a new function
    // When this function reaches the scan it gets the oldPosts and is able to update it
    scan((oldPosts, fn) => fn(oldPosts), [])
  )
}

// Usage
private posts$: Observable<Array<Post>> = this.get('posts');
private clear$: Subject<void> = new Subject();
private add$: Subject<Post> = new Subject();

public statePosts$ = getStatePosts(posts$, clear$, add$);

提示。 尽量先从返回语句中读取函数。然后再检查映射过滤或其他操作中发生了什么。希望我没有给你带来太多困惑。如果你有问题,欢迎提问。

© www.soinside.com 2019 - 2024. All rights reserved.