如何在捕获的 ExecutionContext 上运行异步委托

问题描述 投票:0回答:2

正如 Stephen Toub 在这篇文章中所解释的那样,当您向 ActionBlock 提交消息时,您可以在调用 ActionBlock.Post 之前执行 ExecutionContext.Capture,将包含消息和 ExecutionContext 的 DTO 传递到块中,然后在消息处理委托中使用ExecutionContext.Run 在捕获的上下文上运行委托:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
        if (!rv) ec.Dispose();
        return rv;
    }

    public void Done() { m_block.DeclinePermanently(); }

    public Task CompletionTask { get { return m_block.CompletionTask; } }

当消息处理程序内部的逻辑是同步的时,这很有效。但是如何在捕获的 ExecutionContext 上运行一段async逻辑呢?我需要这样的东西:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         await ExecutionContext.Run(ms.Context, async _ => { callSomethingAsync(ms.Value) });
      }
});

显然,这不会编译,因为 ExecutionContext.Run 不支持异步委托(而 ActionBlock 支持) - 那么我该怎么做?

async-await tpl-dataflow executioncontext
2个回答
2
投票

如果您可以提供一个独立的示例,以便我们可以尝试重现该问题,我们也许能够提供更好的答案。也就是说,可以使用简单的自定义同步上下文来手动控制

ExecutionContext
(或者更确切地说,它的副本)在
await
延续中的流动。这是一个示例(警告 - 几乎未经测试!):

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         await sc.Run(async _ => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}

注意,您应该只使用

CallContext.LogicalSetData
(或
AsyncLocal<T>
)存储不可变值。即,如果您需要存储在从被调用者到调用者的异步流期间可能发生更改的内容,并且能够跟踪调用者中的更改,请将其作为类的属性,然后存储该类的实例。确保该类也是线程安全的,因为最终您可以拥有原始执行上下文的许多并发分支。

有关更多详细信息,请参阅 Stephen Cleary 的优秀文章 Implicit Async Context ("AsyncLocal")“Eliding Async and Await”


0
投票

这似乎对我有用。至少原始任务中的 AsyncLocal<> 内容可以按预期在 SomeMethodAsync 中使用:

    public static class ExecutionContextExtensions
    {
        public static async Task RunAsync(this ExecutionContext executionContext, Func<Task> asyncAction)
        {
            Task task = null;
            ExecutionContext.Run(executionContext, _ => task = asyncAction(), null);
            if (task == null)
                return;
            await task;
        }

        public static async Task<T> RunAsync<T>(this ExecutionContext executionContext, Func<Task<T>> asyncFunc)
        {
            Task<T> task = null;
            ExecutionContext.Run(executionContext, _ => task = asyncFunc(), null);
            if (task == null)
                return default;
            return await task;
        }
    }

用途:

 var ec = ExecutionContext.Capture();
 var result = await ec.RunAsync(() => SomeMethodAsync());

这种方法有什么问题吗?

© www.soinside.com 2019 - 2024. All rights reserved.