如果表有更新插入,带有 Iceberg 快照流的 Flink SQL 不会做出反应

问题描述 投票:0回答:1

我定义了一些 Iceberg v2 表和一个 Flink 作业,该作业在转换为另一个 Iceberg 表之前以流式传输方式读取它们。

如果源表是基本表,那么订阅它们效果很好,并且 SQL 查询可以连续运行。

但是如果表是用

'write.upsert.enabled'='true'
定义的,那么订阅的Flink SQL将只读取一次,并且不会对新的快照做出反应。即使 SQL 定义要求它监视间隔并且流策略是任何增量版本。

正常运行的Flink流式查询:

INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress` AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM
  iceberg.source_users /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ usr
JOIN
  iceberg.source_orders /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ ord ON usr.`user_id` = ord.`user_id`;

如果源 Iceberg 表定义如下,则流式连接效果很好:

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2');

Resulting table properties example: 
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]

但是流式连接仅运行一次,然后停止在新快照上触发。但它并没有完成,只是停止从源头做出反应并且不产生新记录。

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');

Resulting table properties example: 
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]

在我的 Flink 作业中,我只需定义连接器并运行 SQL 连接/插入。源表和目标表都已定义。

我还注意到,如果我有一个 SQL Join,如果至少一个表启用了 upsert,它也会停止流式传输。

查看 Iceberg 和 Flink 的文档,我没有发现任何迹象表明启用 upsert 会改变行为。仅启用 Flink 监控策略和检查点,确实如此。我正在使用 Flink 1.17.2 和 Iceberg 1.4.2

apache-flink flink-streaming flink-sql apache-iceberg
1个回答
0
投票

我在Github上从Iceberg社区得到了答案,基本上不支持这个。

Flink 流式只读支持仅附加表 ATM。

当您插入表时,它会生成appedds,但是当您切换到 > upsert 模式时,您开始向表添加更新更改。追加 > 以流模式读取,但更新被忽略,因为我们没有 > 正确发出删除记录的能力

查看 Spark 文档,这同样适用于 Spark Streaming。

© www.soinside.com 2019 - 2024. All rights reserved.