为什么我偶尔会收到一个InvalidStateStoreException PARTITIONS_REVOKED,而不是在检索商店进行查询时运行?

问题描述 投票:1回答:1

我正在访问一个状态存储来查询它,并且必须使用try / catch块包装store()语句来重试它,因为有时我得到这个异常:

org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store customers-store because the stream thread is PARTITIONS_REVOKED, not RUNNING
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:57)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1053)
    at com.codependent.kafkastreams.customer.service.CustomerService.getCustomer(CustomerService.kt:75)
    at com.codependent.kafkastreams.customer.service.CustomerServiceKt.main(CustomerService.kt:108)

这是用于检索商店的代码(完整代码在github repo上):

fun getCustomer(id: String): Customer? {
    var keyValueStore: ReadOnlyKeyValueStore<String, Customer>? = null
    while(keyValueStore == null) {
        try {
            keyValueStore = streams.store(CUSTOMERS_STORE, QueryableStoreTypes.keyValueStore<String, Customer>())
        } catch (ex: InvalidStateStoreException) {
            ex.printStackTrace()
        }
    }
    val customer = keyValueStore.get(id)
    return customer
}

这是主要计划:

fun main(args: Array<String>) {
    val customerService = CustomerService("main", "localhost:9092")
    customerService.initializeStreams()
    customerService.createCustomer(Customer("53", "Joey"))
    val customer = customerService.getCustomer("53")
    println(customer)
    customerService.stopStreams()
}

在先前的执行完成之后,异常会多次随机运行程序。注意:我对执行的Kafka集群没有任何操作并使用其默认配置。

apache-kafka apache-kafka-streams
1个回答
0
投票

在您访问商店时,Kafka Streams应用程序正在进行重新平衡,并且当时无法访问状态存储。您希望确保在应用程序状态为RUNNING而不是REBALANCING时仅查询存储。

您可以做的是在尝试从商店读取之前检查应用程序的状态,如下所示:

if(streams.state() == State.RUNNING) {
    keyValueStore = streams.store(...);
    val customer = keyValueStore.get(id);
    return customer;
}

还有一个KafkaStreams.setStateListener方法可用于注册KafkStreams.StateListener实现。每次应用程序更改其状态时都会调用StateListener.onChange方法。

© www.soinside.com 2019 - 2024. All rights reserved.