并发IObservable订阅

问题描述 投票:0回答:1

我有以下代码

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
    .Subscribe(PointCloudMergerCompleted);

SolverManagementService _solverManagementService在哪里

Public class SolverManagementService : ISolverManagementService
{
    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token));
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }
    ... // Other methods. 
}

但是这里_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)很昂贵,尽管它返回Task<IPointCloud>,但我没有对此进行线程化,并且此调用块。由于RecursivelyMergeAsync返回了Task<IPointCloud>,因此可以等待,因此我修改了代码以使用async/await

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
    return Observable.Create<IPointCloud>(
        observer =>
        {
            PairCollectionProducer(dataDirectory, token)
                .Subscribe(async (pairCollection) =>
                {
                    observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token));
                },
                onCompleted: () =>
                {
                    observer.OnCompleted();
                });
            return () => { };
        });
}

但现在它立即返回,并且控制台应用程序关闭。我确信无需Semephores即可完成此操作,但是我是RX的新手。如何配置RecursivelyMergeAsync以针对每个返回的pairCollection同时运行,而不会在所有递归合并完成后阻塞并获得通知?

注意。在单元测试中,我执行以下操作

public class IcpBatchSolverServiceTests
{
    private Mock<ISettingsProvider> _mockSettingsProvider; 
    private IIcpBatchSolverService _icpBatchSolverService;

    [OneTimeSetUp]
    public void Setup()
    {
        _mockSettingsProvider = new Mock<ISettingsProvider>();

        _mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
        _mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;

        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();

        var serviceProvider = new ServiceCollection()
            .AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
                builder.AddSerilog(Log.Logger);
            })
            .BuildServiceProvider();

        ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
            .GetService<ILoggerFactory>()
            .CreateLogger<IcpBatchSolverServiceTests>();

        _icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
    }

    [Test]
    public async Task CanSolveBatchAsync()
    {
        IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
        List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);

        IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
        IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);

        Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
    }
}

而且此过程可以完美地同时进行。

c# observable system.reactive
1个回答
0
投票

我将留下一个通用的答案,因为上面的代码过于广泛,无法精简。

有两种语法可用于定义异步行为。第一个是async/await模式,第二个是较旧的,是Subscribe()模式(反应性)。

异步与并发相同吗?

不,绝对不是。对于可能正在阅读此书而又不知道的人,异步意味着“它会稍后发生”,而不是“它会同时发生”。通过使用这两种语法,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从Web服务器返回的响应。您需要发出请求,然后在响应返回时执行一些操作。

并发性不同。例如,您可以使用Task.Run()Parallel.ForEach()来调用并发。在这两种情况下,您都将定义一个fork。如果是Task.Run,则稍后可以再执行Task.WaitAll。对于Parallel.ForEach,它将为您执行分叉/联接。当然,反应式有它自己的一组fork / join操作。

我等待或订阅时会发生什么?

以下两行代码都具有相同的行为,并且这种行为使很多程序员感到困惑:

var result = await myAsync();

myObservable.Subscribe(result => { ... });

在两种情况下,程序的控制流都以可预测的方式运行,但可能会造成混淆。在第一种情况下,在等待await时,控制流返回到父调用方。在第二步中,控制流移至代码的下一行,并在返回结果时调用lambda表达式。

我在学习如何使用它们的人们中看到的常见现象是,尝试将lambda中的变量分配给父作用域中的地址。这是行不通的,因为在执行lambda之前,该作用域将不复存在。使用async/await进行愚蠢的操作的可能性较小,但是您还必须记住,控制流将一直进入调用堆栈,直到定义了下一个同步操作为止。 This article explains it in a little more depththis article有点容易理解。

© www.soinside.com 2019 - 2024. All rights reserved.