我正在努力在
StreamingTableEnvironment
中将 DataStream 注册为 Flink 表。
我使用的语法如下:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryView(
"foo",
dataStream,
$("f0").as("foo-id"),
$("f1").as("foo-value")
)
在这种情况下,“foo-id”和“foo-value”是原始类型。在数据流中,我还有 JSON 对象,我希望理想地将其注册为行类型,类似于此处针对 Flink
CREATE TABLE
命令的建议:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
参见:使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段
是否有类似的方法使用表达式定义嵌套类型?
我没有使用带有连接器的创建表命令来注册流,因为数据格式是自定义的,这就是我诉诸注册流的原因。
我尝试了返回类型并嵌套更多,这个解决方案似乎有效:
DataStream<Row> testDataStream = env.fromCollection(Arrays.asList(
Row.of("sherin", 1L, Row.of(100L, 200L)),
Row.of("thomas", 1L, Row.of(100L, 200L))
)).returns(
Types.ROW(
Types.STRING,
Types.LONG,
Types.ROW_NAMED(
new String[]{"val1", "val2"},
Types.LONG,
Types.LONG)
));
tableEnv.createTemporaryView(
"foo",
testDataStream,
$("f0").as("name"),
$("f1").as("c"),
$("f2").as("age"));
Table testTable = tableEnv.sqlQuery(
"select name, c, age.val1, age.val2 from foo"
);
DataStream<Row> result = tableEnv.toAppendStream(
testTable,
TypeInformation.of(Row.class)
);
result.print().setParallelism(2);
如果有更好的方法可以做到这一点,我仍然愿意接受想法。