我有一个 ActionBlock
作为我的类的一部分。
private readonly ActionBlock<QueueMessage> block;
在构造函数中,我把它初始化成这样。
block = new ActionBlock<QueueMessage>(async s => await Process(s),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
我的 Process
方法实现为。
private async Task Process(QueueMessage message)
{
using var tx = stateManager.CreateTransaction();
var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
if (!result.HasValue)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
return;
}
try
{
await processorService.Process(message);
logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
await tx.CommitAsync();
}
catch (Exception e)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue. Error: {e}");
message.RetryCount++;
await retryQueue.EnqueueAsync(tx, message);
}
finally
{
await tx.CommitAsync();
}
}
当我收到一条我想处理的消息时,我就调用。
// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}
第一次就成功了,消息被完美地处理了。 第二次我收到消息(也许是几秒钟后,也许是一分钟后,取决于系统的负载情况),那么 block.SendAsync
立即返回false。 在 Process
被运行。 当我在调试器下查看时,我注意到了 block.IsCompleted
是真的。 但是,我从来没有叫 block.Complete();
或任何东西。
为什么我的区块已经完成了,而我还可能有更多的工作要做? 谢谢
更新:我可能有了点线索,我似乎遇到了这个断点(在ActionBlock.cs源代码中)。
我似乎遇到了这个断点(在ActionBlock.cs源代码中)。
谜团解开了。 我的Action抛出了一个未处理的异常,这显然标志着该块已经完成,并且不允许任何新的消息。
我很想知道是否有什么方法可以控制这种行为,或者是否推荐总是有一个 try/catch
块在你的行动周围。