后台工作线程中的阻塞收集不会运行未等待的任务

问题描述 投票:0回答:1

我有一个消费者,一个自定义后台工作人员,使用

BlockingCollection
类似于 this

public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); 

  public PCQueue () { Task.Factory.StartNew (Consume); }
  public void Dispose() { _taskQ.CompleteAdding(); }
  public TaskCompletionSource<t> EnqueueTask<T>(Func<T> action) 
  {
     var completion = new TaskCompletionSource<T>();
     _taskQ.Add (() => completion.TrySetResult(action()));
     return completion; // Perhaps task needs to be returned here
  }
  void Consume() { foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); }
}

问题在于,如果不等待任务,则阻塞收集不会继续执行队列中的下一个任务。换句话说,“即发即忘”功能不起作用。在下面的代码中,第二个调用被困在队列中,直到等待下一个调用。

public static async Task Main()
{
  // This will print "Start"
  var originalScheduler = InstanceService<ScheduleService>.Instance;
  await originalScheduler.Send(() => Console.WriteLine("Start")).Task;

  // If not awaited, this will not print anything
  originalScheduler.Send(() => 
      Console.WriteLine("Continue"));

  // This will print the last two messages, "Continue" and "Complete"
  await originalScheduler.Send(() => 
      Console.WriteLine("Complete")).Task;
}

问题

现在工作人员允许等待排队的任务或取消工作。如何确保即使没有等待,任务也会以“即发即忘”的方式执行?

c# multithreading async-await task-parallel-library background-process
1个回答
0
投票

对于可能关注的人,更新了实施

问题似乎是

TaskCreationSource
永远不会完成,因此当执行这些命令时,任务队列将陷入等待第一个任务的状态。

var s = InstanceService<ScheduleService>.Instance;
await s.Send(() => Console.WriteLine("Start")).Task; // 1st task starts but doesn't complete
s.Send(() => Console.WriteLine("Continue"));
// await s.Send(() => Console.WriteLine("Complete")).Task; // This completes the 1st task

解决方案是使用选项

TaskCompletionSource
创建
TaskCreationOptions.RunContinuationsAsynchronously
,因此原来安排工作的方法将更改为此。

TaskCompletionSource<T> EnqueueTask<T>(Func<T> action) 
{
   var options = TaskCreationOptions.RunContinuationsAsynchronously;
   var completion = new TaskCompletionSource<T>(options);
   _taskQ.Add (() => completion.TrySetResult(action()));
   return completion; // completion.Task.Start() is not needed 
}
© www.soinside.com 2019 - 2024. All rights reserved.