FlinkSQL:如何在SQL中过滤出错误格式的JSON?

问题描述 投票:0回答:2
CREATE TABLE user_log (
    a STRING,
    b STRING
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true',
    'format.fail-on-missing-field' = 'false'
);

正确的格式是{“ a”:1,“ b”:2},但是kafka发送了错误的数据:AABB ,该程序将停止。如何在SQL中过滤出错误格式的JSON?

apache-flink flink-streaming flink-sql
2个回答
0
投票

定义表的sc配置时,您可以设置类似以下内容:

'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing;

应该完全按照自己的意愿去做。


0
投票

在Flink 1.11(即将发布)中,这些格式选项已添加(并且默认都为false)。参见FLINK-17663

'json.fail-on-missing-field' = false,
'json.ignore-parse-errors' = false,

我不确定您在早期版本中打算做什么。

© www.soinside.com 2019 - 2024. All rights reserved.