我有一个用例,其中流应该仅在累积“总和”等于或超过给定值 n 时发出。让我们以 n = 5 的六个整数为例。
+---+------+---------+
| i | Emit | Sum |
+---+------+---------+
| 1 | - | 1 |
| 2 | - | 3 |
| 3 | 5 | 1 |
| 4 | 5 | 0 |
| 5 | 5 | 0 |
| 2 | 2 | 0 (end) |
+---+------+---------+
如您所见,除非总和等于或超过 5,否则不会发出任何内容,但最后一个元素除外,无论如何都会发出该元素。
一旦发出一个项目,总和就会减少该值(n)。实际上,我正在从网络调用中读取数据,然后将它们发送给仅接受固定大小块的下游消费者,当然,最后一个除外(上游已完成)。
我正在使用项目 Reactor Flux 作为
Publisher
;我找不到任何方法可以让我执行上面所示的操作。 scan
最接近,但它也会发出需要过滤掉的中间元素。
实际上,我正在从网络调用中读取数据,然后 将它们发送给仅接受固定尺寸的下游消费者 当然,除了最后一个块之外(上游已完成)。
我突然想到,我自己尝试拆分响应
Flux
可能有点晚了,而且相当困难;相反,我可以使用 Netty FixedLengthFrameDecoder 之类的东西,它正是我正在寻找的东西。
这让我找到了 reactor-netty 源代码,经过广泛的挖掘,我找到了我所需要的。
fun get(url: String, maxChunkSize: Int): List<ByteArray> {
return HttpClient.create()
.httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
.get()
.uri(url)
.responseContent()
.asByteArray()
.collectList()
.block()!!
}
关键部分是
httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
;单元测试证明这是有效的:
@Test
fun testHonorsMaxChunkSize() {
val maxChunkSize = 4096
val chunks = FixedLengthResponseFrameClient.get(
"http://doesnotexist.nowhere/binary", maxChunkSize
)
assertThat(chunks.subList(0, chunks.size - 1))
.allMatch { it.size == maxChunkSize}
assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}
WebClient
可以配置自定义HttpClient
(用httpResponseDecoder
配置)如下图:
WebClient
.builder()
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
.get()
.uri("uri")
.exchange()
.flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
...
这些缓冲区的大小将是在
HttpClient.httpResponseDecoder
中设置的大小(默认为 8192 Kb)。
这不可能直接在
Flux
对象上执行,但如果您有权访问创建 Flux
对象的资源,则可能会实现解决方案。由于在流(Flux)内部,您无法访问先前的元素,因此您可以在资源索引上创建 Flux
并直接从索引 Flux
访问此资源(因为其只读操作)。例如这样的事情:
List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
.flatMap(i -> {
int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
if (sum >= 5) {
atomicSum.updateAndGet(integer -> integer - 5);
return Flux.just(5);
}
return (i.equals(list.size() -1))
? Flux.just(list.get(i)) // emit last element even if sum was not 5
: Flux.empty();
}); // emitted element's
请注意,这不是一个好的做法,我不建议这样的解决方案。
Flux
对象处理可能会在线程之间跳过,因此,如果您在Flux
之外修改对象,则应该以同步方式进行(因此使用AtomicReference
)。列表仅用于只读操作,因此没问题。另外,我不知道这部分代码是否真正有效,但我想向您展示如果您有权访问创建 Flux
对象的资源,您将如何找到解决方案。
编辑:即使这样的解决方案也行不通。我自己错了,
Flux
对象不会在线程之间跳过,但可能由多个线程处理,导致单个原子引用无效状态。这种云仍然可以通过一些同步机制(例如锁)而不是原子引用来解决,但远远超出了平均开发人员的经验。您确定不能使用 scan()
函数,因为您可以提供自己的累加器函数作为参数?
如果您需要保持运行总计或以其他方式维护从中导出通量的状态,一种方法是创建一个订阅第一个通量的新通量,并通过订阅维护状态,例如
Flux<Long> flux = Flux.just(1L, 2L, 3L, 4L, 5L);
Sinks.Many<Long> runningTotalSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Long> runningTotalFlux = runningTotalSink.asFlux()
.doOnSubscribe(subscription -> {
AtomicLong runningTotal = new AtomicLong();
flux
.doOnCancel(subscription::cancel)
.doOnError(runningTotalSink::tryEmitError)
.doOnComplete(runningTotalSink::tryEmitComplete)
.subscribe(i -> {
runningTotalSink.tryEmitNext(runningTotal.accumulateAndGet(i, Long::sum));
});
});
runningTotalFlux.toStream().forEach(i -> {
System.out.println(i);
});