强制某些代码总是在同一线程上运行。

问题描述 投票:1回答:3

我们有一个旧的第三方系统(我们称它为Junksoft®95),我们通过PowerShell与之接口(它暴露了一个COM对象),我正在将它包装在一个REST API中(ASP.NET Framework 4.8和WebAPI 2)。我使用的是 System.Management.Automation nuget包来创建一个 PowerShell 其中,我将Junksoft的COM API实例化为一个 dynamic 对象,然后我再使用该对象。

//I'm omitting some exception handling and maintenance code for brevity
powerShell = System.Management.Automation.PowerShell.Create();
powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft\Scripting.dll");
powerShell.AddScript("New-Object Com.Junksoft.Scripting.ScriptingObject");
dynamic junksoftAPI = powerShell.Invoke()[0];

//Now we issue commands to junksoftAPI like this:
junksoftAPI.Login(user,pass);
int age = junksoftAPI.GetAgeByCustomerId(custId);
List<string> names = junksoftAPI.GetNames();

当我在同一个线程上运行这些东西的时候(比如在一个控制台应用程序中),这一切都很好。然而,由于某些原因,这个 平时 不工作,当我把 junksoftAPI 变成 System.Web.Caching.Cache 并在我的web应用中从不同的控制器中使用它。我说 平时 因为当ASP.NET恰好把传入的调用给了线程时,这实际上是可行的。junksoftAPI 上创建的。如果没有,Junkoft 95会给我一个错误。

有什么方法可以让我确保所有与 junksoftAPI 碰上 一样 线?

我不想把整个web应用程序变成一个单线程应用程序!控制器和其他地方的逻辑应该在不同的线程上正常发生。控制器和其他地方的逻辑应该像正常的一样在不同的线程上发生。应该是只在Junksoft特定的线程上发生Junksoft的交互,就像这样。

[HttpGet]
public IHttpActionResult GetAge(...)
{
    //finding customer ID in database...

    ...

    int custAge = await Task.Run(() => {
        //this should happen on the Junksoft-specific thread and not the next available thread
        var cache = new System.Web.Caching.Cache();
        var junksoftAPI = cache.Get(...); //This has previously been added to cache on the Junksoft-specific thread
        return junksoftAPI.GetAgeByCustomerId(custId);
    });

    //prepare a response using custAge...
}
c# asp.net multithreading powershell asp.net-web-api2
3个回答
2
投票

你可以创建你自己的单人工作线程来实现这一点。这里是代码,你可以把它插入到你的Web应用程序中。

public class JunkSoftRunner
{
    private static JunkSoftRunner _instance;

    //singleton pattern to restrict all the actions to be executed on a single thread only.
    public static JunkSoftRunner Instance => _instance ?? (_instance = new JunkSoftRunner());

    private readonly SemaphoreSlim _semaphore;
    private readonly AutoResetEvent _newTaskRunSignal;

    private TaskCompletionSource<object> _taskCompletionSource;
    private Func<object> _func;

    private JunkSoftRunner()
    {
        _semaphore = new SemaphoreSlim(1, 1);
        _newTaskRunSignal = new AutoResetEvent(false);
        var contextThread = new Thread(ThreadLooper)
        {
            Priority = ThreadPriority.Highest
        };
        contextThread.Start();
    }

    private void ThreadLooper()
    {
        while (true)
        {
            //wait till the next task signal is received.
            _newTaskRunSignal.WaitOne();

            //next task execution signal is received.
            try
            {
                //try execute the task and get the result
                var result = _func.Invoke();

                //task executed successfully, set the result
                _taskCompletionSource.SetResult(result);
            }
            catch (Exception ex)
            {
                //task execution threw an exception, set the exception and continue with the looper
                _taskCompletionSource.SetException(ex);
            }

        }
    }

    public async Task<TResult> Run<TResult>(Func<TResult> func, CancellationToken cancellationToken = default(CancellationToken))
    {
        //allows only one thread to run at a time.
        await _semaphore.WaitAsync(cancellationToken);

        //thread has acquired the semaphore and entered
        try
        {
            //create new task completion source to wait for func to get executed on the context thread
            _taskCompletionSource = new TaskCompletionSource<object>();

            //set the function to be executed by the context thread
            _func = () => func();

            //signal the waiting context thread that it is time to execute the task
            _newTaskRunSignal.Set();

            //wait and return the result till the task execution is finished on the context/looper thread.
            return (TResult)await _taskCompletionSource.Task;
        }
        finally
        {
            //release the semaphore to allow other threads to acquire it.
            _semaphore.Release();
        }
    }
}

用于测试的Console主方法。

public class Program
{
    //testing the junk soft runner
    public static void Main()
    {
        //get the singleton instance
        var softRunner = JunkSoftRunner.Instance;

        //simulate web request on different threads
        for (var i = 0; i < 10; i++)
        {
            var taskIndex = i;
            //launch a web request on a new thread.
            Task.Run(async () =>
            {
                Console.WriteLine($"Task{taskIndex} (ThreadID:'{Thread.CurrentThread.ManagedThreadId})' Launched");
                return await softRunner.Run(() =>
                {
                    Console.WriteLine($"->Task{taskIndex} Completed On '{Thread.CurrentThread.ManagedThreadId}' thread.");
                    return taskIndex;
                });
            });
        }
    }   
}

输出。

enter image description here

请注意,虽然函数从不同的线程中启动,但一些代码总是在同一个ID为'5'的上下文线程中执行。

但要注意的是,虽然所有的web请求都在独立的线程上执行,但它们最终会等待一些任务在单人工作线程上被执行。这最终会在你的web应用中形成一个瓶颈。反正这是你的设计限制。


2
投票

下面是你如何从一个专门的STA线程中向Junkoft API发出命令,使用一个 BlockingCollection 类。

public class JunksoftSTA : IDisposable
{
    private readonly BlockingCollection<Action<Lazy<dynamic>>> _pump;
    private readonly Thread _thread;

    public JunksoftSTA()
    {
        _pump = new BlockingCollection<Action<Lazy<dynamic>>>();
        _thread = new Thread(() =>
        {
            var lazyApi = new Lazy<dynamic>(() =>
            {
                var powerShell = System.Management.Automation.PowerShell.Create();
                powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft.dll");
                powerShell.AddScript("New-Object Com.Junksoft.ScriptingObject");
                dynamic junksoftAPI = powerShell.Invoke()[0];
                return junksoftAPI;
            });
            foreach (var action in _pump.GetConsumingEnumerable())
            {
                action(lazyApi);
            }
        });
        _thread.SetApartmentState(ApartmentState.STA);
        _thread.IsBackground = true;
        _thread.Start();
    }

    public Task<T> CallAsync<T>(Func<dynamic, T> function)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pump.Add(lazyApi =>
        {
            try
            {
                var result = function(lazyApi.Value);
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }

    public Task CallAsync(Action<dynamic> action)
    {
        return CallAsync<object>(api => { action(api); return null; });
    }

    public void Dispose() => _pump.CompleteAdding();

    public void Join() => _thread.Join();
}

使用的目的 Lazy 类用于在动态对象的构造过程中浮现一个可能的异常,并将其传播给调用者。

...异常会被缓存。也就是说,如果工厂方法在线程第一次试图访问 Value 的财产 Lazy<T> 对象,以后每次尝试都会抛出相同的异常。

使用示例。

// A static field stored somewhere
public static readonly JunksoftSTA JunksoftStatic = new JunksoftSTA();

await JunksoftStatic.CallAsync(api => { api.Login("x", "y"); });
int age = await JunksoftStatic.CallAsync(api => api.GetAgeByCustomerId(custId));

如果你发现一个STA线程不足以及时处理所有的请求,你可以添加更多的STA线程,所有线程都运行相同的代码(private readonly Thread[] _threads; 等)。) 的 BlockingCollection 类是线程安全的,可以被任何数量的线程同时使用。


-1
投票

如果你没有说那是一个第三方工具,我会认为它是一个GUI类。出于实际原因,让多个线程对其进行写入是一个非常糟糕的想法。.NET执行了严格的 "只有创建线程才可以写 "的规则,从 2.0起.

一般来说,WebServers,特别是ASP.Net使用一个相当大的线程池。我们说的是每个核心有10到100个线程。这意味着很难将任何请求锁定在一个特定的线程上。你最好不要尝试。

同样,看看GUI类可能是你最好的选择。基本上,你可以制作一个单线程,其唯一目的就是模仿GUI的事件队列。一般Windows Forms应用程序的MainUI线程,负责创建每个GUI类的实例。它通过轮询处理事件队列来维持生命。只有当它通过事件队列接收到取消命令时,它才会结束。Dispatching只是将命令放入该队列中,因此我们可以避免Cross-Threading问题。

© www.soinside.com 2019 - 2024. All rights reserved.