为什么Flink Exactly Once commit不会失败?

问题描述 投票:0回答:1

我正在使用 Flink EO。重新启动 Flink 作业时,我收到此警告:

2023-12-30 13:07:44.538 [共平面地图 -> 水槽: 水槽1 (3/8)#0] 警告 o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - 事务 KafkaTransactionState [transactionalId=ABCD, ProducerId=1028910, epoch=3] 已打开 2813524 毫秒。这接近甚至是 超过交易超时900000毫秒。

但是提交成功了。我不明白如果事务打开时间超过配置的超时时间,为什么提交会成功。 (交易.最大超时.ms)

2023-12-30 13:07:44.907 [共平地图 -> 水槽: 水槽1 (3/8)#0] INFO o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 3/8 已提交恢复事务 TransactionHolder{handle=KafkaTransactionState [transactionalId=ABCD, ProducerId=1028910,纪元=3],transactionStartTime=1703938851014}

即使检查了 TwoPhaseCommitFunction 代码,这个警告不应该意味着提交肯定会失败吗?

private void recoverAndCommitInternal(TransactionHolder<TXN> transactionHolder) {
        try {
            logWarningIfTimeoutAlmostReached(transactionHolder);
            recoverAndCommit(transactionHolder.handle);
        } catch (final Exception e) {
            final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
            if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
                LOG.error(
                        "Error while committing transaction {}. "
                                + "Transaction has been open for longer than the transaction timeout ({})."
                                + "Commit will not be attempted again. Data loss might have occurred.",
                        transactionHolder.handle,
                        transactionTimeout,
                        e);
            } else {
                throw e;
            }
        }
    }

    private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transactionHolder) {
        final long elapsedTime = transactionHolder.elapsedTime(clock);
        if (transactionTimeoutWarningRatio >= 0
                && elapsedTime > transactionTimeout * transactionTimeoutWarningRatio) {
            LOG.warn(
                    "Transaction {} has been open for {} ms. "
                            + "This is close to or even exceeding the transaction timeout of {} ms.",
                    transactionHolder.handle,
                    elapsedTime,
                    transactionTimeout);
        }
    }

我有什么遗漏的吗?问题是,即使 1 小时后开始作业,较旧的事务也会被提交?

发生这种情况是因为 FLink 正在恢复事务吗?如果给定相同的生产者 ID 和纪元,kafka 是否允许恢复中止的消息?

 producer = initTransactionalProducer(transaction.transactionalId, false);
                producer.resumeTransaction(transaction.producerId, transaction.epoch);
                producer.commitTransaction();

如果不是超时,哪些参数实际上决定提交失败?

仅供参考:transaction.abort.timed.out.transaction.cleanup.interval.ms使用默认值。

apache-kafka apache-flink flink-streaming exactly-once
1个回答
0
投票

您看到的警告消息由 logWarningIfTimeoutAlmostReached 方法记录,该方法检查事务打开的持续时间是否接近或超过事务超时。然而,这并不一定意味着事务将无法提交。

在两阶段提交协议中,协议可能会停滞很长时间。这在 2PC 中被称为阻塞问题。

警告是为了提醒您交易已打开很长时间,这可能会导致问题,但不会自动导致交易失败。

关于 Kafka 在给定相同的生产者 ID 和纪元的情况下允许恢复中止的消息的问题,答案是肯定的。 Kafka 的事务生产者使用生产者 ID 和纪元来识别和管理正在进行的事务。如果具有相同 id 和 epoch 的生产者恢复交易,它可以从上次中断的地方继续。

© www.soinside.com 2019 - 2024. All rights reserved.