我观察到一个反应性流Observable<Event>
,目前它直接发出事件。基于一个BEGIN / END事件,我想在内部小组的支持下将此事件流分组。
输入流
我有类似以下事件:
Event(type = Data, groupId = 1)
Event(type = BeginGroup, groupId = 2) // outer group begins
Event(type = Data, groupId = 2)
Event(type = BeginGroup, groupId = 3) // inner group begins
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3) // inner group ends
Event(type = EndGroup, groupId = 2) // outer group ends
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
编辑-其他先决条件:
我在示例数据中添加了ID,但通常不需要ID。该流将注意满足以下条件:
BeginGroup
事件将在某个时间后跟相应的EndGroup
事件所需输出流
因此,请确保每个事件不是其上的组的一部分,或者如果它不是真实组的一部分,则具有唯一的ID。我想将上述9个事件流归为以下4个事件流:
Event(type = Data, groupId = 1)
GroupEvent(groupId = 2, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 2)
Event(type = Data, groupId = 2)
GroupEvent(groupId = 3, data = <LIST of Events and/or sub groups>) with following data:
data = [
Event(type = BeginGroup, groupId = 3)
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3)
]
Event(type = EndGroup, groupId = 2)
]
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)
我想要的是-逻辑
我想在发生BeginGroup
类型的事件后立即开始分组,直到发生正确的EndGroup
事件,然后在这两个事件之间对ALL事件进行分组,包括最终嵌套的分组事件。在开始/结束组事件之外的元素仅作为单个事件传递。
[这是我到目前为止尝试过的
我在问这个问题之前尝试了一些东西,但是我来自Java,甚至在那儿我很少使用window / buffer运算符,对它们的使用也很少。我看到C#中有类似GroupByUntil
的运算符,因此我尝试使用它,但在我的示例中它从未发出任何东西。
var eventObservable: Observable<Event> = ...
// 1) make the observable hot so it can be resued inside the groupbyuntil operator
eventObservable = observable.Publish().RefCount();
var res = eventObservable
.GroupByUntil(
e => e.GroupId, // selector for groups => the group id can be used here
grp => eventObservable.Where(e => e.GroupId != grp.Key) // stop a group as soon as the group id changes
)
.SelectMany(data => data.ToList()) // flatten the observable
.Select(data => {
// Convert the list of Events to GroupEvent if it contains more than 1 event
var list = data.ToList();
if (list.Count == 1)
return list[0];
return new GroupEvent(list);
})
此方法不起作用,它根本不会发出任何东西(尽管eventObservable
确实会正确发出其项目)。此外,它缺少嵌套组的支持(从理论上讲是否可行)。
有人可以向我解释如何解决我的问题吗?
首先转储代码,然后进行解释。