我正在使用 kotlin + spring boot + kafka Streams 和 Spring Cloud Stream。
我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。
为了验证请求,它需要访问状态存储。
这是代码。
@Service
class MyService(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val interactiveQueryService: InteractiveQueryService,
) {
fun myMethod(username: String) {
val stateStore = interactiveQueryService.getQueryableStore<ReadOnlyKeyValueStore<String, String>>(
"myStateStore",
QueryableStoreTypes.keyValueStore()
)
val someId = stateStore.get(username)
if (someId != null) { kafkaTemplate.send("myTopic", someId, someValue) }
}
这是我的消费者功能
@Configuration
class MyListener(
private val interactiveQueryService: InteractiveQueryService,
) {
// This function consumes the "myTopic"
@Bean
fun handleSomething() = Consumer<KStream<String, String>> { input ->
input
.map{ key, val ->
// some logic ..
KeyValue(someKey, someValue)
}
.toTable(
Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("myStateStore")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
)
}
}
我认为如果有很多客户的请求,
handleSomething
和myMethod
可以并行运行。
如您所见,myMethod
需要访问“myStateStore”来读取数据,并且handleSomething
更新“myStateStore”。所以我想知道这个逻辑是线程安全的。 (我想知道 kafka 在内部以线程安全的方式处理这个问题)