"object Assign in package kafka010 cannot be accessed" error during Kafka offset management


Environment: Kafka 0.10, Spark 2.1

I am trying to store Kafka offsets in an external store. After going through the Apache Spark website and some online research, I was able to write the code below. Now I am getting this error:

"Error:(190, 7) object Assign in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)"

My code:

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object offTest {
  def main(args: Array[String]) {

    val Array(impalaHost, brokers, topics, consumerGroupId, ssl, truststoreLocation, truststorePassword, wInterval) = args

    val sparkSession = SparkSession.builder
      .config("spark.hadoop.parquet.enable.summary-metadata", "false")
      .enableHiveSupport()
      .getOrCreate

    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(wInterval.toInt))

    val isUsingSsl = ssl.toBoolean

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val commonParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "security.protocol" -> (if (isUsingSsl) "SASL_SSL" else "SASL_PLAINTEXT"),
      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> consumerGroupId,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val additionalSslParams = if (isUsingSsl) {
      Map(
        "ssl.truststore.location" -> truststoreLocation,
        "ssl.truststore.password" -> truststorePassword
      )
    } else {
      Map.empty
    }

    val kafkaParams = commonParams ++ additionalSslParams

    // Start reading partition 4 of the topic from a previously stored offset
    val fromOffsets = Map[Object, Long](new TopicPartition(topics, 4) -> 4807048129L)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // I will insert these values into another database later
      }
    }

    val data = stream.map(record => (record.key, record.value))
    data.foreachRDD { rdd1 =>
      val value = rdd1.map(x => x._2)
      if (!value.isEmpty()) {
        value.foreach(println)
      } else {
        println("no data")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The error is thrown on the following line:

Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)

I did some research, and the cause appears to be a conflict between the imported packages, but I have not been able to resolve it. Any help or code sample would be highly appreciated.

Thanks, Raisha

scala apache-kafka spark-streaming offset
1 Answer

You need to create the ConsumerStrategy through the ConsumerStrategies.Assign factory method. The wildcard import org.apache.spark.streaming.kafka010._ brings the package-private Assign case class into scope, which is why the compiler reports that it cannot be accessed; the public entry point is the method of the same name on the ConsumerStrategies object:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)