Apache Flink 只生成partFiles

问题描述 投票:0回答:1

我目前正在尝试在apche flink中使用tableApi并将流数据保存为CSV文件,但我得到的只是partFiles而不是CSV文件。

我的代码如下所示:

CREATE TABLE MyTable3 (
  transactionId STRING,
  timestamps STRING 
) WITH (
  'connector' = 'kafka',
  'topic' = 'financial_transactions',
  'properties.bootstrap.servers' = 'kafka:29092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset'
  'format' = 'json'
);

CREATE TABLE fs_table2 (
  transactionId STRING 
) WITH (
  'connector'='filesystem',
  'path'='abfs://[email protected]/data-project/data/output',
  'format'='csv',
  'sink.rolling-policy.file-size'='128MB',
  'sink.rolling-policy.rollover-interval'='2 min'
);

INSERT INTO fs_table2 SELECT transactionId FROM MyTable3;

我得到的只是这些类型的partfiles:.part-0c9f9243-f9cc-4671-b734-0ba4185adf6e-0-1.inprogress.14ad1c71-15ec-470b-ac83-98a781e3afe4

如何将其保存为真正的csv文件?

我尝试过各种格式,但总是得到这些partFiles。

apache-flink
1个回答
0
投票

正如评论中提到的,您需要为 Flink 启用检查点来finalise部分文件,否则它将无限期地处于 inprogress 状态。

正如这里提到的

重要提示:在 STREAMING 模式下使用 FileSink 时需要启用检查点。零件文件只能在成功的检查点上完成。如果禁用检查点,零件文件将永远处于进行中或挂起状态,并且无法被下游系统安全地读取。

您可以使用启用检查点功能

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the checkpoint interval (e.g., every 5000 milliseconds)
env.enableCheckpointing(5000);

// Configure other checkpointing settings if needed
// env.getCheckpointConfig().setCheckpointTimeout(60000); // set checkpoint timeout
// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // set max concurrent checkpoints
© www.soinside.com 2019 - 2024. All rights reserved.