无法在Kafka流中加入相同的主题

问题描述 投票:1回答:1

我最近在流应用程序中遇到了一个我以前从未遇到过的问题,并且很难找到与键控/联接(以及在更新,分区之后)相关的问题。

我有两个主题(raw_events和processed_users),它们的键都相同,但是当我尝试对两个主题执行连接时,尽管键相同,但只有some个连接成功。

工作流(用于上下文)

为简洁起见,应用程序的基本工作流程如下:

  1. 数据通过生产者流入raw_event主题。
  2. 一系列流应用程序侦听raw_event主题,并根据一系列业务规则(例如IP地址,用户等)从中提取各种实体]
  3. raw_event主题中标识的实体放入preprocessing_{type}主题中,其中包含有关raw_event中找到的提取和相关信息的元数据(例如,对于用户来说,这可能是诸如帐户名, -邮件等)。 这些主题中的项目由raw_event键。
  4. 另一系列的流应用程序将收听各种preprocessing_{type}主题,并使其与一系列GlobalKTables保持连接,这些GlobalKTables代表该给定实体final_{type}的所有已知实例。对于成功的联接,将使用来自final_{type}主题的新信息来丰富来自raw_event/preprocessing_{type}的实例。不成功的连接将指示给定类型的新实体,然后将其键入关键字并置于final_{type}主题中。 preprocessing_{type}的所有丰富实例都插入到processing_{type}主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是-processed_{type}主题中的项目由raw_event静止键输入。
  5. [最后,流应用程序运行,并尝试通过与raw_event进行连接来充实processing_{type}的原始实例,该实例将被设置为相同的键,并使用来自充实实体的各种信息来充实raw_event实例。将其推送到final_event主题。

问题

问题本身是在上面的步骤5(事件增强)中引起的,因为raw_event主题和processing_users主题之间的某些联接仅按预期工作。

使用通过整个管道的24条记录的子集,主题中的24对中只有5条成功加入了。可行的方案似乎是相同的一致方案,但是我在数据中看不到任何东西可以表明为什么一个可行而另一个却不可行:

raw_event keys          processing_user keys
mawjuG0B9k3AiALz0_2S    0q0juG0B9k3AiALz8ApP 
xEEcv20B9k3AiALzEN0m    m60juG0B9k3AiALz5gU5 
zqwjuG0B9k3AiALzz_tg    ua0juG0B9k3AiALz7wqa 
v60juG0B9k3AiALz6Aal    xEEcv20B9k3AiALzEN0m 
0q0juG0B9k3AiALz8ApP    zqwjuG0B9k3AiALzz_tg 
RK0juG0B9k3AiALz5QUw    zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal    Ta0juG0B9k3AiALz5QUw 
8KwjuG0B9k3AiALz1v58    RKwjuG0B9k3AiALz1P7C 
c60juG0B9k3AiALz5gU4    -60juG0B9k3AiALz3gGn 
RKwjuG0B9k3AiALz1P7C    Va0juG0B9k3AiALz5QUw 
zK0juG0B9k3AiALz6Aal    560juG0B9k3AiALz3QGh 
Ta0juG0B9k3AiALz5QUw    mawjuG0B9k3AiALz0_2S 
Va0juG0B9k3AiALz5QUw    -K0juG0B9k3AiALz3QGh 
pK0juG0B9k3AiALz5gU5    zq0juG0B9k3AiALz6Aal 
Xa0juG0B9k3AiALz2QCh    RK0juG0B9k3AiALz5QUw 
560juG0B9k3AiALz3QGh    v60juG0B9k3AiALz6Aal 
-K0juG0B9k3AiALz3QGh    Xa0juG0B9k3AiALz2QCh 
-60juG0B9k3AiALz3gGn    P60juG0B9k3AiALz5QUw 
F60juG0B9k3AiALz3gKn    pK0juG0B9k3AiALz5gU5 
m60juG0B9k3AiALz5gU5    0a0juG0B9k3AiALz6Aal 
zq0juG0B9k3AiALz6Aal    3K0juG0B9k3AiALz3QGh 
ua0juG0B9k3AiALz7wqa    8KwjuG0B9k3AiALz1v58 
3K0juG0B9k3AiALz3QGh    F60juG0B9k3AiALz3gKn 
P60juG0B9k3AiALz5QUw    c60juG0B9k3AiALz5gU4 

我已经尝试过将主题同时作为KStreams和KTables进行组合(以及我能想到的每种组合),但是在这个小子集中的24条消息中,只有大约5种联接是成功的。

当前代码的当前示例(略微简化):

val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)

val finalEvents = events
    .join(users, eventsProcessor::enrichWithUsers)
    .to("final_events")

鉴于raw_eventsprocessing_users主题中有对应的对(1:1),是否有任何解释说明为什么某些联接会成功而另一些联接会失败?只有5对将始终进入final_events主题(总是相同的对)。

欢迎任何其他建议!

配置

为了详细起见,以下是有关设置的一些注意事项:

  • 使用Kafka Streams 2.3.0
  • 分别为所有适用的物化呼叫启用/禁用缓存和日志记录
  • 启用拓扑优化
  • 缓存缓冲设置为0

更新

花费了几个小时仔细研究并挖掘数据之后,简而言之,该问题似乎与分区有关。

一直以来成功的五个联接似乎只是这样做,因为每个主题的键都位于相同的分区上:

successful events       raw_events partition  processing_users partition
RK0juG0B9k3AiALz5QUw    3                     3
m60juG0B9k3AiALz5gU5    7                     7
ua0juG0B9k3AiALz7wqa    7                     7
8KwjuG0B9k3AiALz1v58    8                     8
RKwjuG0B9k3AiALz1P7C    9                     9

尽管所有密钥都出现在两个主题中,但它们似乎没有使用相同的策略进行分区(即,两个主题contain所有具有相同密钥的消息,但是有些消息可能会出现在一个分区中raw_events,但processing_users中有一个不同的分区,如以下此分区/计数表示所示:

enter image description here

值得强调的是,出现在raw_events主题中的消息是在上述流应用程序工作流之外生成的,这使我相信需要回答以下问题:

  • 是否有可能使分区策略的责任仅落在流工作流的入口点上,前提是它会导致跨分区的标准化分布? (例如,如果给定键位于raw_events的分区7中,并且您向preprocessing_users发送了具有相同键的记录,则该记录将属于分区7?
  • 如果是这样,这是一个合理的策略吗?还是有一种方法可以在不编写供所有生产者和流应用程序使用的自定义分区的情况下强制执行此行为?
  • 如果没有,是否可以采用现有主题(在这种情况下为raw_event,并且基本上将整个主题重新分区,以便使用默认的分区策略?] >>

我最近在流应用程序中遇到了一个我以前从未遇到过的问题,并且很难找到与键控/联接(在更新,分区之后...)相关的问题...

kotlin apache-kafka apache-kafka-streams
1个回答
0
投票
如原始文章的更新中所详述,问题本身是.NET Producer应用程序之间的分区策略差异的结果,默认情况下,该应用程序使用consistent_random分区策略,而不是使用默认的Java Streams应用程序。使用murmur2random策略。
© www.soinside.com 2019 - 2024. All rights reserved.