获取 InMemoryWindowStore 中键值对的数量(如果可能的话,还可以获取窗口的数量)的最有效方法是什么?我想定期捕获该号码以进行监控。
我现在能想到的唯一方法就是调用 all() 方法并遍历标点符号中的所有内容。这样做的问题是它会长时间阻塞流处理,我不确定它是否能满足我的吞吐量要求。另一种选择可能是使用单独的线程,但我不确定它是否安全。
橱窗商店中似乎没有approximateNumEntries()的等价物。是否有任何 API 允许在橱窗商店中捕获相同的内容?从单独的线程遍历状态存储对象是否安全?
考虑到您的担忧,我会考虑使用带有 KeyValueIterator 的 ReadOnlyWindowStore。
像这样:
ReadOnlyWindowStore<K, V> windowStore = streams.store(storeName, QueryableStoreTypes.windowStore());
KeyValueIterator<Windowed<K>, V> iterator = windowStore.all();
int count = 0;
while(iterator.hasNext()) {
iterator.next();
count++;
}
iterator.close();
关于线程安全:
Kafka Streams 的状态存储(包括 ReadOnlyWindowStore)默认情况下不是线程安全的。从流线程或处理器访问状态存储通常是安全的,但对于其他线程的访问,您应该使用 ReadWriteLock 或类似的机制来确保线程安全。
这是一个例子:
ReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
try {
ReadOnlyWindowStore<K, V> windowStore = streams.store(storeName, QueryableStoreTypes.windowStore());
KeyValueIterator<Windowed<K>, V> iterator = windowStore.all();
int count = 0;
while(iterator.hasNext()) {
iterator.next();
count++;
}
iterator.close();
} finally {
lock.readLock().unlock();
}