消费者函数和服务函数访问同一个kafka流的状态存储是线程安全的吗?

问题描述 投票:0回答:0

我正在使用 kotlin + spring boot + kafka Streams 和 Spring Cloud Stream。
我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。 为了验证请求,它需要访问状态存储。
这是代码。

@Service
class MyService(
  private val kafkaTemplate: KafkaTemplate<String, String>,
  private val interactiveQueryService: InteractiveQueryService,
) {
  fun myMethod(username: String) {
    val stateStore = interactiveQueryService.getQueryableStore<ReadOnlyKeyValueStore<String, String>>(
        "myStateStore",
        QueryableStoreTypes.keyValueStore()
    )
    val someId = stateStore.get(username)
    if (someId != null) { kafkaTemplate.send("myTopic", someId, someValue) }
}

这是我的消费者功能

@Configuration
class MyListener(
  private val interactiveQueryService: InteractiveQueryService,
) {
  
  // This function consumes the "myTopic"
  @Bean
  fun handleSomething() = Consumer<KStream<String, String>> { input ->
    input
      .map{ key, val -> 
         // some logic ..
         KeyValue(someKey, someValue)
       }
      .toTable(
                Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("myStateStore")
                    .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
            )
  }
}

我认为如果有很多客户的请求,

handleSomething
myMethod
可以并行运行。 如您所见,
myMethod
需要访问“myStateStore”来读取数据,并且
handleSomething
更新“myStateStore”。所以我想知道这个逻辑是线程安全的。 (我想知道 kafka 在内部以线程安全的方式处理这个问题)

spring-boot kotlin apache-kafka-streams spring-cloud-stream
© www.soinside.com 2019 - 2024. All rights reserved.