为什么在SelectMany中引发异常后,我的进程仍然有效,而类似的rx运算符中的异常会导致未处理的异常?

问题描述 投票:1回答:2

这里是一个示例程序,它对控制台输入进行了两次订阅(此处可观察到的源不相关)。在第一个订阅中,它使用Observable.SelectMany;在第二个订阅中,它使用内部使用System.Threading.Tasks.Dataflow包的类似SelectMany运算符。每个输入中的某些输入都会引发异常。异常已正确转发到Observer onError,该异常将其重新引发到默认的Subscribe实现中。观察到的行为是,在SelectMany中发生异常的情况下,该进程将继续运行,而在SelectManyPreseveOrder中发生异常的情况下,该进程将以未处理的异常终止。是什么导致了不同的行为?有没有办法在SelectManyPreserveOrder运算符中实现“更友好”的行为?这是一个使用Rx.Linq 2.2.5和System.Threading.Tasks.Dataflow 4.10.0的.net 4.6.1控制台应用程序:

class Program
{
    static async Task Main()
    {
        AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
        TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");

        var consoleInput = Helper.ConsoleInput();

        consoleInput.SelectMany(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "1")
                    throw new Exception("This exception is swallowed");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany: {s}"));

        consoleInput.SelectManyPreserveOrder(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "2")
                    throw new Exception("This exception kills the process");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));

        await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
    }
}

public static class ObservableExtension
{
    public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
    {
        return source.FromTplDataflow(() =>
            new TransformBlock<TSource, TResult>(selector,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
    }

    public static IObservable<TResult> FromTplDataflow<T, TResult>(
        this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
    {
        return Observable.Defer(() =>
        {
            var block = blockFactory();
            return Observable.Using(() =>
            {
                var disposable = source.Subscribe(block.AsObserver());
                return Disposable.Create(dispose: () => disposable.Dispose());
            }, r => block.AsObservable());
        });
    }
}

public static class Helper
{
    public static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}

有趣的是,从未调用过UnobservedTaskException处理程序。

.net system.reactive unhandled-exception tpl-dataflow
2个回答
0
投票

这里抛出异常,但它被抛出在unobserved Task continuation中。在.NET 4.5及更高版本中,运行时系统将自动处理未观察到的任务异常。 Here's a good article by Stephen Toub talking about that change

重要部分:

为了使开发人员更容易编写基于Task的异步代码,.NET 4.5更改了未观察到的异常的默认异常行为。尽管未观察到的异常仍将引发UnobservedTaskException事件(不这样做将是一个重大更改),但默认情况下该过程不会崩溃。相反,无论事件处理程序是否观察到异常,异常都会在引发事件后最终被吞噬。


-1
投票

SelectMany执行推迟到查询被实际“调用”为止。

代替.Subscribe()调用.ToArray()或.ToList()来执行SelectMany。那应该抛出异常。

© www.soinside.com 2019 - 2024. All rights reserved.