Flink Table API: PARTITIONED BY columns missing from Parquet files

Problem description

I am writing data to S3 in Parquet format using the Table API with the `filesystem` connector. I have noticed that the PARTITIONED BY columns are missing from the resulting Parquet files. These are the queries I am using:

CREATE TABLE data_to_sink (
    record_id STRING NOT NULL,
    request_id STRING NOT NULL,
    source_name STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT
) PARTITIONED BY (record_id, source_name, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
    DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
    COUNT(*) AS results_count
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);

I can see the Parquet files being created, but when I inspect the schema with the parquet-cli tool, it does not show the record_id, source_name, or date fields. I also searched the Flink documentation but could not find a setting that controls this.
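For reference, the schema check looks roughly like this (the file name is a placeholder):

# Apache parquet-cli: print the schema of one written file
parquet schema part-0000-abc.parquet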

Is this a known issue?

apache-flink flink-sql
1 Answer

This is expected behavior rather than a bug: like Hive-style partitioning, Flink's filesystem connector encodes partition column values in the directory structure (record_id=.../source_name=.../date=.../) and does not write them into the data files themselves. I worked around it by cloning the record_id and source_name columns and partitioning on the clones, so the original columns stay in the Parquet schema:

CREATE TABLE data_to_sink (
    record_id STRING NOT NULL,
    request_id STRING NOT NULL,
    source_name STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT,
    recordId STRING,
    sourceName STRING
) PARTITIONED BY (recordId, sourceName, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
    DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
    COUNT(*) AS results_count,
    record_id AS recordId,
    source_name AS sourceName
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);
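With this workaround, recordId and sourceName are stored inside the Parquet files themselves, while the partition values still appear only in the object keys, along these lines (bucket, path, and values are illustrative):

<S3 path>/recordId=r-123/sourceName=svcA/date=2024-01-01/part-<uuid>-0.parquet

Note that `date` is still a pure partition column here, so it continues to live only in the path.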