我在使用 Apache Flink 写入 HBase 表时遇到问题。我已经成功配置了从 Kafka 读取和写入以及从 HBase 读取 RowKey。但是,当尝试写入 HBase 时,仅写入 RowKey,而列包含空值。下面是表创建和插入代码我尝试了两种方法,一种是sql,第二种是TableApi:
tEnv.executeSql("CREATE TABLE hTable (\n" +
" rowkey INT,\n" +
" persona_data ROW<age INT, name STRING>,\n" +
" PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'hbase-2.2',\n" +
" 'table-name' = 'test1',\n" +
" 'zookeeper.quorum' = 'This line is deleted due to privacy'\n" +
");");
tEnv.executeSql("insert into hTable select 2 as rowKey, ROW(age, name) as persona_data from kafka_source");
TableResult result = tEnv.from("kafka_source")
.select(lit(2).as("rowKey"), row($("age"), $("name")).as("persona_data"))
.insertInto("hTable")
.execute();
我希望 persona_data 列(年龄和姓名)使用从 Kafka 获取的值进行填充。但是,当我执行此代码后检查 HBase 表时,仅存在行键,而列族仍为空。如果您能深入了解列值为何未写入 HBase,我们将不胜感激。
null 值。它可以帮助做到
TableResult result = tEnv.from("kafka_source")
.select(lit(2).as("rowKey"), row($("age"), $("name")).as("persona_data"))
result.printSchema()
插入之前。我认为当你构建你的行时,你会有类似的东西
`persona_data` ROW<`f0` INT NOT NULL, `f1` STRING> NOT NULL
但是你的表有字段<age INT, name STRING>