在Consumer API中使用createDrainingControl?

问题描述 投票:0回答:1

我正在浏览Alpakka中Kafka的Consumer API文档。我遇到了这段代码。根据我的理解,使用msg.committableOffset()提交偏移量。那么为什么我们需要.toMat()和mapMaterializedValue()。我不能只将它附加到Sink.Ignore()?

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
      .mapMaterializedValue(Consumer::createDrainingControl)
      .run(materializer);
akka kafka-consumer-api akka-stream alpakka
1个回答
1
投票

您无法附加到Sink.ignore,因为您已经附加了Commiter.Sink。但是你可以丢弃具体化的价值观。

该示例使用toMat with Keep.both来保留物化值,来自源的控制和来自接收器的未来[完成]。使用这两个值,它会在mapMaterializedValue中创建一个DrainingControl,允许您在停止之前停止流或排出流,或者在流停止时收到通知。

如果你不关心这个控件(虽然你应该),你可以使用

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .to(Committer.sink(committerSettings.withMaxBatch(1)))
      .run(materializer);
© www.soinside.com 2019 - 2024. All rights reserved.