I am fairly new to Flink SQL. For one of our use cases, we need to define a column of type Map.
Here is a sample table:
CREATE TABLE IF NOT EXISTS <TABLE_NAME>
( org VARCHAR,
displayName VARCHAR NOT NULL,
....
....
attributes MAP<STRING, BINARY>,
....
created TIMESTAMP_LTZ(3),
PRIMARY KEY (org, id) NOT ENFORCED )
WITH (
'connector' = 'upsert-kafka',
'topic' = '${output_topic_name}',
'properties.bootstrap.servers' = '${sink_kafka_servers}',
'value.format' = 'json',
'key.format' = 'json',
'properties.allow.auto.create.topics' = 'true',
'properties.num.partitions' = '${sink_properties_num_partitions}',
'value.json.timestamp-format.standard' = 'ISO-8601',
'sink.parallelism' = '${sink_parallelism}',
'sink.buffer-flush.interval' = '${sink_flush_interval}',
'sink.buffer-flush.max-rows' = '${sink_flush_max_rows}' );
The user-defined function:
@FunctionHint(
output = @DataTypeHint(value = "MAP<STRING, OBJECT>", bridgedTo = MapData.class))
@SuppressWarnings("unchecked")
public class JsonToMapData extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(JsonToMapData.class);
private final static ObjectMapper objectMapper = new ObjectMapper();
public MapData eval(String json){
Map<StringData, BinaryRawValueData> genericMapData;
....
....
}
Error when running the server:
java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: OBJECT
java.lang.RuntimeException: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: OBJECT
Any pointers would be greatly appreciated.
I have tried using BINARY and ROW for the map.
Hi, did you manage to solve this?
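One possible direction, offered as a sketch and not a confirmed fix: the exception suggests that the string inside @DataTypeHint is parsed as a Flink SQL type, and OBJECT is not a Flink SQL type name. A workaround that may fit is to declare a value type that Flink SQL does know, such as STRING, and convert every attribute value to text inside the function. The class name AttributeValues and the method toStringMap below are hypothetical names introduced only for illustration, and the JSON parsing step is left out because the original eval body is elided.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not from the original post): normalizes arbitrary
// attribute values to strings so the result fits a MAP<STRING, STRING> column.
public class AttributeValues {

    // Converts each value with String.valueOf; null values stay null.
    // Insertion order of the input map is preserved.
    public static Map<String, String> toStringMap(Map<String, ?> attributes) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : attributes.entrySet()) {
            Object value = entry.getValue();
            result.put(entry.getKey(), value == null ? null : String.valueOf(value));
        }
        return result;
    }

    // Assumption, not tested here: inside the ScalarFunction the helper could
    // back an eval method declared with a supported type, for example
    //   public @DataTypeHint("MAP<STRING, STRING>") Map<String, String> eval(String json)
    // where the JSON string is first parsed into a Map<String, Object>.
}
```

With this approach the table column would be declared as attributes MAP<STRING, STRING> to match. If binary values are really needed, note that BINARY without a length means BINARY(1) in Flink SQL, so BYTES is probably closer to the intent than BINARY.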