我有一个 Flink SQL 应用程序，它只是通过 join 多个表，向 enrich 表执行简单的插入。
-- Source tables: T1 and T2 are read from the Kafka topics of the same name
-- through the upsert-kafka connector. Column lists and the remaining
-- connector options are elided ("...") in this excerpt.
-- NOTE(review): upsert-kafka requires a PRIMARY KEY (...) NOT ENFORCED and
-- 'key.format' / 'value.format' options; presumably they live in the elided
-- parts — confirm.
-- Each statement is terminated with ';' (like the view and insert statements
-- below) so the script can be submitted as a multi-statement file.
CREATE TABLE T1 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T1', ...);
CREATE TABLE T2 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T2', ...);
-- Sink table: the insert into enrich writes the join result to the
-- 'enrich' topic via upsert-kafka.
CREATE TABLE enrich (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'enrich', ...);
-- distinct_t1: one row per id from T1, keeping the row with the greatest
-- change_date (ROW_NUMBER numbers each id's rows newest-first; only #1 survives).
-- The rownum column (always 1 after the filter) remains in the view's output.
-- NOTE(review): change_date is not visibly a time attribute; if it is a
-- regular column Flink plans this as a Top-N/rank operator rather than the
-- dedicated deduplication operator — confirm against the T1 schema.
CREATE TEMPORARY VIEW distinct_t1 AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY change_date DESC
        ) AS rownum
    FROM T1
) AS ranked_t1
WHERE rownum = 1;
-- distinct_t2: one row per id from T2, keeping the row with the greatest
-- change_date (ROW_NUMBER numbers each id's rows newest-first; only #1 survives).
-- The rownum column (always 1 after the filter) remains in the view's output.
-- NOTE(review): change_date is not visibly a time attribute; if it is a
-- regular column Flink plans this as a Top-N/rank operator rather than the
-- dedicated deduplication operator — confirm against the T2 schema.
CREATE TEMPORARY VIEW distinct_t2 AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY id
            ORDER BY change_date DESC
        ) AS rownum
    FROM T2
) AS ranked_t2
WHERE rownum = 1;
-- Enrichment step: inner-joins the deduplicated views distinct_t1 and
-- distinct_t2 and inserts the result into the enrich sink table.
-- The projection and the ON keyword are elided ("...") in this excerpt; the
-- visible join condition matches t2.t1_id = t1.id and the two client_id columns.
-- NOTE(review): the dedup views partition by id, but this join is keyed on
-- (id, client_id) / (t1_id, client_id). Differing keys presumably force a hash
-- redistribution between the rank and join operators, which would prevent
-- them from being chained — confirm in the Web UI job graph / EXPLAIN output.
-- NOTE(review): a regular (non-windowed) join keeps both inputs in state
-- unless a state TTL is configured — confirm this is intended.
insert into enrich
select ... from distinct_t1 t1 inner join distinct_t2 t2 ... t2.t1_id = t1.id and t2.client_id=t1.client_id
注:
问题 1：基于上述配置，我设想的作业图大致如下（TM1 上运行从分区 0 读取数据的 source、rank、join 和 sink 算子）……我的理解对吗？
TM1
TM2
TM8
**问题 2：如何验证我的算子是否被链接（chain）在一起？或者，需要在环境中设置哪些配置？**
问题 3：如果这些算子并没有像我设想的那样链接在一起，如何查看各个算子分别运行在哪里？
如果你可以访问 Flink Web UI，这些信息会以直观易懂的方式呈现；Web UI 通常运行在 JobManager 的 8081 端口上。在作业图中，链接在一起的算子会合并显示为同一个节点。点击某个节点，在 SubTasks 页签中即可看到各个子任务运行在哪个 TaskManager 上。