我惊讶地发现 Flink 中没有
DataStream
的外连接(DataStream 文档)。
对于
DataSet
,除了常规的 leftOuterJoin
(DataSet docs)之外,您还有所有选项:
rightOuterJoin
、fullOuterJoin
和 join
。但对于 DataStream
你只有普通的旧连接。
这是由于
DataStream
的一些基本属性导致不可能有外连接吗?或者也许我们可以在(不久的?)未来期待这一点?
我真的可以在
DataStream
上使用外连接来解决我正在解决的问题...有什么方法可以实现类似的行为吗?
您可以使用
DataStream.coGroup()
转换来实现外连接。 CoGroupFunction
接收两个迭代器(每个输入一个),它们服务于某个键的所有元素,如果没有找到匹配元素,则可能为空。这允许实现外连接功能。
在 Flink 的下一个版本中,可能会将对外连接的一流支持添加到 DataStream API 中。目前我不知道有任何此类努力。但是,在 Apache Flink JIRA 中创建问题可能会有所帮助。
一种方法是使用以下 api 从流 -> 表 -> 流:FLINK TABLE API - OUTER JOIN
这是一个java示例:
DataStream<String> data = env.readTextFile( ... );
DataStream<String> data2Merge = env.readTextFile( ... );
...
tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
Table tableLeft = tableEnv.sqlQuery(queryLeft);
Table tableRight = tableEnv.sqlQuery(queryRight);
Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
这里是使用 DataStream 和 Table API 的完整工作示例。请参阅 flink 官方页面上的 Table API 下有关 DataStream API 集成的更多详细信息。
/**
* Two input data streams
*
* <p>@<code>
* 1. "Alice", "Bob", "John"
* 2. "Mike", "Sam", "Adam", "Alice"
* <p>
* The expected full outer join is
* (Alice), (Alice)
* (John), (null)
* (Bob), (null)
* (null), (Mike)
* (null), (Sam)
* (null), (Adam)
* </code>
*/
@Slf4j
public class StreamOuterJoinUsingTable {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(streamEnv, EnvironmentSettings.inBatchMode());
// convert first stream into a table and register it as an object to perform outer join
// later
DataStream<String> nameStream = streamEnv.fromElements("Alice", "Bob", "John");
Table nameTable = tableEnv.fromDataStream(nameStream).as("name");
tableEnv.createTemporaryView("nameTable", nameTable);
// convert first stream into a table and register it as an object to perform outer join
// later
DataStream<String> detailStream = streamEnv.fromElements("Mike", "Sam", "Adam", "Alice");
Table detailTable = tableEnv.fromDataStream(detailStream).as("detail");
tableEnv.createTemporaryView("detailTable", detailTable);
// outer join on the tables result into a table
Table result =
tableEnv.sqlQuery(
"SELECT * FROM "
+ "nameTable FULL OUTER JOIN detailTable "
+ "ON nameTable.name = detailTable.detail");
// convert the table to a dataStream and map the Row objects to String using map
DataStream<String> resultStream =
tableEnv.toDataStream(result)
.map(
(MapFunction<Row, String>)
row ->
"First : ("
+ row.getField(0)
+ ") second : ("
+ row.getField(1)
+ ")");
// print as a sink
resultStream.print();
/*
First : (Alice) second : (Alice)
First : (John) second : (null)
First : (Bob) second : (null)
First : (null) second : (Mike)
First : (null) second : (Sam)
First : (null) second : (Adam)
*/
streamEnv.execute();
}
}