Flink:DataStream 上没有外连接?

问题描述 投票:0回答:3

我惊讶地发现 Flink 中没有

DataStream
的外连接(DataStream 文档)。

对于

DataSet
,除了常规的
leftOuterJoin
DataSet docs
)之外,您还有所有选项:
rightOuterJoin
fullOuterJoin
join。但对于
DataStream
你只有普通的旧连接。

这是由于

DataStream
的一些基本属性导致不可能有外连接吗?或者也许我们可以在(不久的?)未来期待这一点?

我真的可以在

DataStream
上使用外连接来解决我正在解决的问题...有什么方法可以实现类似的行为吗?

apache-flink flink-streaming
3个回答
1
投票

您可以使用

DataStream.coGroup()
转换来实现外连接。
CoGroupFunction
接收两个迭代器(每个输入一个),它们服务于某个键的所有元素,如果没有找到匹配元素,则可能为空。这允许实现外连接功能。

在 Flink 的下一个版本中,可能会将对外连接的一流支持添加到 DataStream API 中。目前我不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。


0
投票

一种方法是使用以下 api 从流 -> 表 -> 流:FLINK TABLE API - OUTER JOIN

这是一个java示例:

    DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);

0
投票

这里是使用 DataStream 和 Table API 的完整工作示例。请参阅 flink 官方页面上的 Table API 下有关 DataStream API 集成的更多详细信息。

  1. 将两个数据流转换为表。
  2. 将表注册为视图来执行SQL查询。
  3. 对注册视图执行完整外连接 SQL 查询。
  4. SQL查询的结果将是一个表。
  5. 使用toDataStream将结果表转换为dataStream。
/**
 * Two input data streams
 *
 * <p>@<code>
 * 1. "Alice", "Bob", "John"
 * 2. "Mike", "Sam", "Adam", "Alice"
 * <p>
 * The expected full outer join is
 *  (Alice), (Alice)
 *  (John), (null)
 *  (Bob), (null)
 *  (null), (Mike)
 *  (null), (Sam)
 *  (null), (Adam)
 *  </code>
 */
@Slf4j
public class StreamOuterJoinUsingTable {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

        final StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv, EnvironmentSettings.inBatchMode());

        // convert first stream into a table and register it as an object to perform outer join
        // later
        DataStream<String> nameStream = streamEnv.fromElements("Alice", "Bob", "John");
        Table nameTable = tableEnv.fromDataStream(nameStream).as("name");
        tableEnv.createTemporaryView("nameTable", nameTable);

        // convert first stream into a table and register it as an object to perform outer join
        // later
        DataStream<String> detailStream = streamEnv.fromElements("Mike", "Sam", "Adam", "Alice");
        Table detailTable = tableEnv.fromDataStream(detailStream).as("detail");
        tableEnv.createTemporaryView("detailTable", detailTable);

        // outer join on the tables result into a table
        Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM "
                                + "nameTable FULL OUTER JOIN detailTable "
                                + "ON nameTable.name = detailTable.detail");

        // convert the table to a dataStream and map the Row objects to String using map
        DataStream<String> resultStream =
                tableEnv.toDataStream(result)
                        .map(
                                (MapFunction<Row, String>)
                                        row ->
                                                "First : ("
                                                        + row.getField(0)
                                                        + ") second : ("
                                                        + row.getField(1)
                                                        + ")");
        // print as a sink
        resultStream.print();
        /*
        First : (Alice) second : (Alice)
        First : (John) second : (null)
        First : (Bob) second : (null)
        First : (null) second : (Mike)
        First : (null) second : (Sam)
        First : (null) second : (Adam)
         */
        streamEnv.execute();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.