如何在flinkSql中使用Map<STRING, OBJECT>数据类型(列)

问题描述 投票:0回答:1

`我对 flinkSql 相当陌生,对于我们的一个用例,我们需要定义一个 Map 类型的列。数据通过 json 格式接收,需要具体化并转发到新的 kafka 主题。我也试图制定一个自定义的用户定义函数 但遇到错误。是否有另一种方法可以在 flink sql 中定义通用对象类型,相当于 java 中的 Object 类。

这是一个示例表

CREATE TABLE IF NOT EXISTS <TABLE_NAME> 
(     org                     VARCHAR,     
      displayName             VARCHAR NOT NULL,     
      ....
      ....
      attributes              MAP<STRING, BINARY>,
      ....     
      created                 TIMESTAMP_LTZ(3),
      PRIMARY KEY (org, id) NOT ENFORCED ) 
WITH (     
'connector' = 'upsert-kafka',     
'topic' = '${output_topic_name}',     
'properties.bootstrap.servers' = '${sink_kafka_servers}',     
'value.format' = 'json',     
'key.format' = 'json',     
'properties.allow.auto.create.topics' = 'true',     
'properties.num.partitions' = '${sink_properties_num_partitions}',     
'value.json.timestamp-format.standard' = 'ISO-8601',     
'sink.parallelism' = '${sink_parallelism}',     
'sink.buffer-flush.interval' = '${sink_flush_interval}',     
'sink.buffer-flush.max-rows' = '${sink_flush_max_rows}' );

用户自定义函数

@FunctionHint(
output = @DataTypeHint(value = "MAP\<STRING, OBJECT\>", bridgedTo = MapData.class))
@SuppressWarnings("unchecked")
public class JsonToMapData extends ScalarFunction {

    private static final Logger LOG = LoggerFactory.getLogger(JsonToMapData.class);
    private final static ObjectMapper objectMapper = new ObjectMapper();
    
    public MapData eval(String json){
        Map<StringData, BinaryRawValueData> genericMapData;
        ....
        ....

}

运行服务器时出错

java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: OBJECT java.lang.RuntimeException: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: OBJECT

如有任何指点,我们将不胜感激。

尝试使用 BINARY 和 ROW 作为 Map> 和 MAP 的列类型的替代,但它们似乎都不起作用`

apache apache-flink flink-streaming flink-sql
1个回答
0
投票

您好,您解决了吗?

© www.soinside.com 2019 - 2024. All rights reserved.