Rx.Net - 如何在触发关闭序列时聚合消息并发出中间输出?

问题描述 投票:0回答:1

目标

我试图解决的问题是聚合

(int Key, int Value)
类型的消息的序列(求和值),直到关闭的可观察对象发出“刷新”标记项。

例如,给定一系列

(Key,Value)

(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)

Flush
被触发时,我期望发出一个带有
[(1,4), (2,3)]
的数组。

序列完成后 - 应发出带有

[(2,6)]
的数组。

我尝试过的

我从

GroupBy
+
Aggregate
+
Buffer(flush)
作为实现所需行为的直观方式开始。但是,该序列不会发出中间结果,而是发出所有聚合作为最终输出。

这是完整代码

using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();

var completion = StartProcessing(source, flush);

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));

flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

source.OnNext((2, 1));
source.OnNext((2, 5));

source.OnCompleted(); // emit of [(2,6)] is expected

await completion;

return;

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        .GroupBy
        (
            keySelector: message => message.Key
        )
        .SelectMany
        (
            group => group
                .Aggregate
                (
                    seed: (group.Key, Value: 0),
                    accumulator: (output, item) => (output.Key, output.Value + item.Value)
                )
        )
        .Buffer(flush)
        .Select(buffer => Observable.FromAsync(() => Flush(buffer)))
        .Merge()
        .ToTask();
}

static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");

    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}

输出是

Flushing []
Flushing [(1, 4), (2, 9)]

据我所知(虽然不确定),除非序列完成,否则

Buffer
不会发出值,因为
Aggregate
不会传播中间值。

我还尝试使用

GroupByUntil
代替
GroupBy
Buffer
。通过这种结构,我在组关闭时获得中间输出,但是每个组的聚合都是一一发出的,并且不清楚如何将它们捆绑在一起,以便它们不会单独刷新。

Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)

到目前为止,我陷入困境并寻求一些帮助,特别是:

  • 就 RX 而言,什么被认为是实现所需行为的正确方法?
  • 是否可以使用内置运算符来实现,还是应该创建自定义运算符(例如自定义 AggregateUntil)?
c# system.reactive .net-8.0 rx.net
1个回答
0
投票

重写你的例子。看起来它适合您的需求:

    static async Task Main()
    {
        var source = new Subject<(int Key, int Value)>();
        var flush = new Subject<Unit>();

        var completion = StartProcessing(source, flush);

        source.OnNext((1, 1));
        source.OnNext((2, 3));
        source.OnNext((1, 3));

        await Task.Delay(200);

        flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

        source.OnNext((2, 1));
        source.OnNext((2, 5));

        source.OnCompleted(); // emit of [(2,6)] is expected

        await completion;

        return;
    }

    static Task StartProcessing(
        IObservable<(int Key, int Value)> source,
        IObservable<Unit> flush)
    {
        return source
            .GroupBy(keySelector: message => message.Key)
            .Select(group => group
                    .Buffer(flush)
                    .Where(list => list.Any())
                    .Select(list => (group.Key, list.Sum(i=>i.Value))))
            .Merge()
            .Select(item => Observable.FromAsync(() => Flush(item)))
            .Merge()
            .ToTask();
    }

    static async Task Flush((int Key, int Value) data)
    {
        Console.WriteLine($"Flushing [{data}]");

        // processing takes time
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

    private static async Task<object> SearchAsync(int i, CancellationToken ct)
    {
        System.Console.WriteLine($"Search {i}");
        await Task.Delay(2000*i);

        return i;
    }
© www.soinside.com 2019 - 2024. All rights reserved.