合并两个Observable,但仅当第一个Obs紧随第二个Obs之后

问题描述 投票:3回答:3

我的目标可能用大理石图最容易解释。我有两个Obsservable,xs和ys。我想返回一个名为rs的Observable。

xs --x---x---x---x-------x---x---x-
      \           \       \
ys ----y-----------y---y---y-------
       |           |       |
rs ----x-----------x-------x-------
       y           y       y

因此,我需要类似于CombineLatest的东西,除了它仅在xs后跟ys时才触发。该模式之外的其他xs或ys不应触发输出,应将其丢弃。

CombineLatest,Zip或And / Then / When不执行我所需要的,并且找不到任何方法来指定更复杂的联接结构。

c# system.reactive
3个回答
3
投票

我最终使用了Join。

var rs = xs.Join(ys, 
                 _ => xs.Merge(ys),
                 _ => Observable.Empty<Unit>(),
                 Tuple.Create);

这是一篇很好的文章,解释了Join的工作原理:http://blogs.microsoft.co.il/blogs/bnaya/archive/2012/04/04/rx-join.aspx


3
投票

对于此类问题,提供测试方案确实很有用。您已经给出了想要的小巧的大理石图,将其转换为测试用例真的很容易。然后其他论坛读者就可以获取测试代码,实现他们的想法,并知道他们是否满足您的要求。

[Test]
public void Should_only_get_latest_value_from_Y_but_not_while_x_produes()
{
    //            11111111112222222222333
    //   12345678901234567890123456789012
    //xs --x---x---x---x-------x---x---x-
    //      \           \       \
    //ys ----y-----------y---y---y-------
    //       |           |       |
    //rs ----x-----------x-------x-------
    //       y           y       y


    var testScheduler = new TestScheduler();
    //            11111111112222222222333
    //   12345678901234567890123456789012
    //xs --x---x---x---x-------x---x---x-
    var xs = testScheduler.CreateColdObservable(
        new Recorded<Notification<char>>(3, Notification.CreateOnNext('1')),
        new Recorded<Notification<char>>(7, Notification.CreateOnNext('2')),
        new Recorded<Notification<char>>(10, Notification.CreateOnNext('3')),
        new Recorded<Notification<char>>(15, Notification.CreateOnNext('4')),
        new Recorded<Notification<char>>(23, Notification.CreateOnNext('5')),
        new Recorded<Notification<char>>(27, Notification.CreateOnNext('6')),
        new Recorded<Notification<char>>(31, Notification.CreateOnNext('7')));

    //            11111111112222222222333
    //   12345678901234567890123456789012
    //ys ----y-----------y---y---y-------
    var ys = testScheduler.CreateColdObservable(
        new Recorded<Notification<char>>(5, Notification.CreateOnNext('A')),
        new Recorded<Notification<char>>(17, Notification.CreateOnNext('B')),
        new Recorded<Notification<char>>(21, Notification.CreateOnNext('C')),
        new Recorded<Notification<char>>(25, Notification.CreateOnNext('D')));


    //Expected :
    //Tick  x   y
    //5     1   A
    //17    4   B
    //25    5   D
    var expected = new[]
    {
        new Recorded<Notification<Tuple<char, char>>>(5, Notification.CreateOnNext(Tuple.Create('1', 'A'))),
        new Recorded<Notification<Tuple<char, char>>>(17, Notification.CreateOnNext(Tuple.Create('4', 'B'))),
        new Recorded<Notification<Tuple<char, char>>>(25, Notification.CreateOnNext(Tuple.Create('5', 'D')))
    };

    var observer = testScheduler.CreateObserver<Tuple<char, char>>();

    //Passes HOT, fails Cold. Doesn't meet the requirements due to Timeout anyway.
    //xs.Select(x => ys.Take(1)
    //                    .Timeout(TimeSpan.FromSeconds(0.5),
    //                            Observable.Empty<char>())
    //                    .Select(y => Tuple.Create(x, y))
    //    )
    //    .Switch()
    //    .Subscribe(observer);

    //Passes HOT. Passes Cold
    xs.Join(ys,
            _ => xs.Merge(ys),
            _ => Observable.Empty<Unit>(),
            Tuple.Create)
        .Subscribe(observer);

    testScheduler.Start();

    //You may want to Console.WriteLine out the two collections to validate for yourself.

    CollectionAssert.AreEqual(expected, observer.Messages);
}

P.S。顺便说一下,Merge很好地用于了答案。


0
投票

以另一种方式说明该图,每个x需要采用第一个 y 在y之前没有额外的x。那些第一个要求是Take(1)。此时,您将有一个IObservable<IObservable<...>>。第二个要求和前两个生成的类型指向Switch。放在一起,您应该能够将图与以下项匹配:

var rs = xs.Select(x => ys.Take(1)
                          .Select(y => Tuple.Create(x, y))
                  )
           .Switch()
© www.soinside.com 2019 - 2024. All rights reserved.