如何查看我的Task到KeyGroup到flink-sql应用程序中的Key分配是什么

问题描述 投票:0回答:1

我有 flink-sql 应用程序,只需通过连接多个表来执行简单的简单插入到丰富表中。

create table T1 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T1', ...)
create table T2 (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'T2', ...)
create table enrich (...) WITH ( 'connector' = 'upsert-kafka','topic' = 'enrich', ...)

创建临时视图distinct_t1 AS 选择 * 从(选择*, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date desc) AS rownum 从 T1 出发) WHERE rownum = 1;

创建临时视图distinct_t2 AS 选择 * 从(选择*, ROW_NUMBER() OVER (PARTITION BY id ORDER BY change_date desc) AS rownum 从T2航站楼出发) WHERE rownum = 1;

insert into enrich 
select ... from distinct_t1 t1 inner join distinct_t2 t2 ... t2.t1_id = t1.id and t2.client_id=t1.client_id

注:

  1. Kafka 原始主题分区为 8,并行度设置为 8 Max
  2. 并行度是默认值(我相信下限是 128)
  3. TaskSlot=1 总共有 8 个 TaskManager 处理正在运行

问题1:我在哪里可以看到任务的关键组分配(我有1个taskSlot)

问题2:我在哪里可以看到正在运行的应用程序中键(数据)到键组的映射?

问题3:加入中使用的列(如t1.client_id,t1.id)成为密钥并随机播放到相同的密钥组?或者主键中定义的列也使用并在源运算符中触发洗牌??

apache-flink flink-sql
1个回答
0
投票

问题 1 和 2 的答案是该信息没有在任何地方呈现。这是您不需要关心的实现细节。

实现的作用是使用此公式计算每个密钥的密钥组

keygroupId = MathUtils.murmurHash(key.hashCode()) % maxParallelism

其中

maxParallelism
是关键组的总数;默认值为 128。

然后使用以下公式将密钥组分配给插槽:

slotId = keygroupId * parallelism / maxParallelism

我不明白问题 3,但也许这些信息足以让您自己弄清楚。

© www.soinside.com 2019 - 2024. All rights reserved.