为什么使用Sink.asPublisher创建的Publisher在BroadcastHub使用时不起作用?

问题描述 投票:0回答:1

我们有一个多组件应用程序,它在组件之间提供Reactive Streams API。一些组件使用Akka Streams实现,其他组件使用例如Akka Streams。反应堆。

在一个组件中,我们注意到Streams没有处理任何消息,尽管提供的Publisher提供了记录。我把问题解决了以下情况:

Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source<String, NotUsed> allMessages = Source
    .fromPublisher(stringPublisher)
    .toMat(BroadcastHub.of(String.class, 256), Keep.right())
    .run(materializer);

allMessages
    .runForeach(System.out::println, materializer)
    .toCompletableFuture()
    .get();

一个组件提供Publisher(它需要是Publisher,因为API使用Reactive Streams API,而不是Akka Streams API)。此发布者是从另一个Akka Streams Source创建的,并使用Sink.asPublisher变成了一个发布者。

当我们现在使用BroadcastHub从Publisher开始实现Stream时,根本没有记录。

我尝试使用Reactor Publisher:

Publisher<String> stringPublisher = Flux.fromIterable(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

这按预期工作。不幸的是,我无法排除另一个组件从Akka Stream Source创建其Publisher的情况。

有谁知道出了什么问题?

java akka-stream
1个回答
0
投票

我现在知道如何解决它,如果我开始在mapMaterializedValue中使用resultunt的resultunt来运行它:

Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source
    .fromPublisher(stringPublisher)
    .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right())
    .mapMaterializedValue(source -> source
         .runWith(Sink.foreach(System.out::println, materializer))
    .run(materializer)
    .toCompletableFuture()
    .get();

编辑:TL; DR:解释在qazxsw poi中说明:

这里发生的是当您附加其他流时主流已经完成。有时它可能足够快,在完成之前看到一些元素。

---

因此,看起来BroadcastHub实际上在消费者附加到BroadcastHub创建的Source之前丢弃了元素。

文档说它没有丢弃:

如果没有订户连接到该集线器,那么它将不会丢弃任何元素,而是反向压力上游生产者,直到订户到达。

Lightbend Forum

实际上在大多数情况下都是如此,但我发现了一些行为不正确的情况:

https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

这适用于100个案例中的~3个。但是以下情况适用于所有情况(我只是添加了一个油门来生成更慢的元素):

public void testBH3() throws ExecutionException, InterruptedException {
    Publisher<String> stringPublisher = Source
        .from(Lists.newArrayList("Hello", "World", "!"))
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

    Source<String, NotUsed> allMessages = Source
        .fromPublisher(stringPublisher)
        .toMat(BroadcastHub.of(String.class, 256), Keep.right())
        .run(materializer);

    allMessages
        .runForeach(System.out::println, materializer)
        .toCompletableFuture()
        .get();
}

public void repeat() throws ExecutionException, InterruptedException {
    for (int i = 0; i < 100; i++) {
        testBH3();
        System.out.println("------");
    }
}

因此,在我看来,当没有连接Sink时,BroadcastHub有时会丢弃元素。

© www.soinside.com 2019 - 2024. All rights reserved.