如何使用flink sql从Kafka读取空值

问题描述 投票:0回答:1

我有一个表定义为

CREATE TABLE kafka_table (
  field1 INT,
  field2 STRING,
  field3 DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic_name',
  'properties.bootstrap.servers' = 'your_bootstrap_servers',
  'properties.group.id' = 'your_consumer_group_id',
  'format' = 'json'
);

我想读取一条空值的Kafka消息。整个身体都是空的。

上面的代码当前跳过读取消息。我使用的是flink 1.18

apache-flink flink-sql
1个回答
0
投票

您应该看到 json 格式选项

也许你应该更改json.map-null-key.mode默认情况下它处于“FAIL”状态(遇到带有空键的map时会抛出异常。)

如果你想要字符串“null”或其他字符串(如“n/a”),你可以设置json.map-null-key.literal...

© www.soinside.com 2019 - 2024. All rights reserved.