我的工作环境是JDK 1.6和RxJava 2。
我想做一个Observable,它发出的项目是一个通过BufferedReader读取的文件行字符串,如下所示。
...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
return new ObservableSource<String> call() throws Exception {
return new ObservableSource<String>() {
public void subscribe(Observer<String> observer) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(filePath));
String line = null;
while ((line = reader.readLine()) != null) {
observer.onNext(line);
}
observer.onComplete();
... catching exception and close reader
}
}
}
}
});
我还想做一个Observer来观察上面的Observable 用一个take(count)操作符,如下所示。
fileLineObservable.take(2)
.subscribe(new Consumer<String>() {
public void onNext(String line) {
... do something with the file line string
}
});
当执行上述代码时,我遇到了NullPointerException,我知道为什么。NPE的原因是,第二次调用onNext导致在TakeObserver实例上执行onComplete,而在onComplete方法中,调用了没有设置(null)的upstream.dispose。TakeObserver的上游变量在订阅Observable时,应该用onSubscribe(Disposable disposable)来设置。
如何解决这个问题?我是否应该实现自己的Disposable类来设置TakeObserver的上游变量?
这个解决方案怎么样?
Observable<String> observableFile2(Path path) {
return Observable.using(
() -> Files.newBufferedReader(path),
reader -> {
return Observable.fromIterable(() -> {
return new Iterator<>() {
private String nextLine = null;
@Override
public boolean hasNext() {
try {
nextLine = reader.readLine();
return nextLine != null;
} catch (Exception ex) {
return false;
}
}
@Override
public String next() {
if (nextLine != null) {
return nextLine;
}
throw new IllegalStateException("nextLine can not be null.");
}
};
});
},
BufferedReader::close
);
}
测试
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")
测试
@Test
void name() {
observableFile2(hello).take(2)
.test()
.assertValues("line0", "line1")
.assertComplete();
}
@Test
void name2() {
observableFile2(hello).take(10)
.test()
.assertValues("line0", "line1", "line2", "line3")
.assertComplete();
}
@Test
void name3() {
observableFile2(hello2)
.test()
.assertComplete();
}