在RxJs中,如何在mergemap中执行顺序,同时按顺序返回结果数组?

问题描述 投票:0回答:1

我有一个功能,可以将一条消息分解为多个消息块。我需要将这些消息发布到我的发布功能中。但是我不希望Observable阻止其他传入的帖子。我的解决方案是合并映射中concat运算符的某种组合,但我似乎无法使其正常工作]

我不确定是否可以绘制图表,但这是我的尝试:

-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]

我需要请求1在3之前的2之前执行,在5之前的6之前执行,而6之前的执行。用英语来说,我想我会有一个可观察的可观察对象,我希望将其映射到可观察的流中,然后为每个可观察的输出流映射到一个标准数组。我只是不确定如何确切地做到这一点。我一直在弄乱代码很长时间,试图将我刚才所说的概念化,这是我的最佳尝试:

    interface SendInfo {
        message: discord.Message
        content: string
        options?: discord.MessageOptions
    }
    export const sendMessage$: Subject<SendInfo> = new Subject();

    const regex = /[\s\S]{1,1980}(?:\n|$)/g;
    export const sentMessages$ = sendMessage$.pipe(
        mergeMap(
            (input: SendInfo):
            Observable<(discord.Message | discord.Message[] | null)[]> => {
                const chunks: string[] = input.content.match(regex) || [];
                const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
                    (chunk: string):
                    Observable<discord.Message | discord.Message[] | null> => {
                        const bound = input.message.channel.send.bind(
                            undefined,
                            chunk,
                            input.options,
                        );
                        return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
                            bound,
                        );
                    }
                ));

                return superObservable.pipe(
                    mergeMap(e => e),
                    toArray(),
                );
            }
        ),
        tap((e): void => Utils.logger.fatal(e)),
        share(),
    );

我的输出:

[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]

我觉得我已经接近解决方案了,但是我无法弄清楚如何将其准确地合并为一个数组。我也不知道它在功能上是否正确。

arrays rxjs observable concat mergemap
1个回答
0
投票

我已经在此问题RxJS: MergeMap with Preserving Input order中解决了保存订单的问题

因此,使用我的parallelExecute,您可以将值减少为一个数组。

parallelExecute(...yourObservables).pipe(
  reduce((results, item) => [...results, item], [])
);

这里是parallelExecute函数。

const { BehaviorSubject, Subject } = rxjs;
const { filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}
© www.soinside.com 2019 - 2024. All rights reserved.