我最近在流应用程序中遇到了一个我以前从未遇到过的问题,并且很难找到与键控/联接(以及在更新,分区之后)相关的问题。
我有两个主题(raw_events和processed_users),它们的键都相同,但是当我尝试对两个主题执行连接时,尽管键相同,但只有some个连接成功。
为简洁起见,应用程序的基本工作流程如下:
raw_event
主题。raw_event
主题,并根据一系列业务规则(例如IP地址,用户等)从中提取各种实体]raw_event
主题中标识的实体放入preprocessing_{type}
主题中,其中包含有关raw_event
中找到的提取和相关信息的元数据(例如,对于用户来说,这可能是诸如帐户名, -邮件等)。 这些主题中的项目由raw_event
键。preprocessing_{type}
主题,并使其与一系列GlobalKTables保持连接,这些GlobalKTables代表该给定实体final_{type}
的所有已知实例。对于成功的联接,将使用来自final_{type}
主题的新信息来丰富来自raw_event/preprocessing_{type}
的实例。不成功的连接将指示给定类型的新实体,然后将其键入关键字并置于final_{type}
主题中。 preprocessing_{type}
的所有丰富实例都插入到processing_{type}
主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是-processed_{type}
主题中的项目由raw_event
静止键输入。raw_event
进行连接来充实processing_{type}
的原始实例,该实例将被设置为相同的键,并使用来自充实实体的各种信息来充实raw_event
实例。将其推送到final_event
主题。问题本身是在上面的步骤5(事件增强)中引起的,因为raw_event
主题和processing_users
主题之间的某些联接仅按预期工作。
使用通过整个管道的24条记录的子集,主题中的24对中只有5条成功加入了。可行的方案似乎是相同的一致方案,但是我在数据中看不到任何东西可以表明为什么一个可行而另一个却不可行:
raw_event keys processing_user keys
mawjuG0B9k3AiALz0_2S 0q0juG0B9k3AiALz8ApP
xEEcv20B9k3AiALzEN0m m60juG0B9k3AiALz5gU5
zqwjuG0B9k3AiALzz_tg ua0juG0B9k3AiALz7wqa
v60juG0B9k3AiALz6Aal xEEcv20B9k3AiALzEN0m
0q0juG0B9k3AiALz8ApP zqwjuG0B9k3AiALzz_tg
RK0juG0B9k3AiALz5QUw zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal Ta0juG0B9k3AiALz5QUw
8KwjuG0B9k3AiALz1v58 RKwjuG0B9k3AiALz1P7C
c60juG0B9k3AiALz5gU4 -60juG0B9k3AiALz3gGn
RKwjuG0B9k3AiALz1P7C Va0juG0B9k3AiALz5QUw
zK0juG0B9k3AiALz6Aal 560juG0B9k3AiALz3QGh
Ta0juG0B9k3AiALz5QUw mawjuG0B9k3AiALz0_2S
Va0juG0B9k3AiALz5QUw -K0juG0B9k3AiALz3QGh
pK0juG0B9k3AiALz5gU5 zq0juG0B9k3AiALz6Aal
Xa0juG0B9k3AiALz2QCh RK0juG0B9k3AiALz5QUw
560juG0B9k3AiALz3QGh v60juG0B9k3AiALz6Aal
-K0juG0B9k3AiALz3QGh Xa0juG0B9k3AiALz2QCh
-60juG0B9k3AiALz3gGn P60juG0B9k3AiALz5QUw
F60juG0B9k3AiALz3gKn pK0juG0B9k3AiALz5gU5
m60juG0B9k3AiALz5gU5 0a0juG0B9k3AiALz6Aal
zq0juG0B9k3AiALz6Aal 3K0juG0B9k3AiALz3QGh
ua0juG0B9k3AiALz7wqa 8KwjuG0B9k3AiALz1v58
3K0juG0B9k3AiALz3QGh F60juG0B9k3AiALz3gKn
P60juG0B9k3AiALz5QUw c60juG0B9k3AiALz5gU4
我已经尝试过将主题同时作为KStreams和KTables进行组合(以及我能想到的每种组合),但是在这个小子集中的24条消息中,只有大约5种联接是成功的。
当前代码的当前示例(略微简化):
val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)
val finalEvents = events
.join(users, eventsProcessor::enrichWithUsers)
.to("final_events")
鉴于raw_events
和processing_users
主题中有对应的对(1:1),是否有任何解释说明为什么某些联接会成功而另一些联接会失败?只有5对将始终进入final_events
主题(总是相同的对)。
欢迎任何其他建议!
为了详细起见,以下是有关设置的一些注意事项:
花费了几个小时仔细研究并挖掘数据之后,简而言之,该问题似乎与分区有关。
一直以来成功的五个联接似乎只是这样做,因为每个主题的键都位于相同的分区上:
successful events raw_events partition processing_users partition
RK0juG0B9k3AiALz5QUw 3 3
m60juG0B9k3AiALz5gU5 7 7
ua0juG0B9k3AiALz7wqa 7 7
8KwjuG0B9k3AiALz1v58 8 8
RKwjuG0B9k3AiALz1P7C 9 9
尽管所有密钥都出现在两个主题中,但它们似乎没有使用相同的策略进行分区(即,两个主题contain所有具有相同密钥的消息,但是有些消息可能会出现在一个分区中raw_events
,但processing_users
中有一个不同的分区,如以下此分区/计数表示所示:
值得强调的是,出现在raw_events
主题中的消息是在上述流应用程序工作流之外生成的,这使我相信需要回答以下问题:
raw_events
的分区7中,并且您向preprocessing_users
发送了具有相同键的记录,则该记录将属于分区7?raw_event
,并且基本上将整个主题重新分区,以便使用默认的分区策略?] >>我最近在流应用程序中遇到了一个我以前从未遇到过的问题,并且很难找到与键控/联接(在更新,分区之后...)相关的问题...
consistent_random
分区策略,而不是使用默认的Java Streams应用程序。使用murmur2random
策略。