调度程序中的通知数量限制

问题描述 投票:1回答:1

我有一个冷的观察者和一个观察者。两者都很慢,但是观察者比可观察者慢。他们处理许多通知,因此我不想无限制地存储通知。

此示例大约需要30秒才能完成。非常慢。我相信他们可以在21秒内完成。

var subject = new Subject<int>();

subject.Subscribe(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});


Task.Run(() =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
    foreach (var i in Enumerable.Range(0, 10))
    {
        Thread.Sleep(1000);
        subject.OnNext(i);
    }
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});

此示例大约在20秒内完成,但是在观察者显示“ 4”之前,可观察到的结束。它指示调度程序将4到9存储在某个位置。我担心它是否存储超过1'000'000通知并抛出OutOfMemoryException。

var subject = new Subject<int>();

subject.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});


Task.Run(() =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
    foreach (var i in Enumerable.Range(0, 10))
    {
        Thread.Sleep(1000);
        subject.OnNext(i);
    }
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});

这就是为什么我想限制调度程序中的通知数量。

编辑:图表

x : calculate or other task
S : send notification
R : revive notification

-- time -->

sample1
thread1:  xxxS       xxxS       xxxS
thread2:     Rxxxxxx    Rxxxxxx    Rxxxxxx

sample2
thread1:  xxxSxxxSxxxSxxxSxxxSxxxSxxxS
thread2:     RxxxxxxRxxxxxxRxxxxxxRxxxxxx

I want
thread1:  xxxSxxxS   xxxS   xxxS   xxxS
thread2:     RxxxxxxRxxxxxxRxxxxxxRxxxxxx
c# system.reactive
1个回答
0
投票

Rx合同要求通知必须序列化,因此,即使您可以指定Scheduler,它也更像是说“在这里,使用此调度程序来管理并发性。”

ThreadPoolScheduler仍将序列化通知,因此最终结果是它不会并行调用您的方法。

如果要异步执行,可以将其重写为:

subject.Subscribe(async i =>
{
    await Task.Delay(1000);
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});

但是潜在的问题是您的消费者落后于生产者。您可以考虑使用backpressure,或者如果您的应用程序是一系列数据处理任务,则还可以考虑使用出色的TPL数据流。

事件源与其最终接收器之间的管道中发生了什么,这是Rx发挥最大作用的地方。

© www.soinside.com 2019 - 2024. All rights reserved.