如何通过datastrem API或Flink Table API / SQL在给定密钥和公共窗口上连接三个或更多数据流/表?

问题描述 投票:0回答:1

我想在给定的密钥和公共窗口上加入三个或更多数据流或表。但是我不知道如何正确编写代码。官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/给出了下面的示例,但是它只是连接两个数据流,那么如何在给定的密钥和公共窗口上连接三个或更多数据流?

dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

我试图找出我首先使用公共窗口加入两个数据流,并使用结果数据流将第三个数据流加入公共窗口?但是,当我们将TimeCharacteristic设置为事件时间时,这三个数据流的事件时间的语义似乎会发生变化。

==================

对于FlinK Table API和SQL,同样的问题,如何在给定键和公共窗口上连接三个或更多表?官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html只给出了单表的下面的例子。

Table result1 = tableEnv.sqlQuery(
"SELECT user, " +
"  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
"  SUM(amount) FROM Orders " +
"GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

我尝试编写如下的SQL来连接给定键和公共窗口上的三个表,但我不认为这是正确的。

String SQL = "SELECT" +
            " grades.user1  , SUM(salaries.amount)   FROM grades " +
            " INNER JOIN salaries ON   grades.user1 =   salaries.user1 " +
            " INNER JOIN person ON   grades.user1 =   person.user1 "+
             "GROUP BY grades.user1, TUMBLE(grades.proctime,  INTERVAL '5' SECOND) "   

那么,通过datastrem API或Flink Table API / SQL在给定密钥和公共窗口上连接三个或更多数据流/表的正确方法是什么?

在6/16/2018更新,以便更清楚地提出问题。

对于Flink SQL,我需要的,就像下面的Pseudocode一样,是连接三个表和一个共同的TumblingEventTimeWindow,也就是说DataStream API的替代版本,无论如何由Flink SQL表示,也意味着连接来自三个表的所有事件,发生在同一个TumblingEventTimeWindow中。

SELECT A.a, B.b, C.c
FROM A, B, C
WHERE A.x = B.x AND A.x = C.x AND
window(TumblingEventTimeWindows.of(Time.seconds(3))

似乎连接功能也在下面的Flink设计文档中提到:“事件时间翻滚窗口的Stream-Stream连接:加入两个流的元组,它们处于相同的翻滚事件时间窗口中”,我不知道是否Flink SQL已经实现了这种类型的Flink SQL连接功能。

https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit#

apache-flink flink-streaming flink-sql
1个回答
1
投票

很难对你的问题给出明确的答案,因为你需要的连接的语义不明确。 DataStream API的窗口化连接实现的语义与Table API / SQL的窗口连接不同。

在DataStream API上,您可以简单地定义另一个连接,如下所示:

firstStream
  .join(secondStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})
  .join(thirdStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...})

由于Flink实现了标准SQL,您可以像往常一样定义三个表的连接:

SELECT A.a, B.b, C.c
  FROM A, B, C
  WHERE A.x = B.x AND A.x = C.x AND
        A.ts BETWEEN B.ts - INTERVAL '10' MINUTE AND B.ts + INTERVAL '10' MINUTE AND
        A.ts BETWEEN C.ts - INTERVAL '10' MINUTE AND C.ts + INTERVAL '10' MINUTE

窗口范围(A.ts BETWEEN B.ts - X AND B.ts + Y)可以根据需要定义。

© www.soinside.com 2019 - 2024. All rights reserved.