如何注册具有嵌套字段的 Flink Table schema?

问题描述 投票:0回答:1

我正在努力在

StreamingTableEnvironment
中将 DataStream 注册为 Flink 表。

我使用的语法如下:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.createTemporaryView(
       "foo",
       dataStream,
       $("f0").as("foo-id"),
       $("f1").as("foo-value") 
)

在这种情况下,“foo-id”和“foo-value”是原始类型。在数据流中,我还有 JSON 对象,我希望理想地将其注册为行类型,类似于此处针对 Flink

CREATE TABLE
命令的建议:

CREATE TABLE input(
     id VARCHAR,
     title VARCHAR,
     properties ROW(`foo` VARCHAR)
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'python-test',
    'format' = 'json'
);

参见:使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段

是否有类似的方法使用表达式定义嵌套类型?

我没有使用带有连接器的创建表命令来注册流,因为数据格式是自定义的,这就是我诉诸注册流的原因。

apache-flink flink-streaming flink-sql flink-table-api
1个回答
2
投票

我尝试了返回类型并嵌套更多,这个解决方案似乎有效:

DataStream<Row> testDataStream = env.fromCollection(Arrays.asList(
     Row.of("sherin", 1L, Row.of(100L, 200L)),
     Row.of("thomas", 1L, Row.of(100L, 200L))
)).returns(
    Types.ROW(
             Types.STRING,
             Types.LONG,
             Types.ROW_NAMED(
                  new String[]{"val1", "val2"},
                  Types.LONG,
                  Types.LONG)
));

tableEnv.createTemporaryView(
    "foo",
    testDataStream,
    $("f0").as("name"),
    $("f1").as("c"),
    $("f2").as("age"));

Table testTable = tableEnv.sqlQuery(
     "select name, c, age.val1, age.val2 from foo"
);

DataStream<Row> result = tableEnv.toAppendStream(
    testTable,
    TypeInformation.of(Row.class)
);

result.print().setParallelism(2);

如果有更好的方法可以做到这一点,我仍然愿意接受想法。

© www.soinside.com 2019 - 2024. All rights reserved.