我们正在尝试使用Akka Streams和Alpakka Kafka来使用服务中的一系列事件。为了处理事件处理错误,我们使用Kafka autocommit和多个队列。例如,如果我们有一个主题user_created
,我们想要从产品服务中消费,我们还创建了user_created_for_products_failed
和user_created_for_products_dead_letter
。这两个额外的主题与特定的Kafka消费者群体相关联。如果一个事件未能被处理,它会进入失败的队列,我们会在五分钟内再次消耗 - 如果它再次失败则会转到死信。
在部署时,我们希望确保不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些“飞行”的事件尚未处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
阅读文档后,我们已经看到了KillSwitch
功能。我们在其中看到的问题是shutdown
方法返回Unit
而不是我们期望的Future[Unit]
。我们不确定我们不会丢失使用它的事件,因为在测试中看起来它太快而无法正常工作。
作为一种解决方法,我们为每个流创建一个ActorSystem
并使用terminate
方法(返回Future[Terminate]
)。这个解决方案的问题在于我们不认为每个流创建一个ActorSystem
会很好地扩展,并且terminate
需要花费大量时间来解决(在测试中需要一分钟才能关闭)。
你遇到过像这样的问题吗?是否有更快的方法(与ActorSystem.terminate
相比)停止流并确保Source
发出的所有事件都已处理完毕?
来自documentation(强调我的):
使用外部偏移存储时,调用
Consumer.Control.shutdown()
就足以完成Source
,从而开始完成流。
val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()
consumerControl.shutdown()
Consumer.control.shutdown()
返回Future[Done]
。从它的Scaladoc描述:
关闭消费者
Source
。在关闭之前,它将等待未完成的偏移提交请求完成。
或者,如果您在Kafka中使用偏移存储,请使用Consumer.Control.drainAndShutdown
,它也会返回Future
。再次从文档(其中包含有关drainAndShutdown
在幕后的内容的更多信息):
val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
drainAndShutdown
的Scaladoc描述:
停止从
Source
生成消息,等待流完成并关闭消费者Source
,以便所有消费的消息到达流的末尾。流传输失败将被传播,无论如何都会关闭源。