ITargetBlock中的重试策略

问题描述 投票:11回答:3

我需要在工作流程中引入重试策略。假设有3个块以这种方式连接:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

因此,有一个缓冲区来存储数据,然后将其发送到转换块,该块一次处理不超过3个项目,然后将结果发送到操作块。

潜在地,在处理变换块期间可能会出现瞬时错误,如果错误是几次瞬时的,我想重试该块。

我知道块通常是不可重试的(传递到块中的代理可以使其可重试)。一种选择是包装传递给支持重试的委托。

我也知道,有一个很好的库TransientFaultHandling.Core,它提供了瞬态故障的重试机制。这是一个很好的库,但就我而言不是。如果我将传递给转换块的委托包装到TransientFaultHandling.Core方法中,则转换块内的message将被锁定,并且在重试完成或失败之前,转换块将无法收到新消息。想象一下,如果将所有3条消息都输入到重试中(例如,下一次重试将在2分钟之内)并且失败,则转换块将被卡住,直到至少一条消息离开转换块为止。

我看到的唯一解决方案是扩展RetryPolicy.ExecuteAsync(实际上,TranformBlock也足够了,然后手动重试(例如从ITargetBlock开始:]]

here

例如将消息再次延迟放入转换块中,但在这种情况下,重试上下文(剩余的重试次数等)也应传递到此块中。听起来太复杂了...

没有人看到一种更简单的方法来实现工作流程块的重试策略吗?

c# task-parallel-library tpl-dataflow
3个回答
15
投票

我认为您几乎必须这样做,必须跟踪消息的剩余重试次数,并且必须以某种方式安排重试的尝试。

但是您可以通过将其封装在单独的方法中来使它更好。类似于:

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

我添加了代码来跟踪异常,因为我认为不应忽略故障,至少应该将它们记录下来。

此外,此代码在完成时也无法很好地工作:如果有重试等待延迟,而您// it's a private class, so public fields are okay private class RetryingMessage<T> { public T Data; public int RetriesRemaining; public readonly List<Exception> Exceptions = new List<Exception>(); } public static IPropagatorBlock<TInput, TOutput> CreateRetryingBlock<TInput, TOutput>( Func<TInput, Task<TOutput>> transform, int numberOfRetries, TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler) { var source = new TransformBlock<TInput, RetryingMessage<TInput>>( input => new RetryingMessage<TInput> { Data = input, RetriesRemaining = numberOfRetries }); // TransformManyBlock, so that we can propagate zero results on failure TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>( async message => { try { return new[] { await transform(message.Data) }; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { failureHandler(message.Exceptions); } else { message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }); source.LinkTo( target, new DataflowLinkOptions { PropagateCompletion = true }); return DataflowBlock.Encapsulate(source, target); } 该块,它将立即完成,并且重试将丢失。如果这对您来说是个问题,则您必须跟踪未完成的联系并在Complete()完成且没有重试的情况下完成target


3
投票

除了svick的出色答案,还有其他两个选择:

  1. 您可以使用source-只需将TransientFaultHandling.Core设置为MaxDegreeOfParallelism,以便其他消息可以通过。
  2. 您可以修改块输出类型以包括故障指示和重试计数,并创建数据流循环,将过滤器传递给Unbounded,以检查是否需要进行其他重试。这种方法更加复杂。如果您要重试,则必须向其块添加延迟,并添加LinkTo以删除网格其余部分的失败/重试信息。

0
投票

以下是在这些假设下运行的两种方法TransformBlockCreateRetryTransformBlock

  1. 呼叫者希望处理所有项目,即使其中一些项目反复失败。
  2. 调用者有兴趣了解所有发生的异常,即使对于最终成功的项目也是如此(不适用于CreateRetryActionBlock)。
  3. 调用方可能希望为总重试次数设置一个上限,此后,该块应转换为故障状态。
  4. 呼叫者希望能够在与重试功能相关的选项之上设置普通块的所有可用选项,包括CreateRetryActionBlockMaxDegreeOfParallelismBoundedCapacityCancellationToken

下面的实现使用EnsureOrdered来控制第一次尝试的操作与延迟时间过去后重试的先前故障操作之间的并发级别。

SemaphoreSlim
© www.soinside.com 2019 - 2024. All rights reserved.