使用Rx订阅进度事件,并在完成时仍获得结果

问题描述 投票:2回答:2

我必须调用属于外部组件的方法。该方法的签名如下所示:

IImportedData Import(string fileName, Action<int> progress);

此操作可能需要很长时间才能执行,因此我需要异步调用它并向用户报告进度。我正在寻找调用它的不同方法(Rx,TPL,ThreadPool),以找到表达清晰的东西,但是我正在努力想办法在Rx中做到这一点。

乍一看,使用Rx报告进度的想法似乎很合适-它是与进度相关的大量输入数据。唯一要注意的是,当操作完成时,我需要检查IImportedData以向用户显示结果。 OnCompleted并非用于该目的,这使我走了一条路,该路有一个暴露两个IObservable流的类,然后是一个“启动”操作的方法。

private class Importer : IObservable<int>, IObservable<IImportedData>

感觉笨拙,我敢肯定还有一种我不知道的更好的方法。

c# system.reactive
2个回答
3
投票

先想到两件事:

  1. Task<T>IProgress<T>似乎更适合此任务
  2. 实现IObservable<T>通常是不赞成的。建议在Observable上使用静态方法创建复合实例。

[如果您坚持使用Rx,我建议您看一下几年前与Rx家伙在一起的this discussion。 Jeffrey van Gogh最终推荐了Either<TLeft, TRight>响应,您可以根据消息是“ progress”事件还是“ result”事件来自动路由回调。如果我再次处于这个位置,那肯定是我要采取的方向。


0
投票

您可以尝试这样的事情:

Func<string, Action<int>, IObservable<IImportedData>> create =
    (fileName, progress) =>
        Observable.Create<IImportedData>(o =>
        {
            var foo = new Foo();
            var subject = new BehaviorSubject<int>(0);

            var d1 = subject.Subscribe(progress);

            var result = foo.Import(fileName, n => subject.OnNext(n));

            var d2 = subject
                .Where(x => x == 100)
                .Select(x => result)
                .Subscribe(o);

            return new CompositeDisposable(d1, d2);
        });

我还没有测试过,但是类似的东西应该可以为您提供一种相对干净的Rx方式来执行您要执行的操作。您可能需要在同步上下文中添加。

© www.soinside.com 2019 - 2024. All rights reserved.