如何获取Kafka Streams InMemoryWindowStore中键值对的数量

问题描述 投票:0回答:1

获取 InMemoryWindowStore 中键值对的数量(如果可能的话,还可以获取窗口的数量)的最有效方法是什么?我想定期捕获该号码以进行监控。

我现在能想到的唯一方法就是调用 all() 方法并遍历标点符号中的所有内容。这样做的问题是它会长时间阻塞流处理,我不确定它是否能满足我的吞吐量要求。另一种选择可能是使用单独的线程,但我不确定它是否安全。

橱窗商店中似乎没有approximateNumEntries()的等价物。是否有任何 API 允许在橱窗商店中捕获相同的内容?从单独的线程遍历状态存储对象是否安全?

apache-kafka-streams
1个回答
1
投票

考虑到您的担忧,我会考虑使用带有 KeyValueIterator 的 ReadOnlyWindowStore。

像这样:

ReadOnlyWindowStore<K, V> windowStore = streams.store(storeName, QueryableStoreTypes.windowStore());
KeyValueIterator<Windowed<K>, V> iterator = windowStore.all();

int count = 0;
while(iterator.hasNext()) {
    iterator.next();
    count++;
}
iterator.close();

关于线程安全:

Kafka Streams 的状态存储(包括 ReadOnlyWindowStore)默认情况下不是线程安全的。从流线程或处理器访问状态存储通常是安全的,但对于其他线程的访问,您应该使用 ReadWriteLock 或类似的机制来确保线程安全。

这是一个例子:

ReadWriteLock lock = new ReentrantReadWriteLock();

lock.readLock().lock();
try {
    ReadOnlyWindowStore<K, V> windowStore = streams.store(storeName, QueryableStoreTypes.windowStore());
    KeyValueIterator<Windowed<K>, V> iterator = windowStore.all();

    int count = 0;
    while(iterator.hasNext()) {
        iterator.next();
        count++;
    }
    iterator.close();
} finally {
    lock.readLock().unlock();
}
© www.soinside.com 2019 - 2024. All rights reserved.