我有以下代码
string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);
SolverManagementService _solverManagementService
在哪里
Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods.
}
但是这里_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)
很昂贵,尽管它返回Task<IPointCloud>
,但我没有对此进行线程化,并且此调用块。由于RecursivelyMergeAsync
返回了Task<IPointCloud>
,因此可以等待,因此我修改了代码以使用async/await
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
但现在它立即返回,并且控制台应用程序关闭。我确信无需Semephores
即可完成此操作,但是我是RX的新手。如何配置RecursivelyMergeAsync
以针对每个返回的pairCollection
同时运行,而不会在所有递归合并完成后阻塞并获得通知?
注意。在单元测试中,我执行以下操作
public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider;
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}
而且此过程可以完美地同时进行。
我将留下一个通用的答案,因为上面的代码过于广泛,无法精简。
有两种语法可用于定义异步行为。第一个是async/await
模式,第二个是较旧的,是Subscribe()
模式(反应性)。
异步与并发相同吗?
不,绝对不是。对于可能正在阅读此书而又不知道的人,异步意味着“它会稍后发生”,而不是“它会同时发生”。通过使用这两种语法,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从Web服务器返回的响应。您需要发出请求,然后在响应返回时执行一些操作。
并发性不同。例如,您可以使用Task.Run()
或Parallel.ForEach()
来调用并发。在这两种情况下,您都将定义一个fork。如果是Task.Run
,则稍后可以再执行Task.WaitAll
。对于Parallel.ForEach
,它将为您执行分叉/联接。当然,反应式有它自己的一组fork / join操作。
我等待或订阅时会发生什么?
以下两行代码都具有相同的行为,并且这种行为使很多程序员感到困惑:
var result = await myAsync();
myObservable.Subscribe(result => { ... });
在两种情况下,程序的控制流都以可预测的方式运行,但可能会造成混淆。在第一种情况下,在等待await
时,控制流返回到父调用方。在第二步中,控制流移至代码的下一行,并在返回结果时调用lambda表达式。
我在学习如何使用它们的人们中看到的常见现象是,尝试将lambda中的变量分配给父作用域中的地址。这是行不通的,因为在执行lambda之前,该作用域将不复存在。使用async/await
进行愚蠢的操作的可能性较小,但是您还必须记住,控制流将一直进入调用堆栈,直到定义了下一个同步操作为止。 This article explains it in a little more depth和this article有点容易理解。