我正在使用 Flink EO。重新启动 Flink 作业时,我收到此警告:
2023-12-30 13:07:44.538 [共平面地图 -> 水槽: 水槽1 (3/8)#0] 警告 o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - 事务 KafkaTransactionState [transactionalId=ABCD, ProducerId=1028910, epoch=3] 已打开 2813524 毫秒。这接近甚至是 超过交易超时900000毫秒。
但是提交成功了。我不明白如果事务打开时间超过配置的超时时间,为什么提交会成功。 (交易.最大超时.ms)
2023-12-30 13:07:44.907 [共平地图 -> 水槽: 水槽1 (3/8)#0] INFO o.a.f.s.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 3/8 已提交恢复事务 TransactionHolder{handle=KafkaTransactionState [transactionalId=ABCD, ProducerId=1028910,纪元=3],transactionStartTime=1703938851014}
即使检查了 TwoPhaseCommitFunction 代码,这个警告不应该意味着提交肯定会失败吗?
private void recoverAndCommitInternal(TransactionHolder<TXN> transactionHolder) {
try {
logWarningIfTimeoutAlmostReached(transactionHolder);
recoverAndCommit(transactionHolder.handle);
} catch (final Exception e) {
final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
if (ignoreFailuresAfterTransactionTimeout && elapsedTime > transactionTimeout) {
LOG.error(
"Error while committing transaction {}. "
+ "Transaction has been open for longer than the transaction timeout ({})."
+ "Commit will not be attempted again. Data loss might have occurred.",
transactionHolder.handle,
transactionTimeout,
e);
} else {
throw e;
}
}
}
private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transactionHolder) {
final long elapsedTime = transactionHolder.elapsedTime(clock);
if (transactionTimeoutWarningRatio >= 0
&& elapsedTime > transactionTimeout * transactionTimeoutWarningRatio) {
LOG.warn(
"Transaction {} has been open for {} ms. "
+ "This is close to or even exceeding the transaction timeout of {} ms.",
transactionHolder.handle,
elapsedTime,
transactionTimeout);
}
}
我有什么遗漏的吗?问题是,即使 1 小时后开始作业,较旧的事务也会被提交?
发生这种情况是因为 FLink 正在恢复事务吗?如果给定相同的生产者 ID 和纪元,kafka 是否允许恢复中止的消息?
producer = initTransactionalProducer(transaction.transactionalId, false);
producer.resumeTransaction(transaction.producerId, transaction.epoch);
producer.commitTransaction();
如果不是超时,哪些参数实际上决定提交失败?
仅供参考:transaction.abort.timed.out.transaction.cleanup.interval.ms使用默认值。
您看到的警告消息由 logWarningIfTimeoutAlmostReached 方法记录,该方法检查事务打开的持续时间是否接近或超过事务超时。然而,这并不一定意味着事务将无法提交。
在两阶段提交协议中,协议可能会停滞很长时间。这在 2PC 中被称为阻塞问题。
警告是为了提醒您交易已打开很长时间,这可能会导致问题,但不会自动导致交易失败。
实际的提交操作是由 recoverAndCommit 方法执行的。如果提交操作期间发生异常,recoverAndCommitInternal方法会检查事务打开时间是否超过事务超时时间。如果有,并且 ignoreFailuresAfterTransactionTimeout 为 true,则记录错误并且不会重试提交。否则,将重新抛出异常。
关于 Kafka 在给定相同的生产者 ID 和纪元的情况下允许恢复中止的消息的问题,答案是肯定的。 Kafka 的事务生产者使用生产者 ID 和纪元来识别和管理正在进行的事务。如果具有相同 id 和 epoch 的生产者恢复交易,它可以从上次中断的地方继续。
决定提交是否失败的参数通常与recoverAndCommit方法的具体实现相关,这取决于您使用的具体sink函数。例如,就 FlinkKafkaProducer 而言,如果 Kafka 代理出现问题、生产者 ID 或纪元不再有效或者存在网络问题,则提交可能会失败。