我有一个消费者,一个自定义后台工作人员,使用
BlockingCollection
类似于 this
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue () { Task.Factory.StartNew (Consume); }
public void Dispose() { _taskQ.CompleteAdding(); }
public TaskCompletionSource<t> EnqueueTask<T>(Func<T> action)
{
var completion = new TaskCompletionSource<T>();
_taskQ.Add (() => completion.TrySetResult(action()));
return completion; // Perhaps task needs to be returned here
}
void Consume() { foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); }
}
问题在于,如果不等待任务,则阻塞收集不会继续执行队列中的下一个任务。换句话说,“即发即忘”功能不起作用。在下面的代码中,第二个调用被困在队列中,直到等待下一个调用。
public static async Task Main()
{
// This will print "Start"
var originalScheduler = InstanceService<ScheduleService>.Instance;
await originalScheduler.Send(() => Console.WriteLine("Start")).Task;
// If not awaited, this will not print anything
originalScheduler.Send(() =>
Console.WriteLine("Continue"));
// This will print the last two messages, "Continue" and "Complete"
await originalScheduler.Send(() =>
Console.WriteLine("Complete")).Task;
}
问题
现在工作人员允许等待排队的任务或取消工作。如何确保即使没有等待,任务也会以“即发即忘”的方式执行?
对于可能关注的人,更新了实施。
问题似乎是
TaskCreationSource
永远不会完成,因此当执行这些命令时,任务队列将陷入等待第一个任务的状态。
var s = InstanceService<ScheduleService>.Instance;
await s.Send(() => Console.WriteLine("Start")).Task; // 1st task starts but doesn't complete
s.Send(() => Console.WriteLine("Continue"));
// await s.Send(() => Console.WriteLine("Complete")).Task; // This completes the 1st task
解决方案是使用选项
TaskCompletionSource
创建 TaskCreationOptions.RunContinuationsAsynchronously
,因此原来安排工作的方法将更改为此。
TaskCompletionSource<T> EnqueueTask<T>(Func<T> action)
{
var options = TaskCreationOptions.RunContinuationsAsynchronously;
var completion = new TaskCompletionSource<T>(options);
_taskQ.Add (() => completion.TrySetResult(action()));
return completion; // completion.Task.Start() is not needed
}