Synchronizing one observable using another observable

Question · Votes: 2 · Answers: 2

I want to synchronize one observable using another observable that acts as a clock, as illustrated below.

Main:     ---------abc----------------------------------
Clock:    -x-----x-----x-----x-----x-----x-----x-----x--
Expected: -------------a-----b-----c--------------------

I tried to implement this synchronization with the Zip operator, similar to the example in the Rx documentation (http://reactivex.io/documentation/operators/zip.html):

mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)

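The problem is that when I test this implementation, it does not behave as expected. Here is the test I wrote to check the expected behavior: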

scheduler = new TestScheduler();


var mainValues = scheduler.CreateHotObservable(
    new Recorded<Notification<char>>(100, Notification.CreateOnNext('a')),
    new Recorded<Notification<char>>(101, Notification.CreateOnNext('b')),
    new Recorded<Notification<char>>(102, Notification.CreateOnNext('c')),
    new Recorded<Notification<char>>(103, Notification.CreateOnNext('d')),
    new Recorded<Notification<char>>(104, Notification.CreateOnNext('e')),
    new Recorded<Notification<char>>(105, Notification.CreateOnNext('f')),
    new Recorded<Notification<char>>(106, Notification.CreateOnNext('g')),
    new Recorded<Notification<char>>(107, Notification.CreateOnCompleted<char>()));

var clockValues = scheduler.CreateHotObservable(
    new Recorded<Notification<long>>(70, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(90, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(110, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(130, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(150, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(170, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(190, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(210, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(230, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(250, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(270, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(290, Notification.CreateOnNext(0L)),
    new Recorded<Notification<long>>(310, Notification.CreateOnCompleted<long>()));


var res = scheduler.Start(() => mainValues.Zip(clockValues, (mainValue, clockValue) => mainValue), 0, 70, long.MaxValue);

Below are the expected values and the ones I actually get (shown in the comments):

res.Messages.AssertEqual(
    OnNext(110, 'a'), // Expected: 110, a - Actual: 100, a
    OnNext(130, 'b'), // Expected: 130, b - Actual: 110, b
    OnNext(150, 'c'), // Expected: 150, c - Actual: 130, c
    OnNext(170, 'd'), // Expected: 170, d - Actual: 150, d
    OnNext(190, 'e'), // Expected: 190, e - Actual: 170, e
    OnNext(210, 'f'), // Expected: 210, f - Actual: 190, f
    OnNext(230, 'g'));// Expected: 230, g - Actual: 210, g

What is the problem? Is Zip the right way to synchronize two observables? Am I using TestScheduler incorrectly?

c# reactive-programming system.reactive rx.net
2 Answers
2 votes

Give this a go:

var query =
    Observable
        .Create<char?>(o =>
        {
            IDisposable inner = null;
            IDisposable subscription =
                mainValues
                    .Publish(mvs =>
                    {
                        // Buffer main values until the clock releases them.
                        var q = new System.Collections.Generic.Queue<char>();
                        inner = mvs.Subscribe(mv => q.Enqueue(mv));
                        // Each clock tick emits the oldest queued value, or null if none is waiting.
                        return clockValues.Select(x => q.Count > 0 ? q.Dequeue() : (char?)null);
                    })
                    .Subscribe(o);
            return new CompositeDisposable(inner, subscription);
        });

// Print each value as the clock releases it, then run the virtual clock.
query.Subscribe(x => Console.WriteLine(x));
scheduler.Start();

Let me know if this works the way you want. If it does, I'll add some explanation.
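
One way to read the timestamps reported in the question: Zip pairs the n-th main value with the n-th clock tick and emits as soon as both exist, so a tick that arrives before its main value is buffered instead of being waited for. The sketch below is a plain C# simulation of that timing, not Rx itself; `ZipTimingSketch`, `SimulateZip`, and `SimulateQueue` are hypothetical names. It assumes the subscription at 70 misses the clock tick at 70, which is consistent with the "Actual" values in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipTimingSketch
{
    // Zip-like timing: the n-th pair is emitted once both of its elements
    // have arrived, that is, at the later of the two arrival times.
    public static List<(long Time, char Value)> SimulateZip(
        IReadOnlyList<(long Time, char Value)> main, IReadOnlyList<long> ticks)
    {
        var result = new List<(long Time, char Value)>();
        for (int i = 0; i < Math.Min(main.Count, ticks.Count); i++)
            result.Add((Math.Max(main[i].Time, ticks[i]), main[i].Value));
        return result;
    }

    // Queue-on-clock timing: every tick releases the oldest main value that
    // has already arrived, or null when nothing is waiting.
    // Assumption: a main value arriving exactly on a tick counts as arrived.
    public static List<(long Time, char? Value)> SimulateQueue(
        IReadOnlyList<(long Time, char Value)> main, IReadOnlyList<long> ticks)
    {
        var result = new List<(long Time, char? Value)>();
        var pending = new Queue<char>();
        int next = 0;
        foreach (var tick in ticks)
        {
            while (next < main.Count && main[next].Time <= tick)
                pending.Enqueue(main[next++].Value);
            result.Add((tick, pending.Count > 0 ? pending.Dequeue() : (char?)null));
        }
        return result;
    }

    public static void Main()
    {
        // Main values 'a'..'g' at times 100..106, as in the question's test.
        var main = "abcdefg".Select((c, i) => (Time: 100L + i, Value: c)).ToList();
        // Clock ticks every 20 from 90 to 290.
        // Assumption: the tick at 70 is missed because the subscription also happens at 70.
        var ticks = Enumerable.Range(0, 11).Select(i => 90L + 20 * i).ToList();

        foreach (var (t, v) in SimulateZip(main, ticks))
            Console.WriteLine($"zip   {t}: {v}");
        foreach (var (t, v) in SimulateQueue(main, ticks).Where(x => x.Value.HasValue))
            Console.WriteLine($"queue {t}: {v}");
    }
}
```

With the question's data, the Zip simulation gives 100, 110, 130, 150, 170, 190, 210 for a through g, matching the "Actual" column, because the tick at 90 is already buffered when a arrives at 100. The queue simulation gives 110, 130, 150, 170, 190, 210, 230, matching the "Expected" column, plus a null on every tick where nothing is waiting (90, and 250 onward).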


1 vote

Does changing

mainValues.Zip(clockValues, (mainValue,clockValue) => mainValues)

...to:

mainValues.Zip(clockValues, (mainValue,clockValue) => mainValue)

fix it?
