rx.net合并重置流

问题描述 投票:1回答:1

我们有一个正在运行的服务,该服务处理从系统x到系统y的消息。它基本上如下所示:

aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      _logger.LogFatal(ex.Message);
   }
})

问题是,我们每秒至少收到一封邮件,并且LogFatal邮件配置为发送电子邮件。结果,邮箱在某个时刻爆炸了。

通过添加自定义的Logging类来“改进”该代码,该类将保留最后一个时间戳。根据该时间戳记,消息将记录或不记录。

这看起来很麻烦,我认为这是Rx.NET的完美方案。我们需要以下内容:

  • 1)记录字符串是否更改
  • 2)记录是否经过一定时间

我尝试的是以下内容:

var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(10)); // log at least every 10 seconds

var subscription = logMessagesChangedStream.Merge(logMessagesSampleStream).Subscribe(result =>
{
    _logger.LogFatal(result);
});

aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      logSubject.OnNext(ex.Message);
   }
})

似乎正在运行,但这将记录两次消息,一次是DistinctUntilChanged,一次是Sample。因此,如果其中一个发出值,我应该以某种方式重置流。它们可以独立完美地工作,一旦合并,就应该互相倾听;-)

c# system.reactive rx.net
1个回答
0
投票

[有一个模棱两可的运算符Amb,它竞争两个序列以查看哪个先赢。

Observable.Amb(logMessagesChangedStream, logMessagesSampleStream) 

获胜流继续传播到最后-我们不想要那样。我们有兴趣为下一个价值再次开始比赛。让我们这样做:

    Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
      .Take(1)
      .Repeat()

[现在的最后一个问题是,每当我们重新开始比赛时,DistinctUntilChanged都会失去其状态,并且其行为是将立即获得的第一个值推入。让我们通过将其变为热门观察者来解决此问题。

logSubject.DistinctUntilChanged().Publish();

全部放在一起:

var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds    

var oneof =
        Observable
          .Amb(logMessagesChangedStream, logMessagesSampleStream)
          .Take(1)
          .Repeat(); 

   logMessagesChangedStream.Connect();
   oneof.Timestamp().Subscribe(c => Console.WriteLine(c));

将10秒更改为5,because

Test

    Action<string> onNext = logSubject.OnNext;

    onNext("a");
    onNext("b");
    Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
    Delay(3000, () => { onNext("d"); onNext("e"); });
    Delay(6000, () => { onNext("e"); });
    Delay(10000, () => { onNext("e"); });

输出

a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00
© www.soinside.com 2019 - 2024. All rights reserved.