我的目标可能用大理石图最容易解释。我有两个Obsservable,xs和ys。我想返回一个名为rs的Observable。
xs --x---x---x---x-------x---x---x-
\ \ \
ys ----y-----------y---y---y-------
| | |
rs ----x-----------x-------x-------
y y y
因此,我需要类似于CombineLatest的东西,除了它仅在xs后跟ys时才触发。该模式之外的其他xs或ys不应触发输出,应将其丢弃。
CombineLatest,Zip或And / Then / When不执行我所需要的,并且找不到任何方法来指定更复杂的联接结构。
我最终使用了Join。
var rs = xs.Join(ys,
_ => xs.Merge(ys),
_ => Observable.Empty<Unit>(),
Tuple.Create);
这是一篇很好的文章,解释了Join的工作原理:http://blogs.microsoft.co.il/blogs/bnaya/archive/2012/04/04/rx-join.aspx
对于此类问题,提供测试方案确实很有用。您已经给出了想要的小巧的大理石图,将其转换为测试用例真的很容易。然后其他论坛读者就可以获取测试代码,实现他们的想法,并知道他们是否满足您的要求。
[Test]
public void Should_only_get_latest_value_from_Y_but_not_while_x_produes()
{
// 11111111112222222222333
// 12345678901234567890123456789012
//xs --x---x---x---x-------x---x---x-
// \ \ \
//ys ----y-----------y---y---y-------
// | | |
//rs ----x-----------x-------x-------
// y y y
var testScheduler = new TestScheduler();
// 11111111112222222222333
// 12345678901234567890123456789012
//xs --x---x---x---x-------x---x---x-
var xs = testScheduler.CreateColdObservable(
new Recorded<Notification<char>>(3, Notification.CreateOnNext('1')),
new Recorded<Notification<char>>(7, Notification.CreateOnNext('2')),
new Recorded<Notification<char>>(10, Notification.CreateOnNext('3')),
new Recorded<Notification<char>>(15, Notification.CreateOnNext('4')),
new Recorded<Notification<char>>(23, Notification.CreateOnNext('5')),
new Recorded<Notification<char>>(27, Notification.CreateOnNext('6')),
new Recorded<Notification<char>>(31, Notification.CreateOnNext('7')));
// 11111111112222222222333
// 12345678901234567890123456789012
//ys ----y-----------y---y---y-------
var ys = testScheduler.CreateColdObservable(
new Recorded<Notification<char>>(5, Notification.CreateOnNext('A')),
new Recorded<Notification<char>>(17, Notification.CreateOnNext('B')),
new Recorded<Notification<char>>(21, Notification.CreateOnNext('C')),
new Recorded<Notification<char>>(25, Notification.CreateOnNext('D')));
//Expected :
//Tick x y
//5 1 A
//17 4 B
//25 5 D
var expected = new[]
{
new Recorded<Notification<Tuple<char, char>>>(5, Notification.CreateOnNext(Tuple.Create('1', 'A'))),
new Recorded<Notification<Tuple<char, char>>>(17, Notification.CreateOnNext(Tuple.Create('4', 'B'))),
new Recorded<Notification<Tuple<char, char>>>(25, Notification.CreateOnNext(Tuple.Create('5', 'D')))
};
var observer = testScheduler.CreateObserver<Tuple<char, char>>();
//Passes HOT, fails Cold. Doesn't meet the requirements due to Timeout anyway.
//xs.Select(x => ys.Take(1)
// .Timeout(TimeSpan.FromSeconds(0.5),
// Observable.Empty<char>())
// .Select(y => Tuple.Create(x, y))
// )
// .Switch()
// .Subscribe(observer);
//Passes HOT. Passes Cold
xs.Join(ys,
_ => xs.Merge(ys),
_ => Observable.Empty<Unit>(),
Tuple.Create)
.Subscribe(observer);
testScheduler.Start();
//You may want to Console.WriteLine out the two collections to validate for yourself.
CollectionAssert.AreEqual(expected, observer.Messages);
}
P.S。顺便说一下,Merge很好地用于了答案。
以另一种方式说明该图,每个x需要采用第一个 y 在y之前没有额外的x。那些第一个要求是Take(1)
。此时,您将有一个IObservable<IObservable<...>>
。第二个要求和前两个生成的类型指向Switch
。放在一起,您应该能够将图与以下项匹配:
var rs = xs.Select(x => ys.Take(1)
.Select(y => Tuple.Create(x, y))
)
.Switch()