org.apache.kafka.streams.KafkaStreams#store 方法线程安全吗?

问题描述 投票:0回答:1

在我的 Kafka Streams 应用程序中,我有以下 2 个线程:

  • 线程 A:这会创建一个包含状态存储和所有内容的 Topology 对象,然后最终调用 KafkaStreams 类的构造函数和 start() 方法。
  • 线程 B:它引用线程 A 创建的 KafkaStreams 对象。这会定期在对象上调用 KafkaStreams#store,获取 ReadOnlyWindowStore 实例并读取存储中的数据以进行监控。

我想知道我的应用程序在线程安全性方面是否可以。我不太担心 ReadOnlyWindowStore 因为 javadoc (link) 说:

实现应该是线程安全的,因为需要并发读取和写入。

但是至于

KafkaStreams#store
,我不太确定从单独的线程调用是否可以。我担心的一件事是它涉及 HashMap,它不是线程安全的here。 KafkaStreams#removeStreamThread() 调用可以改变此 HashMap 对象。鉴于此,我不太确定这是否被设计为线程安全的。

我的问题是:从与实例化 KafkaStreams 对象的线程不同的线程中调用 KafkaStreams#store 是否可以?或者在同一线程中调用 store() 方法并仅与其他线程共享 ReadOnlyWindowStore 实例会更好吗? KafkaStream 类是否设计为线程安全的?

multithreading apache-kafka-streams
1个回答
0
投票

事实证明,实际上从 Kafka Streams 3.6.1 开始这不是线程安全的。更多详细信息可以在https://issues.apache.org/jira/browse/KAFKA-16055

找到
© www.soinside.com 2019 - 2024. All rights reserved.