为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

问题描述 投票:0回答:2

我的拓扑如下所示:

KTable<ByteString, User> users = topology.table(USERS);

KStream<ByteString, JoinRequest> joinRequests = topology.stream(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::userNew)
    .to(USERS);

topology.stream(SETTINGS_CONFIRM_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsConfirm)
    .to(USERS);

topology.stream(SETTINGS_UPDATE_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsUpdate)
    .to(USERS);

在运行时,此拓扑工作正常。用户是根据加入请求创建的。他们通过设置确认请求来确认其设置。他们通过设置更新请求来更新其设置。

但是,重新处理该拓扑并不会产生原始结果。具体来说,设置更新连接器看不到由设置确认连接器产生的用户,即使就时间戳而言,从创建用户的时间到用户被确认的时间到用户更新的时间已经过去了很多秒他们的设置。

我很茫然。我尝试关闭用户表上的缓存/日志记录。不知道该怎么做才能正确进行此重新处理。

apache-kafka apache-kafka-streams
2个回答
2
投票

更新

多年来,我们投入了大量精力来使 Kafka Streams 更具确定性。

Kafka Streams 2.1 添加了

max.task.idle.ms
配置来帮助“同步”从不同输入主题的读取(KIP-353),这在 3.0 版本中得到了进一步改进(KIP-695

特别是对于流表连接,版本化状态存储(在 3.5 中添加:KIP-889)现在可以利用流表连接“宽限期”(在 3.6 中添加:KIP-923)来提供确定性且时间准确的连接结果。

原答案

KStream-KTable 连接不是 100% 确定性的(并且可能永远不会成为 100% 确定性的)。我们意识到问题并讨论解决方案,至少可以缓解问题。

一个问题是,如果消费者从代理获取数据,我们无法轻松控制代理返回哪些主题和/或分区的数据。根据我们从经纪商接收数据的顺序,结果可能会略有不同。

一个相关问题:https://issues.apache.org/jira/browse/KAFKA-3514

这篇博文也可能有帮助:https://www.confluence.io/blog/crossing-streams-joins-apache-kafka/


0
投票

我能够通过将问题中的代码替换为以下内容来部分解决我的问题:

KTable<ByteString, User> users = topology.table(JOIN_REQUESTS)
    .mapValue(entityTopologyProcessor::user)
    .leftJoin(topology
                 .stream(CONFIRM_SETTINGS_REQUESTS)
                 .groupByKey()
                 .reduce((a, b) -> b),
              entityTopologyProcessor::confirmSettings)
    .leftJoin(topology
                 .stream(SETTINGS_UPDATE_REQUESTS)
                 .groupByKey()
                 .reduce(entityTopologyProcessor::settingsUpdateReduce),
              entityTopologyProcessor::settingsUpdate);

该解决方案利用了所有表-表连接都是确定性的这一事实。在重新处理期间,结果状态可能暂时不正确,但是一旦拓扑被捕获,最终值就是正确的(给定结果的最终时间戳仍然不确定)。一般来说,这种方法将给定实体(本例中:用户)的所有事件(本例中:加入请求、确认设置请求、设置更新请求)分组到单个任务中,并将它们的累积连接到单个产品中。通过在末尾加入另一个流来清空结果,可以使用删除事件来扩展此示例。

除了这种方法之外,通常,编写可重新处理的拓扑需要从两个维度考虑拓扑:实时和重新处理时间。从 Kafka Streams 1.0.0 开始,这对于开发人员来说是一门艺术。

© www.soinside.com 2019 - 2024. All rights reserved.