我有一个数据流,需要使用 JDBC 将其与静态表连接,然后返回数据流。问题是我明白了
表接收器'anonymous_datastream_sink$1'不支持消费 更新和删除由节点 Join 产生的更改
有没有一种方法可以像仅追加流而不是动态表一样处理连接?我知道我可以将表转换为变更日志,但这使得为这种相对简单的任务编写代码变得非常麻烦。
public class Main {
public static class myPojo {
public String id;
public myPojo() {
// Empty ctor
}
public myPojo(String id) {
this.id = id;
}
}
public static class myPojoExtra extends myPojo {
public String extra_data;
public myPojoExtra() {
// Empty ctor
}
}
public static void createStaticTable(StreamTableEnvironment tEnv) {
// Define a table schema
Schema schema = Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("extra_data", DataTypes.STRING())
.build();
// Create a table
tEnv.createTable("StaticTable",
TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "file:///tmp/flink")
.format("json")
.build());
// Insert data
tEnv.executeSql("INSERT INTO StaticTable VALUES ('a', 'xxxx'), ('b', 'yyyy')");
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Start with datastream
DataStream<myPojo> source = env.fromElements(new myPojo("a"), new myPojo("b"));
tEnv.createTemporaryView("tempView", source);
// Simulate JDBC table with filesystem table
createStaticTable(tEnv);
// Join with static table
Table result = tEnv.sqlQuery("SELECT * FROM tempView LEFT JOIN StaticTable USING (id)");
// Move back to datastream
DataStream<myPojoExtra> ds = tEnv.toDataStream(result, myPojoExtra.class);
ds.print();
env.execute("");
}
为了使流连接在语义上正确,更新流(收回或更新插入)是唯一可能的结果。您可以考虑用临时连接替换流连接。这将为您提供一个仅附加流作为输出。