Flink 没有写入 hbase

问题描述 投票:0回答:1

我在使用 Apache Flink 写入 HBase 表时遇到问题。我已经成功配置了从 Kafka 读取和写入以及从 HBase 读取 RowKey。但是,当尝试写入 HBase 时,仅写入 RowKey,而列包含空值。下面是表创建和插入代码我尝试了两种方法,一种是sql,第二种是TableApi:

tEnv.executeSql("CREATE TABLE hTable (\n" +
    " rowkey INT,\n" +
    " persona_data ROW<age INT, name STRING>,\n" +
    " PRIMARY KEY (rowkey) NOT ENFORCED\n" +
    ") WITH (\n" +
    " 'connector' = 'hbase-2.2',\n" +
    " 'table-name' = 'test1',\n" +
    " 'zookeeper.quorum' = 'This line is deleted due to privacy'\n" +
    ");");

tEnv.executeSql("insert into hTable select 2 as rowKey, ROW(age, name) as persona_data from kafka_source");

TableResult result = tEnv.from("kafka_source")
    .select(lit(2).as("rowKey"), row($("age"), $("name")).as("persona_data"))
    .insertInto("hTable")
    .execute();

我希望 persona_data 列(年龄和姓名)使用从 Kafka 获取的值进行填充。但是,当我执行此代码后检查 HBase 表时,仅存在行键,而列族仍为空。如果您能深入了解列值为何未写入 HBase,我们将不胜感激。

java apache-flink hbase flink-streaming
1个回答
0
投票
当数据模式和表模式之间出现问题(预期)时,通常会出现

null 值。它可以帮助做到

TableResult result = tEnv.from("kafka_source") .select(lit(2).as("rowKey"), row($("age"), $("name")).as("persona_data")) result.printSchema()
插入之前。

我认为当你构建你的行时,你会有类似的东西

`persona_data` ROW<`f0` INT NOT NULL, `f1` STRING> NOT NULL
但是你的表有字段

<age INT, name STRING>


    

© www.soinside.com 2019 - 2024. All rights reserved.