风暴螺栓跟随kafka螺栓

问题描述 投票:0回答:1

我有一个Storm拓扑,我必须将输出发送到kafka以及更新redis中的值。为此,我有一个Kafkabolt以及一个RedisBolt。以下是我的拓扑结构 -

tp.setSpout("kafkaSpout", kafkaSpout, 3);

tp.setBolt("EvaluatorBolt", evaluatorBolt, 6).shuffleGrouping("kafkaStream");

tp.setBolt("ResultToRedisBolt",ResultsToRedisBolt,3).shuffleGrouping("EvaluatorBolt","ResultStream");

tp.setBolt("ResultToKafkaBolt", ResultsToKafkaBolt, 3).shuffleGrouping("EvaluatorBolt","ResultStream");

问题是两个结束螺栓(Redis和Kafka)都在监听前一个螺栓(ResultStream)中的相同流,因此两者都可以独立失败。我真正需要的是,如果结果在Kafka中成功发布,那么我只更新Redis中的值。有没有办法从kafkaBolt获得输出流,我可以将消息成功发布到Kafka?然后,我可以在我的RedisBolt中收听该流并采取相应的行动。

apache-storm
1个回答
1
投票

除非您修改螺栓代码,否则目前无法实现。你可能会稍微改变你的设计,因为在将元组写入Kafka之后进行额外的处理有一些缺点。如果您将元组写入Kafka并且您未能写入Redis,您将在Kafka中获得重复项,因为处理将在喷口处重新开始。

根据您的使用情况,将结果写入Kafka可能会更好,然后让另一个拓扑从Kafka读取结果并写入Redis。

如果你仍然需要能够从螺栓中发出新的元组,它应该很容易实现。 bolt最近能够添加自定义Producer回调,因此我们可以扩展该机制。

有关上下文,请参阅https://github.com/apache/storm/pull/2790#issuecomment-411709331的讨论。

© www.soinside.com 2019 - 2024. All rights reserved.