kafka如何使用Scala将参数传递给消费者

问题描述 投票:1回答:1

我需要将参数传递给我的使用者,如果我写1则我的组ID为true,但我不知道如何传递参数。我将Intelliji配置为接受参数,但是我不知道如何传递它

Consumer.scala

package utils

import java.util._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

import scala.collection.JavaConverters._

object Consumer {

import java.util.Properties

val TOPIC="topic1"

val  props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.GROUP_ID_CONFIG,"_")


def main(args: Array[String]): Unit = {

if(args(0) == 1){

  props.put(ConsumerConfig.GROUP_ID_CONFIG, "true")

}else if (args(0) == 2){
  props.put(ConsumerConfig.GROUP_ID_CONFIG, false)

}else{
  println("--------")
}

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(util.Collections.singletonList(TOPIC))

while (true) {
  val records = consumer.poll(100)
  for (record <- records.asScala) {

    println(record)
    }
  }
}
}

producer.scala

package kafka

import org.apache.log4j.Logger

object KafkaLogger {

val logger = Logger.getLogger(this.getClass.getName)

def main(args: Array[String]): Unit = {

    logger.info("---------Info message---------")

}
}

我启动我的生产者而不是消费者,但是我没有控制台可以传递--option = 1或--option = 2

scala apache-kafka producer-consumer
1个回答
0
投票

argsArray[String],它永远不会保留12

您需要比较== "1"== "2"Integer.parseInt(args(0))

在IntelliJ中,您为VM参数创建运行配置,并且您的代码未解析任何内容,因此您将不会键入--option。为此,您需要一个CLI解析器,或编写args(0) == "--option=1"),我想这会起作用


要解决字符串问题,请使用class + def yourMethod(option: Int)而不是object + main(args: Array[String])


此外,org.log4j具有安全缺陷,应改为使用slf4j

© www.soinside.com 2019 - 2024. All rights reserved.