我需要在工作流程中引入重试策略。假设有3个块以这种方式连接:
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);
buffer.LinkTo(processing);
processing.LinkTo(send);
因此,有一个缓冲区来存储数据,然后将其发送到转换块,该块一次处理不超过3个项目,然后将结果发送到操作块。
潜在地,在处理变换块期间可能会出现瞬时错误,如果错误是几次瞬时的,我想重试该块。
我知道块通常是不可重试的(传递到块中的代理可以使其可重试)。一种选择是包装传递给支持重试的委托。
我也知道,有一个很好的库TransientFaultHandling.Core
,它提供了瞬态故障的重试机制。这是一个很好的库,但就我而言不是。如果我将传递给转换块的委托包装到TransientFaultHandling.Core
方法中,则转换块内的message将被锁定,并且在重试完成或失败之前,转换块将无法收到新消息。想象一下,如果将所有3条消息都输入到重试中(例如,下一次重试将在2分钟之内)并且失败,则转换块将被卡住,直到至少一条消息离开转换块为止。
我看到的唯一解决方案是扩展RetryPolicy.ExecuteAsync
(实际上,TranformBlock
也足够了,然后手动重试(例如从ITargetBlock
开始:]]
here
例如将消息再次延迟放入转换块中,但在这种情况下,重试上下文(剩余的重试次数等)也应传递到此块中。听起来太复杂了...
没有人看到一种更简单的方法来实现工作流程块的重试策略吗?
我认为您几乎必须这样做,必须跟踪消息的剩余重试次数,并且必须以某种方式安排重试的尝试。
但是您可以通过将其封装在单独的方法中来使它更好。类似于:
do
{
try { return await transform(input); }
catch
{
if( numRetries <= 0 ) throw;
else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
}
} while( numRetries-- > 0 );
我添加了代码来跟踪异常,因为我认为不应忽略故障,至少应该将它们记录下来。
此外,此代码在完成时也无法很好地工作:如果有重试等待延迟,而您// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
public T Data;
public int RetriesRemaining;
public readonly List<Exception> Exceptions = new List<Exception>();
}
public static IPropagatorBlock<TInput, TOutput>
CreateRetryingBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform, int numberOfRetries,
TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
input => new RetryingMessage<TInput>
{ Data = input, RetriesRemaining = numberOfRetries });
// TransformManyBlock, so that we can propagate zero results on failure
TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
async message =>
{
try
{
return new[] { await transform(message.Data) };
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
failureHandler(message.Exceptions);
}
else
{
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
});
source.LinkTo(
target, new DataflowLinkOptions { PropagateCompletion = true });
return DataflowBlock.Encapsulate(source, target);
}
该块,它将立即完成,并且重试将丢失。如果这对您来说是个问题,则您必须跟踪未完成的联系并在Complete()
完成且没有重试的情况下完成target
。
除了svick的出色答案,还有其他两个选择:
source
-只需将TransientFaultHandling.Core
设置为MaxDegreeOfParallelism
,以便其他消息可以通过。Unbounded
,以检查是否需要进行其他重试。这种方法更加复杂。如果您要重试,则必须向其块添加延迟,并添加LinkTo
以删除网格其余部分的失败/重试信息。以下是在这些假设下运行的两种方法TransformBlock
和CreateRetryTransformBlock
:
CreateRetryActionBlock
)。 CreateRetryActionBlock
,MaxDegreeOfParallelism
,BoundedCapacity
和CancellationToken
。 下面的实现使用EnsureOrdered
来控制第一次尝试的操作与延迟时间过去后重试的先前故障操作之间的并发级别。
SemaphoreSlim