使用数据流管道继续处理循环

问题描述 投票:0回答:3

我正在玩数据流并尝试学习如何使用它们。我已经找到了很多示例来说明如何使用不同的块,但它们都没有真正解释如何处理异常。

我的主要问题是如果发生异常或者前一个变换块的输出不是您所期望的,如何继续foreach循环。下面是我用来测试的简单Windows窗体应用程序。它只是一个按钮,循环显示数字列表并显示它们。

我在动作块中添加了一个if语句,表示如果数字= 5,则抛出异常。循环看起来像是在它遇到异常后继续处理,但它在输出异常后停止写输出。异常也永远不会转到foreach循环中的catch子句。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace DataFlowsTest
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var actionBlock = new ActionBlock<int>(item =>
            {
                if (item == 5)
                    throw new Exception("Blech.");
                Debug.WriteLine(item.ToString());
            });


            foreach(var number in TestList)
            {
                try
                {
                    actionBlock.Post(number);
                }
                catch(AggregateException ex)
                {
                    Debug.WriteLine(ex.Message);
                    continue;
                }
            }
            actionBlock.Complete();
        }
    }
}

此代码返回1 2 3 4异常抛出:DataFlowsTest.exe中的'System.Exception'DataFlowsTest.exe中出现类型'System.Exception'的异常,但未在用户代码Blech中处理。

c# tpl-dataflow
3个回答
1
投票

这是我实现它的方式。如果这种方法让你感兴趣,我可以在Github上分享更多。我使用数据流非常多,所以我已经基于这种方法实现了很多其他IDataFlow类。

The building block

基本上,通过将每个消息包装在一个名为Flow<T>的类中,我们可以实现Railway Oriented方法。流程有两种状态:失败或成功。成功的流量是Flow<T>被传递到下一个自定义数据流或在失败时连接到FailureBlock : ITargetBlock<IFlow>。 (基本上是一个处理异常,日志等的ActionBlock<IFlow>

我的基本Flow类如下所示:

public class Flow<T> : IFlow
{
  public T Value { get; private set; }
  public Exception Exception { get; private set; }
  public bool Success => Exception is null;
  public bool Failure => !Success;
  public void Fail(Exception exception) => Exception = exception;
  public Flow(T value) => Data = value;
  public Flow(Exception exception) => Fail(exception);
  public static Flow<T> FromValue<T>(T data) => new Flow<T>(data);
}
public interface IFlow
{
  bool Success { get; }
  bool Failure { get; }
  Exception Exception { get; }
  void Fail(Exception exception);
}

The resulting custom IDataflow

以下部分看起来很可怕,但不是。它本质上是一个TransformBlock包装器,具有两个额外的功能:

在这里输入代码1。每个自定义FlowBlock<T1,T2>包装方法到try { } catch { }

  1. LinkTo方法将成功的流量链接到下一个块,并将FailureBlock的失败链接起来
public class FlowBlock<TInput, TOutput>: IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
{
    protected override ITargetBlock<Flow<TInput>> Target => TransformBlock;
    protected override ISourceBlock<Flow<TOutput>> Source => TransformBlock;
    private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
    private FailureBlock FailureBlock { get; }
    public FlowBlock(
        Func<TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions,
        FailureBlock failureBlock)
    {
        TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
            async inFlow =>
            {
                try
                {
                    return new Flow<TOutput>(await transform(inFlow.Data));
                }
                catch (Exception exception)
                {
                    return new Flow<TOutput>(exception);
                }
            },
            dataflowBlockOptions);
    }
    public override IDisposable LinkTo(
        ITargetBlock<Flow<TOutput>> target,
        DataflowLinkOptions linkOptions)
        => new Disposable(
            Source.LinkTo(target, linkOptions, flow => flow.Success),
            Source.LinkTo(OutputBlock, linkOptions, flow => flow.Failure));
}

如果您有兴趣,请在评论中告诉我,我很乐意打开一个包含更多细节的github回购。


0
投票

你扔了一个Exception但只捕捉AggregateException

为通用(Exception ex)或您想要捕获的类型添加一个catch


0
投票

你在你的区块内抛出异常。这将导致块移动到故障状态并将异常附加到它的Completion任务。

你的代码只在try/catch周围有actionBlock.Post,这不是抛出异常的地方。

由于异常附加到完成任务,在块之外捕获异常的唯一方法是await actionBlock.Completion,它将在块外重新抛出异常并允许你捕获(Exception ex)

防止堵塞;在块内捕获异常。如果异常离开块,则该块将出现故障并且不再接受新输入。

var actionBlock = new ActionBlock<int>(item =>
{
    try
    {
        if (item == 5)
            throw new Exception("Blech.");
        Debug.WriteLine(item.ToString());
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex);
    }

});

此外,您还可以处理Post或更好的SendAsync的结果并对块故障做出反应:

foreach (var number in TestList)
{
    if(!actionBlock.Post(number))
        actionBlock.Complete();
    try
    {
        await actionBlock.Completion;
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex.Message);
        //actionBlock is now dead
        break;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.