异步处理程序中的Mediator死锁在后台工作程序中等待-如何检测调用自身的线程

问题描述 投票:1回答:1

我有一个介体,最近需要在后台线程上一次分发消息时同步一个介体,但是该介体处于锁定状态,如下所示。

我将命令发布到队列并从TaskCompletionSource返回任务:

public Task<object> Send(object command, CancellationToken cancellationToken)
{
    var item = new CommandItem() { Command = request, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };            
    this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
    return item.Tcs.Task;
}

然后从后台工作程序中拾取它,并创建处理程序:

var item = await this.queue.Reader.ReadAsync(cancellationToken);
// work out command  type snipped
var command = item.Command as LockMeGoodCommand;
var handler = new LockMeGoodCommandHandler();
var result = await handler.Handle(command, item.Ct);
item.Tcs.SetResult(result);

然后处理,当命令处理程序发送到命令处理程序内时(使用后台线程,但在线程内可以),以下内容被锁定:

public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
{
   Console.WriteLine(command.GetType().Name);

   // this would get the result but will lock forever when using background worker bus implementation
   var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);

   // perform some action based on the result - but we never get here
   Console.WriteLine("otherResult is " + otherResult);

   return 3;
}

**问题和可能的解决方法**

我相信我们可以通过检测后台线程是否正在从其线程内向自身发布(通过命令处理程序,然后通过调用Send()来发布新命令)来避免死锁,如果这样,则不应这样做使用任何线程机制(发布到命令队列或TaskCompletionSource),而应直接直接处理任务。

我已经尝试检测该线程,但是它不起作用,所以我在var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken, true)以上的处理程序中将手动标志isSameThread设置为true,并且我可以确认它可以工作并且避免了死锁

此修复程序有任何警告吗?如何检测后台线程是否正在请求发送命令(线程如何检测自身)以及如何完成下面的代码(来自DispatchOnBackgroundThread.Send()以包括此自调用检测)(因此我可以isSameThread标志)?

似乎似乎涉及更多,因为每次等待都会给出不同的线程ID。

// in thread start we set the thread id of the background thread
this.workerThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;

public Task<object> Send(object command, CancellationToken cancellationToken, bool isSameThread = false)
{
    Console.WriteLine($"this.workerThreadId: {this.workerThreadId}, Thread.CurrentThread.ManagedThreadId: {Thread.CurrentThread.ManagedThreadId}");

    // below doesnt work gives different numbers so i use flag instead
    // this.workerThreadId == Thread.CurrentThread.ManagedThreadId
    if (isSameThread == true)
    {
        if (command is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return handler.Handle(boringCommand, cancellationToken).ContinueWith(t => (object)t);

        }
        else if (command is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return handler.Handle(lockMeGoodCommand, cancellationToken).ContinueWith(t => (object)t);
        }
        else
            throw new Exception("unknown");
    }
    else
    {
        var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
        this.queue.Writer.WriteAsync(item); // just write and immediatly return the cts
        return item.Tcs.Task;
    }
}

**显示问题的代码**

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock
{
    class BoringCommand { }
    class LockMeGoodCommand { }    

    class BoringCommandHandler
    {
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        }
    }
    class LockMeGoodCommandHandler
    {
        private readonly DispatchOnBackgroundThread commandBus;

        public LockMeGoodCommandHandler(DispatchOnBackgroundThread commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);

            // this locks forever
            var otherResult = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            Console.WriteLine("otherResult is " + otherResult);
            return 3;
        }
    }

    public class DispatchOnBackgroundThread
    {
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;

        class CommandItem
        {
            public object Command { get; set; }
            public CancellationToken Ct { get; set; }
            public TaskCompletionSource<object> Tcs { get; set; }
        }

        public Task<object> Send(object command, CancellationToken cancellationToken)
        {
            var item = new CommandItem()
            { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };            
            this.queue.Writer.WriteAsync(item); // just write and immediatly return the tcs
            return item.Tcs.Task;
        }

        public void Start(CancellationToken cancellationToken)
        {
            this.worker = Task.Factory.StartNew(async () =>
            {
                try
                {                    
                    while (cancellationToken.IsCancellationRequested == false)
                    {
                        var item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        {
                            var handler = new BoringCommandHandler();
                            var result = await handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                        if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        {
                            var handler = new LockMeGoodCommandHandler(this);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                    }
                }
                catch (TaskCanceledException) { }
            },
            TaskCreationOptions.LongRunning)
            .Unwrap();
        }

        public async Task StopAsync()
        {
            this.queue.Writer.Complete();
            await this.worker;
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
            var result2 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);

            cts.Cancel();
            await threadStrategy.StopAsync();
        }
    }
}

**简单的非线程介体实现,无需锁定即可**

public class DispatchInCallingThread
{
    public async Task<object> Send(object request, CancellationToken cancellationToken)
    {
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        }
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        }
        else
            throw new Exception("unknown");
    }
}
c# multithreading concurrency task-parallel-library mediator
1个回答
0
投票

死锁的原因很简单:

  • 有一个代码循环(不是特定的线程;请参阅下文),负责处理队列。在处理每个命令时,它会await该命令的处理程序。
  • 有一个命令处理程序,它await是另一个命令待处理。但是,这将不起作用,因为将不再处理其他命令。在此循环完成之前,代码循环不会使下一个命令出队。

换句话说,如果一次只能执行一个命令,则一个命令在逻辑上不可能执行另一个命令。

有几种解决此问题的方法。我not推荐“重入”方法;重入是许多细微的逻辑错误的原因。我推荐的方法是以下方法之一:

  1. 更改Send语义,使其成为“队列”语义。这意味着不可能获得命令结果。结果必须通过某些中介作为消息发送。
  2. 使代码循环不await命令处理程序,使其循环返回并接收下一个命令。这意味着它不再“一次同步”。
  3. 将“一次同步”重新定义为“一次一次,但如果是await则则不算作一个”。在这种情况下,您可能可以使用ConcurrentExclusiveSchedulerPairNito.AsyncEx.AsyncContext之类的方法一次运行一个方法块。

旁注:LongRunning并未执行您认为正在执行的操作。 StartNew is not async-aware,因此StartNew标志仅适用于直到第一个async的代码;之后,该lambda中的代码将在任意线程池线程(未设置LongRunning)上运行。将await替换为LongRunning将使代码更清晰。

© www.soinside.com 2019 - 2024. All rights reserved.