假设我有一个简单的处理器,可以输出每个键的最新记录,使用不一定与 Kafka 时间戳相同的内部版本(因此不可能压缩日志)。
public class LatestProcessor implements Processor<Long, Dto> {
private ProcessorContext context = null;
private KeyValueStore<Long, Dto> kvStore;
public LatestProcessor () {
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.kvStore = (KeyValueStore) context.getStateStore(Config.stateStore);
this.context.schedule(Duration.ofMillis(TimeUnit.MINUTES.toMillis(1)),
PunctuationType.WALL_CLOCK_TIME, (l) -> {
this.punctuate(l);
});
}
@Override
public void process(Long key, Dto message) {
Dto current = kvStore.get(key);
if (current == null || current.version() <= message.version()) {
kvStore.put(key, message);
}
}
public void punctuate(long timestamp) {
// processes all the messages in the state
KeyValueIterator<Long, Dto> iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue<Long, Dto> entry = iter.next();
context.forward(entry.key, entry.value);
kvStore.delete(entry.key);
}
iter.close();
context.commit();
}
@Override
public void close() {
kvStore.close();
}
}
该处理器是否可以在不记录日志的情况下通过状态存储实现容错?这要求我们只提交经过标点符号处理的记录。如果流提交了尚未经过标点的记录,那么我认为我们需要将日志记录添加到状态存储中。
我们可以假设拓扑只是这个处理器,并且它转发到下游主题。
上面的代码可以工作,但由于不能保证 commit() 提交,所以我们不清楚如何测试。
在 https://forum.confluence.io/t/fault-operative-stream-without-logging-in-state-store/9006 中回答我们无法在没有日志记录的情况下实现容错,因为提交可能发生在任何阶段