我的代码将多个 IO 流合并(平面映射、连接)为一个流。使用
resource-with-try
块包装该流是否正确关闭了底层流?
public ImmutableListMultiMap<Long, Long> find(long customerId, ImmutableList<Long> itemIds) {
Stream<List<Long>> chunks =
Streams.stream(Iterators.partition(chunks.iterator(), CHUNK_SIZE));
return Streams.mapWithIndex(
chunk,
(chunk, index) -> {
System.out.println("reading chunk " + index);
return ImmutableList.copyOf(chunk);
})
.flatMap(
chunk ->
findItems(customerId, chunk))
.collect(toImmutableListMultimap(Pair::getFirst, Pair::getSecond));
}
Stream<Pair<Long, Long>> findItems(Long customerId, ImmutableList<Long> itemIds) {
return Stream.concat(findBigItems(customerId, itemIds), findSmallItems(customerId, itemIds));
}
Stream<Pair<Long, Long>> findSmallItems(Long customerId, ImmutableList<Long> itemIds) {
// returns a stream from an IO read
}
Stream<Pair<Long, Long>> findBigItems(Long customerId, ImmutableList<Long> itemIds) {
// returns a stream from an IO read
}
虽然代码中没有明确指出
try-with-resource
,但flatMap
应该执行相同的操作(即自动关闭资源)。
是的,正如我在评论中暗示的那样,您只需要添加一些日志记录即可确认:
.onClose(() -> System.out.println("onClose was called"))
这是一个人为的示例流:
static Stream<Integer> newStream() {
return IntStream.range(0,2).boxed()
.onClose(() -> System.out.println("onClose newStream"))
.flatMap(i -> Stream.of(i, i+1).onClose(() -> System.out.println("onClose flatMap Stream.of i="+i)));
}
如果您使用以下命令运行此流,您可以看到何时使用“onClose”调用 - 或不使用:
System.out.println("TEST1 "+newStream().count());
System.out.println("===");
try(Stream<Integer> stream = newStream()) {
System.out.println("TEST2 "+stream.count());
}
System.out.println("===");
try(Stream<Integer> stream = Stream.concat(newStream(), newStream())
.onClose(() -> System.out.println("onClose Stream.concat"))) {
System.out.println("TEST3 "+stream.count());
}
TEST1 不在 try-with-resources 内并且错过了一些 onClose,但在 onClose 上运行 flatMap 流。 TEST2/3 按预期打印所有 onClose 调用:
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST1 4
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST2 4
onClose newStream
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST3 8
onClose newStream
onClose newStream
onClose Stream.concat