InvalidTopologyException(msg:Component:[x]从不存在的流[y]中订阅

问题描述 投票:0回答:1

我正在尝试从卡夫卡读取数据并使用Storm插入cassandra。我也配置了拓扑,但是遇到了一些问题,我不知道为什么会这样。

这是我的投稿人。

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt"); 

这里,如果我评论最后一行,我看不到任何例外。最后一行,出现以下错误:

InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt])

有人可以帮我,这是怎么了?

这里是CheckingBolt中的outputFieldDeclarer

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}

我在CassandraInsertBolt的clarifyOutputFields方法中没有任何内容,因为该螺栓不发出任何值。

TIA

apache-storm apache-storm-topology
1个回答
0
投票

这里的问题是您要混淆流名称和组件(即喷口/螺栓)名称。组件名称用于引用不同的螺栓,而流名称用于引用从同一螺栓引出的不同流。例如,如果您有一个名为“ evenOrOddBolt”的螺栓,它可能会发出两个流,即“偶​​数”流和“奇数”流。但是,在许多情况下,只有一条流从一个螺栓中流出,这就是为什么Storm有一些使用默认流名称的便捷方法的原因。

[当您执行.shuffleGrouping("checkingbolt")时,您正在使用这些便捷方法之一,有效地说:“我希望此螺栓消耗checkingbolt中的默认流”。如果要显式命名流,则可以使用此方法的重载版本,但是仅当您有多个流来自同一螺栓时,此方法才有用。

[当您执行ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));时,您是在说螺栓将在名为“ cassandraBoltStream”的流上发射。这可能不是您想要执行的操作,您想要声明它将在默认流上发出。您可以使用ofd.declare方法来代替。

有关更多详细信息,请参见the documentation。>>

© www.soinside.com 2019 - 2024. All rights reserved.