GroupJoin,其中一个窗口在流完成时结束

问题描述 投票:0回答:1

我有两个数据源,它们正在从单独的线程中转储数据。我正在尝试通过密钥将两个来源都加入。我可以使用GroupJoin做到这一点。我必须使用Observable.Never,以便窗口永远不会结束。就加入流而言,一切工作正常。当两个源都完成转储数据时,它们在观察者上调用各自的onComplete。我期望两个源都收到OnComplete后,流式传输将结束。正如我所使用的Observable.Never流永远不会结束并且永远不会调用aggSource的Oncomplete事件。

是否有办法告诉Rx当两个源都收到OnComplete时关闭了窗口,而不是无限地使其保持打开状态?

我是Rx的新手,不确定是否可以实现这一目标。下面是代码片段。在此先感谢!

var l = Source1;
var r = Source2;

var q = r.GroupJoin(l,
                _ => Observable.Never<Unit>(), // windows from each left event going on forever 
                _ => Observable.Never<Unit>(), // windows from each right event going on forever
                (left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events

        var joinSource =   q.SelectMany(e => {
            return e.Item2.Where(
               x =>
               {
                 return x.ID== e.Item1.ID;
               })
               .Select(v=>  (Item1:v.Value, Item2: e.Item1.Value));
        });



var aggSource = joinSource.GroupBy(x => x.Item1).SelectMany(grp =>
            {
                return grp.Scan(0.0, (accumulator, current) => accumulator + current.Item2).Select(z => (Group: grp.Key, Value: z));
            });



aggSource.Subscribe(x => dictResults[x.Group] = x,
              y => { Console.WriteLine("Error Ocurred: " + y.Message); completed = true; },
              () => { completed = true; Console.WriteLine("Subcription comnpleted"); }
              );
// dict results is dictionary which is my projection which is shown to View. Right now my view is just console window.
c# system.reactive rx.net
1个回答
1
投票

持续时间选择器控制连接窗口的重叠-当两个源之一完成时,我们都需要缩短重叠时间。首先,当发出LastOrDefaultAsync时,我们将使用OnComplete获得通知。

var either = Observable.CombineLatest(l.LastOrDefaultAsync(), r.LastOrDefaultAsync());

现在我们可以从示例中修改持续时间选择器:

var q = r.GroupJoin(l,
                _ => Observable.Never<Unit>().TakeUntil(either), // windows from each left event until l or r completes
                _ => Observable.Never<Unit>().TakeUntil(either), // windows from each right event until l or r completes
                (left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events

这将导致级联,并且complete其余可观察的管道。

© www.soinside.com 2019 - 2024. All rights reserved.