我们有一个多组件应用程序,它在组件之间提供Reactive Streams API。一些组件使用Akka Streams实现,其他组件使用例如Akka Streams。反应堆。
在一个组件中,我们注意到Streams没有处理任何消息,尽管提供的Publisher提供了记录。我把问题解决了以下情况:
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
一个组件提供Publisher(它需要是Publisher,因为API使用Reactive Streams API,而不是Akka Streams API)。此发布者是从另一个Akka Streams Source创建的,并使用Sink.asPublisher
变成了一个发布者。
当我们现在使用BroadcastHub从Publisher开始实现Stream时,根本没有记录。
我尝试使用Reactor Publisher:
Publisher<String> stringPublisher = Flux.fromIterable(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
这按预期工作。不幸的是,我无法排除另一个组件从Akka Stream Source创建其Publisher的情况。
有谁知道出了什么问题?
我现在知道如何解决它,如果我开始在mapMaterializedValue中使用resultunt的resultunt来运行它:
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source
.fromPublisher(stringPublisher)
.alsoToMat(BroadcastHub.of(String.class, 256), Keep.right())
.mapMaterializedValue(source -> source
.runWith(Sink.foreach(System.out::println, materializer))
.run(materializer)
.toCompletableFuture()
.get();
编辑:TL; DR:解释在qazxsw poi中说明:
这里发生的是当您附加其他流时主流已经完成。有时它可能足够快,在完成之前看到一些元素。
---
因此,看起来BroadcastHub实际上在消费者附加到BroadcastHub创建的Source之前丢弃了元素。
文档说它没有丢弃:
如果没有订户连接到该集线器,那么它将不会丢弃任何元素,而是反向压力上游生产者,直到订户到达。
实际上在大多数情况下都是如此,但我发现了一些行为不正确的情况:
https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html
这适用于100个案例中的~3个。但是以下情况适用于所有情况(我只是添加了一个油门来生成更慢的元素):
public void testBH3() throws ExecutionException, InterruptedException {
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
}
public void repeat() throws ExecutionException, InterruptedException {
for (int i = 0; i < 100; i++) {
testBH3();
System.out.println("------");
}
}
因此,在我看来,当没有连接Sink时,BroadcastHub有时会丢弃元素。