RxJava Android:观察者不兼容类型错误

问题描述 投票:0回答:1

我想使用RxJava和RxAndroid进行以下异步网络调用。我在项目依赖项中包含了最新的RxJava(3.0.1)和RxAndroid(3.0.0),但在Subscription subscription = ....块中仍然出现以下错误:

不兼容的类型

必需:org.reactivestreams.Subscription

发现:无效

为什么?谁能告诉我如何使RxJava可观察的工作?

感谢您的帮助,谢谢。

Subscription subscription = getServerResponse(et_id.getText().toString())// error highlighted in this whole block
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Observer<String>() { 
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(String bitmap) {
               // Handle result of network request
            }

            @Override
            public void onError(@NonNull Throwable e) {
               // Update user interface to handle error
            }

            @Override
            public void onComplete() {
               // Update user interface if needed
            }
       });

public Observable<String> getServerResponse(String string) {
    return Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            BufferedReader inFromServer = null;
            String response = "";
            Socket clientSocket = null;
            try {
                clientSocket = new Socket(serverHostname, serverPort);
                DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream());
                inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                outToServer.writeBytes(string + "\n");
                response = inFromServer.readLine();
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
                response = MainActivity.this.getString(R.string.result_serverError);
            }
            return response;
        }
    });
}

我对反应性Java的所有导入:

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
java android observable rx-java rx-android
1个回答
0
投票

尝试使用一次性复合材料。就像这样:

private CompositeDisposable compositeDisposable = CompositeDisposable();
compositeDisposable.add(getServerResponse(et_id.getText().toString())
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new Observer<String>() { 
        @Override
        public void onSubscribe(@NonNull Disposable d) {
        }

        @Override
        public void onNext(String bitmap) {
           // Handle result of network request
        }

        @Override
        public void onError(@NonNull Throwable e) {
           // Update user interface to handle error
        }

        @Override
        public void onComplete() {
           // Update user interface if needed
        }
   }));

请不要忘记对活动的onDestroy方法执行CompositeDisposable.dispose()。

让我知道它是否对您有用!

© www.soinside.com 2019 - 2024. All rights reserved.