如何仅从反应流中发出累积和?

问题描述 投票:0回答:3

我有一个用例,其中流应该仅在累积“总和”等于或超过给定值 n 时发出。让我们以 n = 5 的六个整数为例。

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

如您所见,除非总和等于或超过 5,否则不会发出任何内容,但最后一个元素除外,无论如何都会发出该元素。

一旦发出一个项目,总和就会减少该值(n)。实际上,我正在从网络调用中读取数据,然后将它们发送给仅接受固定大小块的下游消费者,当然,最后一个除外(上游已完成)。

我正在使用项目 Reactor Flux 作为

Publisher
;我找不到任何方法可以让我执行上面所示的操作。
scan
最接近,但它也会发出需要过滤掉的中间元素。

java reactive-programming publish-subscribe project-reactor reactive-streams
3个回答
1
投票

实际上,我正在从网络调用中读取数据,然后 将它们发送给仅接受固定尺寸的下游消费者 当然,除了最后一个块之外(上游已完成)。

我突然想到,我自己尝试拆分响应

Flux
可能有点晚了,而且相当困难;相反,我可以使用 Netty FixedLengthFrameDecoder 之类的东西,它正是我正在寻找的东西。

这让我找到了 reactor-netty 源代码,经过广泛的挖掘,我找到了我所需要的。

fun get(url: String, maxChunkSize: Int): List<ByteArray> {
    return HttpClient.create()
        .httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
        .get()
        .uri(url)
        .responseContent()
        .asByteArray()
        .collectList()
        .block()!!
}

关键部分是

httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
;单元测试证明这是有效的:

@Test

fun testHonorsMaxChunkSize() {
    val maxChunkSize = 4096
    val chunks = FixedLengthResponseFrameClient.get(
        "http://doesnotexist.nowhere/binary", maxChunkSize
    )

    assertThat(chunks.subList(0, chunks.size - 1))
        .allMatch { it.size ==  maxChunkSize}
    assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}

WebClient
可以配置自定义
HttpClient
(用
httpResponseDecoder
配置)如下图:

WebClient
  .builder()
  .clientConnector(ReactorClientHttpConnector(httpClient))
  .build()
  .get()
  .uri("uri")
  .exchange()
  .flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
  ...

这些缓冲区的大小将是在

HttpClient.httpResponseDecoder
中设置的大小(默认为 8192 Kb)。


0
投票

这不可能直接在

Flux
对象上执行,但如果您有权访问创建
Flux
对象的资源,则可能会实现解决方案。由于在流(Flux)内部,您无法访问先前的元素,因此您可以在资源索引上创建
Flux
并直接从索引
Flux
访问此资源(因为其只读操作)。例如这样的事情:

List<Integer> list = List.of(1, 2, 3, 4, 5, 2);
AtomicReference<Integer> atomicSum = new AtomicReference<>(0);
return Flux.fromStream(IntStream.range(0, list.size() - 1).boxed())
        .flatMap(i -> {
            int sum = atomicSum.updateAndGet((integer -> integer + list.get(i)));
            if (sum >= 5) {
                atomicSum.updateAndGet(integer -> integer - 5);
                return Flux.just(5);
            }

            return (i.equals(list.size() -1))
                    ? Flux.just(list.get(i)) // emit last element even if sum was not 5
                    : Flux.empty();
        }); // emitted element's

请注意,这不是一个好的做法,我不建议这样的解决方案。

Flux
对象处理可能会在线程之间跳过,因此,如果您在
Flux
之外修改对象,则应该以同步方式进行(因此使用
AtomicReference
)。列表仅用于只读操作,因此没问题。另外,我不知道这部分代码是否真正有效,但我想向您展示如果您有权访问创建
Flux
对象的资源,您将如何找到解决方案。

编辑:即使这样的解决方案也行不通。我自己错了,

Flux
对象不会在线程之间跳过,但可能由多个线程处理,导致单个原子引用无效状态。这种云仍然可以通过一些同步机制(例如锁)而不是原子引用来解决,但远远超出了平均开发人员的经验。您确定不能使用
scan()
函数,因为您可以提供自己的累加器函数作为参数?


0
投票

如果您需要保持运行总计或以其他方式维护从中导出通量的状态,一种方法是创建一个订阅第一个通量的新通量,并通过订阅维护状态,例如

Flux<Long> flux = Flux.just(1L, 2L, 3L, 4L, 5L);

Sinks.Many<Long> runningTotalSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Long> runningTotalFlux = runningTotalSink.asFlux()
        .doOnSubscribe(subscription -> {
            AtomicLong runningTotal = new AtomicLong();
            flux
                    .doOnCancel(subscription::cancel)
                    .doOnError(runningTotalSink::tryEmitError)
                    .doOnComplete(runningTotalSink::tryEmitComplete)
                    .subscribe(i -> {
                        runningTotalSink.tryEmitNext(runningTotal.accumulateAndGet(i, Long::sum));
                    });
        });

runningTotalFlux.toStream().forEach(i -> {
    System.out.println(i);
});
© www.soinside.com 2019 - 2024. All rights reserved.