[我正在尝试让一个Kafka使用者同时使用来自Kafka的消息。
我遇到的实际问题是消息队列存储在Storm Spout中。
[我正在试图让Storm等待Kafka ACK回复,然后才让Storm使用下一条消息。
我正在使用Storm KafkaSpout:
/**
* Creates a configured kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return An instance of configured KafkaSpout
*/
public KafkaSpout getkafkaSpout(String topicName){
return new KafkaSpout(this.getSpoutConfig(topicName));
}
/**
* Create the necessary configuration to create a new kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return Spout configuration
*/
public SpoutConfig getSpoutConfig(String topicName) {
SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
return spoutConfig;
}
builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);
我尝试使用另一个Kafka客户端:
//Kafka spout test
KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
//String topic = "testTopic";
KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
.setGroupId(RandomStringUtils.randomAlphanumeric(20)) //Set consumption groupId
.setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
.setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
.setOffsetCommitPeriodMs(10000) //Set automatic confirmation time (in ms)
.setFirstPollOffsetStrategy(EARLIEST) //Set to pull the latest messages
.setRetry(kafkaSpoutRetryService)
.build();
builder.setSpout("kafkaPackedAlarms", new KafkaSpout(spoutConf), 1);
但是当我收到有关Kafka主题的消息时,此Spout从不会激活。
关于节流喷口:是的,您可以通过将拓扑配置中的topology.max.spout.pending
选项设置为1
来实现。如果您想要具有良好的吞吐量,我不会真的推荐它,但是我认为您已经仔细考虑了为什么需要拓扑以这种方式运行。
关于新喷嘴:stream1:9092
服务器正在运行Kafka,kafkaToStormAlarms
是您要发送到的主题吗?如果没有,那可能是您的问题。否则,请检查storm/logs/workers-artifacts
中的工作人员日志,它可能会告诉您为什么喷口没有发出任何东西。
最后,是的,您绝对应该使用storm-kafka-client
而不是storm-kafka
,否则您将无法升级到Storm 2.0.0或最新的Kafka版本。