在akka流中的ActorSystem关闭

问题描述 投票:0回答:1

我有一个akka流,它连续消耗来自kafka主题的数据。我从不关闭actor系统,我不希望我的应用程序关闭它是正确的方法吗?处理actorySystem关闭的正确方法是什么?

  implicit val actorSystem: ActorSystem = ActorSystem("mytest")
  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val actorConfig = actorSystem.settings.config.getConfig("akka.kafka.consumer")

  val consumerSettings =
    ConsumerSettings(actorConfig, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(config.getString("kafka.hosts"))
      .withGroupId("mytestgrp")


  val flow = Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics(config.getString("kafka.topic")))
    .grouped(500)
    .map(Pipeline.process)
    .withAttributes(supervisionStrategy(decider))

  flow.runWith(Sink.ignore)
apache-kafka akka-stream
1个回答
0
投票

当流完成时,您可以关闭actor系统

flow.runWith(Sink.ignore).onComplete {
    case _ => actorSystem.shutdown
}
© www.soinside.com 2019 - 2024. All rights reserved.