如何使用rxjava和改型正确关闭可流动和靠近的响应主体

问题描述 投票:0回答:1

我正在尝试使用Retrofit和rxjava关闭来自http请求的流,这是因为它超时,或者因为我需要更改进入请求的详细信息。两者似乎都工作正常,因为当我取消订阅时,我收到doOnCancel调试消息,而在doOnNext完成时,我收到doOnTerminate消息。我也没有收到来自多个线程的inputLines。但是,上述任何一种操作每发生一次,我的线程数就会增加。似乎responsebody.close没有释放它们的资源,因此线程没有死(我也收到了类似的错误消息,如“ OKHTTP泄漏。您关闭了responseBody吗?”]

有人有什么建议吗?

 public boolean closeSubscription() {
    flowableAlive = false;
    subscription.cancel();
    return true;
}
public void subscribeToFlowable() {
    streamFlowable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())
            .doOnTerminate(() -> log.debug("TERMINATED")).doOnCancel(() -> log.debug("FLOWABLE CANCELED"))
            .subscribe(new Subscriber<ResponseBody>() {

                @Override
                public void onSubscribe(Subscription s) {
                    subscription = s;
                    subscription.request(Long.MAX_VALUE);
                }


                @Override
                public void onNext(ResponseBody responseBody) {
                    log.debug("onNext called");
                    String inputLine;
                    try (InputStream inputStream = responseBody.byteStream()) {
                        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
                        while (flowableAlive && ((inputLine = br.readLine()) != null)) {
                            log.debug("stream receive input line for thread " + name);
                            log.debug(inputLine);
                        }
                    } catch (IOException e) {
                        log.debug("error occurred");
                        log.debug(e.getMessage());
                    }
                }

                @Override
                public void onError(Throwable t) {
                    log.debug("error");
                    flowableAlive = false;
                }

                @Override
                public void onComplete() {
                    log.debug("completed");
                    closeSubscription();
                    flowableAlive = false;
                }
            });
}
retrofit2 rx-java2
1个回答
0
投票

subscribe()的结果是Disposable对象。您应该将其存储为文件,并稍后在其上调用Disposable.dispose(),如下所示:

https://proandroiddev.com/disposing-on-android-the-right-way-97bd55cbf970

您的OkHttp调用将被正确中断,因为dispose()会中断该调用在其上运行的线程,并且OkHttp会定期检查线程是否被中断以在发生这种情况时停止传输-这称为协作取消/中断。

© www.soinside.com 2019 - 2024. All rights reserved.