基于开始/结束事件在反应流中分组事件

问题描述 投票:1回答:1

我观察到一个反应性流Observable<Event>,目前它直接发出事件。基于一个BEGIN / END事件,我想在内部小组的支持下将此事件流分组。

输入流

我有类似以下事件:

Event(type = Data, groupId = 1)
Event(type = BeginGroup, groupId = 2)   // outer group begins
Event(type = Data, groupId = 2)
Event(type = BeginGroup, groupId = 3)   // inner group begins
Event(type = Data, groupId = 3)
Event(type = EndGroup, groupId = 3)     // inner group ends
Event(type = EndGroup, groupId = 2)     // outer group ends
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)

编辑-其他先决条件:

我在示例数据中添加了ID,但通常不需要ID。该流将注意满足以下条件:

  • 每个BeginGroup事件将在某个时间后跟相应的EndGroup事件
  • 组内的事件(在我的示例中,具有相同的组构想)将总是置于流内的begin / end事件中,因此可以保证顺序(如上例所示)

所需输出流

因此,请确保每个事件不是其上的组的一部分,或者如果它不是真实组的一部分,则具有唯一的ID。我想将上述9个事件流归为以下4个事件流:

Event(type = Data, groupId = 1)
GroupEvent(groupId = 2, data = <LIST of Events and/or sub groups>) with following data:
    data = [
        Event(type = BeginGroup, groupId = 2)
        Event(type = Data, groupId = 2)
        GroupEvent(groupId = 3, data = <LIST of Events and/or sub groups>) with following data:
            data = [
                Event(type = BeginGroup, groupId = 3)
                Event(type = Data, groupId = 3)
                Event(type = EndGroup, groupId = 3)
            ]
        Event(type = EndGroup, groupId = 2)
    ]
Event(type = Data, groupId = 4)
Event(type = Data, groupId = 5)

我想要的是-逻辑

我想在发生BeginGroup类型的事件后立即开始分组,直到发生正确的EndGroup事件,然后在这两个事件之间对ALL事件进行分组,包括最终嵌套的分组事件。在开始/结束组事件之外的元素仅作为单个事件传递。

[这是我到目前为止尝试过的

我在问这个问题之前尝试了一些东西,但是我来自Java,甚至在那儿我很少使用window / buffer运算符,对它们的使用也很少。我看到C#中有类似GroupByUntil的运算符,因此我尝试使用它,但在我的示例中它从未发出任何东西。

var eventObservable: Observable<Event> = ...

// 1) make the observable hot so it can be resued inside the groupbyuntil operator
eventObservable = observable.Publish().RefCount();
var res = eventObservable
    .GroupByUntil(
        e => e.GroupId, // selector for groups => the group id can be used here
        grp => eventObservable.Where(e => e.GroupId != grp.Key) // stop a group as soon as the group id changes
    )
    .SelectMany(data => data.ToList()) // flatten the observable
    .Select(data => {
        // Convert the list of Events to GroupEvent if it contains more than 1 event
        var list = data.ToList();
        if (list.Count == 1)
            return list[0];
        return new GroupEvent(list);
    })

此方法不起作用,它根本不会发出任何东西(尽管eventObservable确实会正确发出其项目)。此外,它缺少嵌套组的支持(从理论上讲是否可行)。

有人可以向我解释如何解决我的问题吗?

c# system.reactive reactive
1个回答
1
投票

首先转储代码,然后进行解释。

© www.soinside.com 2019 - 2024. All rights reserved.