如何在Storm上制作同步KafkaSpout

问题描述 投票:0回答:1

[我正在尝试让一个Kafka使用者同时使用来自Kafka的消息。

我遇到的实际问题是消息队列存储在Storm Spout中。

[我正在试图让Storm等待Kafka ACK回复,然后才让Storm使用下一条消息。

我正在使用Storm KafkaSpout:

/**
     * Creates a configured kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return An instance of configured KafkaSpout
     */
    public KafkaSpout getkafkaSpout(String topicName){
        return new KafkaSpout(this.getSpoutConfig(topicName));
    }

    /**
     * Create the necessary configuration to create a new kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return Spout configuration
     */
    public SpoutConfig getSpoutConfig(String topicName) {
        SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
        return spoutConfig;
    }



builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);

我尝试使用另一个Kafka客户端:

//Kafka spout test
            KafkaSpoutRetryService kafkaSpoutRetryService =  new KafkaSpoutRetryExponentialBackoff(
            KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
            Integer.MAX_VALUE,
            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));

                //String topic = "testTopic";

                KafkaSpoutConfig spoutConf =  KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
                    .setGroupId(RandomStringUtils.randomAlphanumeric(20))    //Set consumption groupId
                    .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
                    .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
                    .setOffsetCommitPeriodMs(10000)    //Set automatic confirmation time (in ms)
                    .setFirstPollOffsetStrategy(EARLIEST)    //Set to pull the latest messages
                    .setRetry(kafkaSpoutRetryService)
                    .build();

builder.setSpout("kafkaPackedAlarms", new KafkaSpout(spoutConf), 1);

但是当我收到有关Kafka主题的消息时,此Spout从不会激活。

apache-kafka apache-storm
1个回答
0
投票

关于节流喷口:是的,您可以通过将拓扑配置中的topology.max.spout.pending选项设置为1来实现。如果您想要具有良好的吞吐量,我不会真的推荐它,但是我认为您已经仔细考虑了为什么需要拓扑以这种方式运行。

关于新喷嘴:stream1:9092服务器正在运行Kafka,kafkaToStormAlarms是您要发送到的主题吗?如果没有,那可能是您的问题。否则,请检查storm/logs/workers-artifacts中的工作人员日志,它可能会告诉您为什么喷口没有发出任何东西。

最后,是的,您绝对应该使用storm-kafka-client而不是storm-kafka,否则您将无法升级到Storm 2.0.0或最新的Kafka版本。

© www.soinside.com 2019 - 2024. All rights reserved.