目标
我试图解决的问题是聚合
(int Key, int Value)
类型的消息的序列(求和值),直到关闭的可观察对象发出“刷新”标记项。
例如,给定一系列
(Key,Value)
项
(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)
当
Flush
被触发时,我期望发出一个带有 [(1,4), (2,3)]
的数组。
序列完成后 - 应发出带有
[(2,6)]
的数组。
我尝试过的
我从
GroupBy
+ Aggregate
+ Buffer(flush)
作为实现所需行为的直观方式开始。但是,该序列不会发出中间结果,而是发出所有聚合作为最终输出。
这是完整代码
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = StartProcessing(source, flush);
source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted(); // emit of [(2,6)] is expected
await completion;
return;
static Task StartProcessing(
IObservable<(int Key, int Value)> source,
IObservable<Unit> flush)
{
return source
.GroupBy
(
keySelector: message => message.Key
)
.SelectMany
(
group => group
.Aggregate
(
seed: (group.Key, Value: 0),
accumulator: (output, item) => (output.Key, output.Value + item.Value)
)
)
.Buffer(flush)
.Select(buffer => Observable.FromAsync(() => Flush(buffer)))
.Merge()
.ToTask();
}
static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
// processing takes time
await Task.Delay(TimeSpan.FromSeconds(1));
}
输出是
Flushing []
Flushing [(1, 4), (2, 9)]
据我所知(虽然不确定),除非序列完成,否则
Buffer
不会发出值,因为Aggregate
不会传播中间值。
我还尝试使用
GroupByUntil
代替GroupBy
和Buffer
。通过这种结构,我在组关闭时获得中间输出,但是每个组的聚合都是一一发出的,并且不清楚如何将它们捆绑在一起,以便它们不会单独刷新。
Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
到目前为止,我陷入困境并寻求一些帮助,特别是:
重写你的例子。看起来它适合您的需求:
static async Task Main()
{
var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = StartProcessing(source, flush);
source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
await Task.Delay(200);
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted(); // emit of [(2,6)] is expected
await completion;
return;
}
static Task StartProcessing(
IObservable<(int Key, int Value)> source,
IObservable<Unit> flush)
{
return source
.GroupBy(keySelector: message => message.Key)
.Select(group => group
.Buffer(flush)
.Where(list => list.Any())
.Select(list => (group.Key, list.Sum(i=>i.Value))))
.Merge()
.Select(item => Observable.FromAsync(() => Flush(item)))
.Merge()
.ToTask();
}
static async Task Flush((int Key, int Value) data)
{
Console.WriteLine($"Flushing [{data}]");
// processing takes time
await Task.Delay(TimeSpan.FromSeconds(1));
}
private static async Task<object> SearchAsync(int i, CancellationToken ct)
{
System.Console.WriteLine($"Search {i}");
await Task.Delay(2000*i);
return i;
}