合并时`onClose`是否会传播到底层流?

问题描述 投票:0回答:1

我的代码将多个 IO 流合并(平面映射、连接)为一个流。使用

resource-with-try
块包装该流是否正确关闭了底层流?

public ImmutableListMultiMap<Long, Long> find(long customerId, ImmutableList<Long> itemIds) {
    Stream<List<Long>> chunks =
        Streams.stream(Iterators.partition(chunks.iterator(), CHUNK_SIZE));
    return Streams.mapWithIndex(
            chunk,
            (chunk, index) -> {
              System.out.println("reading chunk " + index);
              return ImmutableList.copyOf(chunk);
            })
        .flatMap(
            chunk ->
                findItems(customerId, chunk))
        .collect(toImmutableListMultimap(Pair::getFirst, Pair::getSecond));
}

Stream<Pair<Long, Long>> findItems(Long customerId, ImmutableList<Long> itemIds) {
 return Stream.concat(findBigItems(customerId, itemIds), findSmallItems(customerId, itemIds));
}

Stream<Pair<Long, Long>> findSmallItems(Long customerId, ImmutableList<Long> itemIds) {
 // returns a stream from an IO read
}

Stream<Pair<Long, Long>> findBigItems(Long customerId, ImmutableList<Long> itemIds) {
 // returns a stream from an IO read
}

虽然代码中没有明确指出

try-with-resource
,但
flatMap
应该执行相同的操作(即自动关闭资源)。

java java-stream guava try-with-resources
1个回答
0
投票

是的,正如我在评论中暗示的那样,您只需要添加一些日志记录即可确认:

.onClose(() -> System.out.println("onClose was called"))

这是一个人为的示例流:

static Stream<Integer> newStream() {
    return IntStream.range(0,2).boxed()
            .onClose(() -> System.out.println("onClose newStream"))
            .flatMap(i -> Stream.of(i, i+1).onClose(() -> System.out.println("onClose flatMap Stream.of i="+i)));
}

如果您使用以下命令运行此流,您可以看到何时使用“onClose”调用 - 或不使用:

System.out.println("TEST1 "+newStream().count());
System.out.println("===");

try(Stream<Integer> stream = newStream()) {
    System.out.println("TEST2 "+stream.count());
}
System.out.println("===");

try(Stream<Integer> stream = Stream.concat(newStream(), newStream())
            .onClose(() -> System.out.println("onClose Stream.concat"))) {
    System.out.println("TEST3 "+stream.count());
}

TEST1 不在 try-with-resources 内并且错过了一些 onClose,但在 onClose 上运行 flatMap 流。 TEST2/3 按预期打印所有 onClose 调用:

onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST1 4
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST2 4
onClose newStream
===
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
onClose flatMap Stream.of i=0
onClose flatMap Stream.of i=1
TEST3 8
onClose newStream
onClose newStream
onClose Stream.concat
© www.soinside.com 2019 - 2024. All rights reserved.