Kafka流处理器什么时候提交?不记录日志我们能做到容错吗?

问题描述 投票:0回答:1

假设我有一个简单的处理器,可以输出每个键的最新记录,使用不一定与 Kafka 时间戳相同的内部版本(因此不可能压缩日志)。

public class LatestProcessor implements Processor<Long, Dto> {
    private ProcessorContext context = null;
    private KeyValueStore<Long, Dto> kvStore;

    public LatestProcessor () {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore) context.getStateStore(Config.stateStore);
        this.context.schedule(Duration.ofMillis(TimeUnit.MINUTES.toMillis(1)),
                PunctuationType.WALL_CLOCK_TIME, (l) -> {
                    this.punctuate(l);
                });
    }


    @Override
    public void process(Long key, Dto message) {
        Dto current = kvStore.get(key);
        if (current == null || current.version() <= message.version()) {
              kvStore.put(key, message);
        }
    }


    public void punctuate(long timestamp) {
        // processes all the messages in the state
        KeyValueIterator<Long, Dto> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<Long, Dto> entry = iter.next();
            context.forward(entry.key, entry.value);
            kvStore.delete(entry.key);

          
        }
        iter.close();
        context.commit();
    }

    @Override
    public void close() {
        kvStore.close();
    }

}

该处理器是否可以在不记录日志的情况下通过状态存储实现容错?这要求我们只提交经过标点符号处理的记录。如果流提交了尚未经过标点的记录,那么我认为我们需要将日志记录添加到状态存储中。

我们可以假设拓扑只是这个处理器,并且它转发到下游主题。

上面的代码可以工作,但由于不能保证 commit() 提交,所以我们不清楚如何测试。

apache-kafka-streams
1个回答
0
投票

https://forum.confluence.io/t/fault-operative-stream-without-logging-in-state-store/9006 中回答我们无法在没有日志记录的情况下实现容错,因为提交可能发生在任何阶段

© www.soinside.com 2019 - 2024. All rights reserved.