我有 flink-sql 应用程序,只需通过连接多个表来执行简单的简单插入到丰富表中。
create table T1 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T1', ...)
create table T2 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T2', ...)
create table enrich (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'enrich', ...)
创建临时视图distinct_t1 AS 选择 * 从(选择*, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date desc) AS rownum 从 T1 出发) WHERE rownum = 1;
创建临时视图distinct_t2 AS 选择 * 从(选择*, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date desc) AS rownum 从T2航站楼出发) WHERE rownum = 1;
insert into enrich
select ... from distinct_t1 t1 inner join distinct_t2 t2 ... t2.t1_id = t1.id and t2.client_id=t1.client_id
注:
问题1:我在哪里可以看到任务的关键组分配(我有1个taskSlot)
问题2:我在哪里可以看到正在运行的应用程序中键(数据)到键组的映射?
问题3:加入中使用的列(如t1.client_id,t1.id)成为密钥并随机播放到相同的密钥组?或者主键中定义的列也使用并在源运算符中触发洗牌??
问题 1 和 2 的答案是该信息没有在任何地方呈现。这是您不需要关心的实现细节。
实现的作用是使用此公式计算每个密钥的密钥组
keygroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism
其中
maxParallelism
是关键组的总数;默认值为 128。
然后使用以下公式将密钥组分配给插槽:
slotId = keygroupId * parallelism / maxParallelism
我不明白问题 3,但也许这些信息足以让您自己弄清楚。