我正在尝试使用Retrofit和rxjava关闭来自http请求的流,这是因为它超时,或者因为我需要更改进入请求的详细信息。两者似乎都工作正常,因为当我取消订阅时,我收到doOnCancel调试消息,而在doOnNext完成时,我收到doOnTerminate消息。我也没有收到来自多个线程的inputLines。但是,上述任何一种操作每发生一次,我的线程数就会增加。似乎responsebody.close没有释放它们的资源,因此线程没有死(我也收到了类似的错误消息,如“ OKHTTP泄漏。您关闭了responseBody吗?”]
有人有什么建议吗?
public boolean closeSubscription() {
flowableAlive = false;
subscription.cancel();
return true;
}
public void subscribeToFlowable() {
streamFlowable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())
.doOnTerminate(() -> log.debug("TERMINATED")).doOnCancel(() -> log.debug("FLOWABLE CANCELED"))
.subscribe(new Subscriber<ResponseBody>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(ResponseBody responseBody) {
log.debug("onNext called");
String inputLine;
try (InputStream inputStream = responseBody.byteStream()) {
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
while (flowableAlive && ((inputLine = br.readLine()) != null)) {
log.debug("stream receive input line for thread " + name);
log.debug(inputLine);
}
} catch (IOException e) {
log.debug("error occurred");
log.debug(e.getMessage());
}
}
@Override
public void onError(Throwable t) {
log.debug("error");
flowableAlive = false;
}
@Override
public void onComplete() {
log.debug("completed");
closeSubscription();
flowableAlive = false;
}
});
}
subscribe()的结果是Disposable对象。您应该将其存储为文件,并稍后在其上调用Disposable.dispose(),如下所示:
https://proandroiddev.com/disposing-on-android-the-right-way-97bd55cbf970
您的OkHttp调用将被正确中断,因为dispose()会中断该调用在其上运行的线程,并且OkHttp会定期检查线程是否被中断以在发生这种情况时停止传输-这称为协作取消/中断。