如何订阅和取消订阅或取消rxjava observable

问题描述 投票:1回答:1

我是RxJava的新手,并尝试将我的asyncTask更新为RxJava。作为第一次尝试,我做了以下代码:

    public class MainActivity extends AppCompatActivity 
    {
        @Override
        protected void onCreate(Bundle savedInstanceState) 
        {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            doSomeWork();
        }


        private String funcCallServerGet()
        {
            //Some code to call a HttpClient Get method & return a response string
            //this is the method which previously i used to call inside asynctask doInbackground method
        }


          private void doSomeWork() {
               getSingleObservable()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(getSingleObserver())    ;
        }


        private Single<String> getSingleObservable() 
        {
            return Single.create(new SingleOnSubscribe<String>() {
                @Override
                public void subscribe(SingleEmitter<String> emitter) throws Exception {
                    if(!emitter.isDisposed()) {
                        String strRxResponse =  funcCallServerGet();
                        emitter.onSuccess(strRxResponse);
                    }
                }
            });
        }


         private SingleObserver<String> getSingleObserver() 
        {

            return new SingleObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, " onSubscribe getSingleObserver: " + d.isDisposed());             }

                @Override
                public void onSuccess(String value) {
                    Log.d(TAG, " onNext : value : " + value);          }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, " onError : " + e.getMessage());         }
            };
        }

    }

但我有一些困惑:

  1. 为什么我在SingleObserver getSingleObserver()的onSubscribe()中得到错误。
  2. 当调用onStop()活动时,如何取消订阅或取消observable / observer。
  3. 此外,屏幕方向时真正发生了什么。是否可以自动取消订阅或继续其工作?怎么做设备轮换?
android rx-java rx-java2
1个回答
1
投票

为什么我在SingleObserver getSingleObserver()的onSubscribe()中得到错误。

您当前正在记录是否将一次性物品放置在onSubscribe方法中。此时,一次性物品还没有被丢弃。

当调用onStop()活动时,如何取消订阅或取消observable / observer。

您可以使用返回一次性的subscribe方法,而不是使用SingleObserver。有了这个,你可以直接管理一次性或使用CompositeDisposable。然后你可以在那个一次性上调用dispose方法,使用CompositeDisposable这可以通过调用clear()来实现

private final CompositeDisposable disposables = new CompositeDisposable();

@Override
protected void onStart() {
    super.onStart();
    disposables.add(getSingleObservable()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(value -> {
                Log.d(TAG, " onSuccess: " + value);
            }, error -> {
                Log.e(TAG, " onError", error);
            }
        )
    );
}

@Override
protected void onStop() {
    disposables.clear();
    super.onStop();
}

此外,屏幕方向时真正发生了什么。是否可以自动取消订阅或继续其工作?怎么做设备轮换?

默认情况下,不会对可观察对象进行自动管理,您有责任对其进行管理。在您的示例代码中,当设备旋转时,您将收到另一个onCreate调用,此处您正在安排再次执行的工作,在轮换可能仍在运行之前安排的工作,因此您可能最终泄漏旧活动并接收工作成功或失败时的回调 - 在这种情况下,您会看到一个日志语句。

虽然您应该阅读作者关于此方法存在的一些问题的文章,但有些工具可以提供自动可观察的管理。

另一个选择可能是查看新的Architecture Components库,特别是ViewModel和LiveData。这将简化您在订阅管理和配置更改方面需要执行的操作。

© www.soinside.com 2019 - 2024. All rights reserved.