我定义了一些 Iceberg v2 表和一个 Flink 作业,该作业在转换为另一个 Iceberg 表之前以流式传输方式读取它们。
如果源表是基本表,那么订阅它们效果很好,并且 SQL 查询可以连续运行。
但是如果表是用
'write.upsert.enabled'='true'
定义的,那么订阅的Flink SQL将只读取一次,并且不会对新的快照做出反应。即使 SQL 定义要求它监视间隔并且流策略是任何增量版本。
正常运行的Flink流式查询:
INSERT INTO iceberg.target_packaging
SELECT
usr.`user_id` AS `user_id`,
usr.`adress` AS `address`,
ord.`item_id` AS `item_id`,
....
FROM
iceberg.source_users /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ usr
JOIN
iceberg.source_orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ ord ON usr.`user_id` = ord.`user_id`;
如果源 Iceberg 表定义如下,则流式连接效果很好:
CREATE TABLE iceberg.source_users (
`user_id` STRING,
`adress` STRING,
....
PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2');
Resulting table properties example:
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]
但是流式连接仅运行一次,然后停止在新快照上触发。但它并没有完成,只是停止从源头做出反应并且不产生新记录。
CREATE TABLE iceberg.source_users (
`user_id` STRING,
`adress` STRING,
....
PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');
Resulting table properties example:
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
在我的 Flink 作业中,我只需定义连接器并运行 SQL 连接/插入。源表和目标表都已定义。
我还注意到,如果我有一个 SQL Join,如果至少一个表启用了 upsert,它也会停止流式传输。
查看 Iceberg 和 Flink 的文档,我没有发现任何迹象表明启用 upsert 会改变行为。仅启用 Flink 监控策略和检查点,确实如此。我正在使用 Flink 1.17.2 和 Iceberg 1.4.2
我在Github上从Iceberg社区得到了答案,基本上不支持这个。
Flink 流式只读支持仅附加表 ATM。
当您插入表时,它会生成appedds,但是当您切换到 > upsert 模式时,您开始向表添加更新更改。追加 > 以流模式读取,但更新被忽略,因为我们没有 > 正确发出删除记录的能力
查看 Spark 文档,这同样适用于 Spark Streaming。