ActionBlock在我想要它完成之前就出现了。

问题描述 投票:0回答:1

我有一个 ActionBlock 作为我的类的一部分。

private readonly ActionBlock<QueueMessage> block;

在构造函数中,我把它初始化成这样。

block = new ActionBlock<QueueMessage>(async s => await Process(s),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

我的 Process 方法实现为。

private async Task Process(QueueMessage message)
{
    using var tx = stateManager.CreateTransaction();

    var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
    if (!result.HasValue)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
        return;
    }

    try
    {
        await processorService.Process(message);
        logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
        await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
        await tx.CommitAsync();
    }
    catch (Exception e)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue.  Error: {e}");

        message.RetryCount++;
        await retryQueue.EnqueueAsync(tx, message);
    }
    finally
    {
        await tx.CommitAsync();
    }
}

当我收到一条我想处理的消息时,我就调用。

// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
    logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}

第一次就成功了,消息被完美地处理了。 第二次我收到消息(也许是几秒钟后,也许是一分钟后,取决于系统的负载情况),那么 block.SendAsync 立即返回false。 在 Process 被运行。 当我在调试器下查看时,我注意到了 block.IsCompleted 是真的。 但是,我从来没有叫 block.Complete(); 或任何东西。

为什么我的区块已经完成了,而我还可能有更多的工作要做? 谢谢

更新:我可能有了点线索,我似乎遇到了这个断点(在ActionBlock.cs源代码中)。

我似乎遇到了这个断点(在ActionBlock.cs源代码中)。

enter image description here

c# .net task-parallel-library
1个回答
0
投票

谜团解开了。 我的Action抛出了一个未处理的异常,这显然标志着该块已经完成,并且不允许任何新的消息。

我很想知道是否有什么方法可以控制这种行为,或者是否推荐总是有一个 try/catch 块在你的行动周围。

© www.soinside.com 2019 - 2024. All rights reserved.