将连接表转换为数据流

问题描述 投票:0回答:1

我有一个数据流,需要使用 JDBC 将其与静态表连接,然后返回数据流。问题是我明白了

表接收器'anonymous_datastream_sink$1'不支持消费 更新和删除由节点 Join 产生的更改

有没有一种方法可以像仅追加流而不是动态表一样处理连接?我知道我可以将表转换为变更日志,但这使得为这种相对简单的任务编写代码变得非常麻烦。

public class Main {

    public static class myPojo {
        public String id;

        public myPojo() {
            // Empty ctor
        }

        public myPojo(String id) {
            this.id = id;
        }
    }

    public static class myPojoExtra extends myPojo {
        public String extra_data;

        public myPojoExtra() {
            // Empty ctor
        }
    }

    public static void createStaticTable(StreamTableEnvironment tEnv) {
        // Define a table schema
        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.STRING())
                .column("extra_data", DataTypes.STRING())
                .build();

        // Create a table
        tEnv.createTable("StaticTable", 
                TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", "file:///tmp/flink")
                .format("json")
                .build());

        // Insert data
        tEnv.executeSql("INSERT INTO StaticTable VALUES ('a', 'xxxx'), ('b', 'yyyy')");
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Start with datastream
        DataStream<myPojo> source = env.fromElements(new myPojo("a"), new myPojo("b"));
        tEnv.createTemporaryView("tempView", source);
        // Simulate JDBC table with filesystem table
        createStaticTable(tEnv);
        
        // Join with static table
        Table result = tEnv.sqlQuery("SELECT * FROM tempView LEFT JOIN StaticTable USING (id)");
        // Move back to datastream
        DataStream<myPojoExtra> ds = tEnv.toDataStream(result, myPojoExtra.class);
        ds.print();
        env.execute("");
    }
java apache-flink flink-sql
1个回答
0
投票

为了使流连接在语义上正确,更新流(收回或更新插入)是唯一可能的结果。您可以考虑用临时连接替换流连接。这将为您提供一个仅附加流作为输出。

© www.soinside.com 2019 - 2024. All rights reserved.