RxJava 用Observable和take操作符读取文件的问题。

问题描述 投票:0回答:1

我的工作环境是JDK 1.6和RxJava 2。

我想做一个Observable,它发出的项目是一个通过BufferedReader读取的文件行字符串,如下所示。

...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
    return new ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();

                    ... catching exception and close reader
                }
            }
        }
    }
});

我还想做一个Observer来观察上面的Observable 用一个take(count)操作符,如下所示。

fileLineObservable.take(2)
                  .subscribe(new Consumer<String>() {
                       public void onNext(String line) {
                           ... do something with the file line string
                       }
                  });

当执行上述代码时,我遇到了NullPointerException,我知道为什么。NPE的原因是,第二次调用onNext导致在TakeObserver实例上执行onComplete,而在onComplete方法中,调用了没有设置(null)的upstream.dispose。TakeObserver的上游变量在订阅Observable时,应该用onSubscribe(Disposable disposable)来设置。

如何解决这个问题?我是否应该实现自己的Disposable类来设置TakeObserver的上游变量?

rx-java2
1个回答
0
投票

这个解决方案怎么样?

Observable<String> observableFile2(Path path) {
        return Observable.using(
                () -> Files.newBufferedReader(path),
                reader -> {
                    return Observable.fromIterable(() -> {
                        return new Iterator<>() {
                            private String nextLine = null;

                            @Override
                            public boolean hasNext() {
                                try {
                                    nextLine = reader.readLine();
                                    return nextLine != null;
                                } catch (Exception ex) {
                                    return false;
                                }
                            }

                            @Override
                            public String next() {
                                if (nextLine != null) {
                                    return nextLine;
                                }
                                throw new IllegalStateException("nextLine can not be null.");
                            }
                        };
                    });
                },
                BufferedReader::close
        );
    }
  • 使用Observable确保BufferedReader在onError时被正确关闭。
  • Observable#fromIterable为我们封装了readLine调用并处理onComplete。

测试

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")

测试

@Test
void name() {
    observableFile2(hello).take(2)
            .test()
            .assertValues("line0", "line1")
            .assertComplete();
}

@Test
void name2() {
    observableFile2(hello).take(10)
            .test()
            .assertValues("line0", "line1", "line2", "line3")
            .assertComplete();
}

@Test
void name3() {
    observableFile2(hello2)
            .test()
            .assertComplete();
}
© www.soinside.com 2019 - 2024. All rights reserved.