我正在使用 TPL 数据流管道设计,并结合 Stephen Cleary 的 Try 库。简而言之,它把值或异常(value/exception)包装起来,并让其沿着管道向下传递。因此,即使某些项目在处理方法中抛出了异常,最后当我 await resultsBlock.Completion; 时,其状态仍然是 Status=RanToCompletion。所以我需要用其他方法来登记故障项目。下面是一个小例子:

// Minimal pipeline sketch: every stage passes a Try<int> downstream instead of a bare int.
// NOTE(review): braces and closing parentheses appear to have been lost from this
// snippet, so it is illustrative only and does not compile as shown.
// Stage 1: wraps the outcome of the inner lambda in a Try<int> via Try.Create.
var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
    return 1;
// Stage 2: applies the inner lambda to the wrapped value through Try.Map.
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
    return 1;
// Stage 3: terminal block that inspects each Try<int> for a captured exception.
var resultsBlock = new ActionBlock<Try<int>>(construct =>
    if (construct.IsException)
        var exception = construct.Exception;
        // Dispatch on the concrete type of the captured exception.
        switch (exception)
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.

一个解决方案是创建一个 List<int> FaultedItems; 我会在异常处理块中把所有故障项目插入其中,然后在 await resultsBlock.Completion; 之后检查该列表是否非空,并为故障项目创建新的管道。我的问题是:如果我使用 List<int>,而之后又决定调整 MaxDegreeOfParallelism 设置,是否有可能遇到线程安全问题?我是否最好改用某种并发集合(ConcurrentCollection)?或者说这种方法在其他方面存在缺陷?

c# task-parallel-library

我把一个类似问题的回答中的重试块实现做了改写,使其以 Stephen Cleary 的 Try 类型作为输入和输出。方法 CreateRetryTransformBlock 返回一个 TransformBlock<Try<TInput>, Try<TOutput>>,而方法 CreateRetryActionBlock 返回的对象实际上相当于一个 ActionBlock<Try<TInput>>。

除了标准的执行选项(ExecutionDataflowBlockOptions)之外,还有三个额外的选项可用:MaxAttemptsPerItem、MinimumRetryDelay 和 MaxRetriesTotal。

/// <summary>
/// Execution options for the retry blocks: the standard
/// <see cref="ExecutionDataflowBlockOptions"/> plus three retry-specific settings.
/// </summary>
public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    /// <remarks>Defaults to 1. CreateRetryTransformBlock rejects values below 1.</remarks>
    public int MaxAttemptsPerItem { get; set; } = 1;

    /// <summary>The minimum delay duration before retrying an item.</summary>
    /// <remarks>Defaults to <see cref="TimeSpan.Zero"/>. CreateRetryTransformBlock
    /// rejects negative values.</remarks>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;

    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    /// <remarks>Defaults to -1, which CreateRetryTransformBlock treats as "no limit";
    /// values below -1 are rejected there.</remarks>
    public int MaxRetriesTotal { get; set; } = -1;
}

/// <summary>
/// Thrown by the retry block when the total number of observed failures exceeds
/// the configured MaxRetriesTotal limit.
/// </summary>
public class RetryLimitException : Exception
{
    /// <summary>Creates the exception with a message and the failure that crossed the limit.</summary>
    /// <param name="message">Description of the exceeded limit.</param>
    /// <param name="innerException">The item failure that caused the limit to be exceeded.</param>
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

/// <summary>
/// Creates a <see cref="TransformBlock{TInput, TOutput}"/> over Try-wrapped items that
/// re-runs <paramref name="transform"/> for items whose attempt produced an exception.
/// </summary>
/// <remarks>
/// <para>An input that already holds an exception is forwarded as a failed
/// <c>Try&lt;TOutput&gt;</c> without invoking <paramref name="transform"/>.</para>
/// <para>Each item is attempted at most <c>MaxAttemptsPerItem</c> times, with a
/// <c>MinimumRetryDelay</c> wait before every retry. When all attempts fail, the
/// result of the first attempt is returned.</para>
/// <para>The <c>MaxDegreeOfParallelism</c> and <c>TaskScheduler</c> of
/// <paramref name="dataflowBlockOptions"/> may be overwritten while the block is
/// constructed; both are written back to their initial values before returning.</para>
/// <para>NOTE(review): this listing appears to have lost braces and several lines
/// (see the inline notes); restore them from the original before compiling.</para>
/// </remarks>
/// <typeparam name="TInput">Type of the value carried by the incoming Try.</typeparam>
/// <typeparam name="TOutput">Type of the value carried by the outgoing Try.</typeparam>
/// <param name="transform">Asynchronous transformation applied to each input value.</param>
/// <param name="dataflowBlockOptions">Execution and retry settings for the block.</param>
/// <returns>The configured transform block.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="transform"/> or <paramref name="dataflowBlockOptions"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <c>MaxAttemptsPerItem</c> is below 1, <c>MaxRetriesTotal</c> is below -1, or
/// <c>MinimumRetryDelay</c> is negative.
/// </exception>
public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    // Snapshot the retry settings so later changes to the options object do not affect this block.
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    // NOTE(review): the three throw expressions below are cut off (the exception
    // arguments and the closing parenthesis are not visible).
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(

    // NOTE(review): this statement is incomplete. internalCTS is later used through
    // .Token, .Cancel() and .Dispose(), so it is presumably a CancellationTokenSource
    // instance; the original initializer is not visible here.
    var internalCTS = CancellationTokenSource

    // Remember the caller's values; they are written back to the options after the block is built.
    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    // Running count of failed attempts; incremented atomically in ObserveNewException.
    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    // NOTE(review): an `else` (and braces) appear to be missing here. Judging by the
    // indentation, the bounded semaphore and the two option overrides below were
    // meant to apply only when maxDOP is not Unbounded; confirm against the original.
    if (maxDOP == DataflowBlockOptions.Unbounded)
        semaphore = new SemaphoreSlim(Int32.MaxValue);
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
        // Continue on captured context after every await
        // Already-failed inputs are passed through without calling the transform.
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        // item.IsException was already handled above, so only result1.IsValue
        // appears to matter in this check.
        if (item.IsException || result1.IsValue) return result1;
        // Attempt 1 failed; attempts 2..maxAttemptsPerItem each wait retryDelay first.
        for (int i = 2; i <= maxAttemptsPerItem; i++)
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    // Dispose the internal token source once the block has completed.
    // NOTE(review): the ContinueWith call is cut off after its first argument.
    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),

    return block;

    // Runs the transform once for the given item after acquiring the semaphore.
    // NOTE(review): no semaphore.Release() is visible and the `if` below has no
    // visible body; presumably a try/finally and a call to ObserveNewException
    // were lost. Confirm against the original.
    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
        await semaphore.WaitAsync(internalCTS.Token);
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            return result;

    // Counts a failed attempt against maxRetriesTotal and cancels the block's
    // internal token once the limit is exceeded.
    void ObserveNewException(Exception ex)
        // -1 means no global limit: nothing to count.
        if (maxRetriesTotal == -1) return;
        // Atomic increment. The uint casts presumably keep a counter that wrapped
        // past Int32.MaxValue comparing as "above the limit"; confirm.
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        // NOTE(review): braces are missing. Judging by the indentation, only the
        // failure that first crosses the limit cancels the token and throws
        // RetryLimitException, while later failures throw OperationCanceledException.
        if (newCount == (uint)maxRetriesTotal + 1)
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        throw new OperationCanceledException();

/// <summary>
/// Creates a target block that runs <paramref name="action"/> for each Try-wrapped
/// input, reusing the retry logic of CreateRetryTransformBlock.
/// </summary>
/// <typeparam name="TInput">Type of the value carried by the incoming Try.</typeparam>
/// <param name="action">Asynchronous action applied to each input value.</param>
/// <param name="dataflowBlockOptions">Execution and retry settings, passed through to
/// CreateRetryTransformBlock (which validates them).</param>
/// <returns>The underlying transform block, exposed only as a target block.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
    if (action == null) throw new ArgumentNullException(nameof(action));
    // Adapt the action to a transform that yields a null object, so the
    // transform-based factory can be reused as is.
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    // NOTE(review): nullTarget is created but never used in the visible code.
    // Presumably a line linking block to nullTarget (to discard the outputs) was
    // lost; confirm against the original.
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    return block;


// Usage sketch: a download stage with retry support, followed by a processing stage.
// NOTE(review): braces, closing parentheses and the start of the final call appear
// to be missing from this snippet, so it does not compile as shown.
var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
    // Parallelism of 10, at most 3 attempts per item, a total-retry limit of 100,
    // and a wait of 10 seconds before each retry.
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)

// Processing stage: applies ProcessAsync to the wrapped value through Try.Map.
var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
    return await ProcessAsync(value);

    // NOTE(review): the call this argument belongs to is not visible; presumably a
    // LinkTo between the two blocks. Confirm against the original.
    new DataflowLinkOptions() { PropagateCompletion = true });


