使用Observable SelectMany的Reactive Extensions错误处理>>

问题描述 投票:0回答:1

我正在尝试使用反应式扩展程序库在某些文件夹上编写文件监视程序这个想法是监视硬盘驱动器文件夹中是否有新文件,等到文件完全写入后再将事件推送到订阅服务器。我不想使用FileSystemWatcher,因为它为同一文件引发两次Changed事件。

所以我以如下所示的“反应性方式”(希望如此)编写了它:

var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
    Console.WriteLine("press Enter to stop");
    Console.ReadLine();
}

但是我找不到处理错误的“反应性方法”。例如,文件目录可以位于外部驱动器上,由于连接问题而变得不可用。因此,我添加了GetFilesSafe,它将处理来自Reactive Extensions的异常错误:

static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
    try
    {
        return provider.GetFiles();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
        return new MessageArg[0];
    }
}

并且像这样使用它

var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));

即使引发异常,是否有更好的方法使SelectMany调用provider.GetFiles()?在这种情况下,我使用错误计数器将读取操作重复N次,然后失败(终止过程)。

Reactive Extensions中是否有“尝试N次,两次尝试之间等待Q秒”?

GetFilesSafe也存在问题:它会返回IEnumerable<MessageArg>进行延迟读取,但是它可能会在迭代时引发,并且异常会在SelectMany中的某个位置引发]

我正在尝试使用反应式扩展程序库在某些文件夹上编写文件监视程序,其想法是监视硬盘驱动器文件夹中的新文件,等待文件完全写入并将事件推送到...

c# observable system.reactive
1个回答
0
投票
有一个Retry扩展名,如果当前出现错误,它只是再次订阅可观察的内容,但这听起来像不能提供您想要的灵活性。

您可以使用Retry构建某些内容,如果外部错误发生,则该内容将订阅您提供的可观察对象。类似于以下内容(未经测试):

© www.soinside.com 2019 - 2024. All rights reserved.