我目前正在尝试在apche flink中使用tableApi并将流数据保存为CSV文件,但我得到的只是partFiles而不是CSV文件。
我的代码如下所示:
CREATE TABLE MyTable3 (
transactionId STRING,
timestamps STRING
) WITH (
'connector' = 'kafka',
'topic' = 'financial_transactions',
'properties.bootstrap.servers' = 'kafka:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset'
'format' = 'json'
);
CREATE TABLE fs_table2 (
transactionId STRING
) WITH (
'connector'='filesystem',
'path'='abfs://[email protected]/data-project/data/output',
'format'='csv',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='2 min'
);
INSERT INTO fs_table2 SELECT transactionId FROM MyTable3;
我得到的只是这些类型的partfiles:.part-0c9f9243-f9cc-4671-b734-0ba4185adf6e-0-1.inprogress.14ad1c71-15ec-470b-ac83-98a781e3afe4
如何将其保存为真正的csv文件?
我尝试过各种格式,但总是得到这些partFiles。
正如评论中提到的,您需要为 Flink 启用检查点来finalise部分文件,否则它将无限期地处于 inprogress 状态。
重要提示:在 STREAMING 模式下使用 FileSink 时需要启用检查点。零件文件只能在成功的检查点上完成。如果禁用检查点,零件文件将永远处于进行中或挂起状态,并且无法被下游系统安全地读取。
您可以使用启用检查点功能
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpoint interval (e.g., every 5000 milliseconds)
env.enableCheckpointing(5000);
// Configure other checkpointing settings if needed
// env.getCheckpointConfig().setCheckpointTimeout(60000); // set checkpoint timeout
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // set max concurrent checkpoints