如何持续使用来自Apache Pulsar的消息?

问题描述 投票:0回答:1

如何使用Akka Streams不断消耗来自Apache Pulsar的消息并打印每条消息?

下面是我从pulsar4s库中找到的示例代码。而不是将消息发布到另一个主题,如何打印消耗的消息?

val consumerFn = () => client.consumer(ConsumerConfig(Seq(intopic), Subscription("mysub")))
val producerFn = () => client.producer(ProducerConfig(outtopic))

val control = source(consumerFn, Some(MessageId.earliest))
                .map { consumerMessage => ProducerMessage(consumerMessage.data) }
                .to(sink(producerFn)).run()
scala akka-stream apache-pulsar
1个回答
0
投票

您可以简单地使用Sink.foreach(println))

例如

source(consumerFn, Some(MessageId.earliest)).
.runWith(Sink.foreach(println))
© www.soinside.com 2019 - 2024. All rights reserved.