ZKStringSerializer的标量包问题

问题描述 投票:1回答:1

我正在尝试使用与之配套的ZKStringSerializer类

import kafka.utils.ZKStringSerializer

根据整个互联网,甚至是我自己的代码,在我通过计算机重新启动之前,这应该可以使我的代码正常工作。但是,我现在得到了一个令人难以置信的令人困惑的编译错误,

object ZKStringSerializer in package utils cannot be accessed in package kafka.utils

这令人困惑,因为该文件不应放在任何包中,并且我没有在任何地方指定包。这是我的代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.ZkConnection
import java.util.Properties

import org.apache.kafka.clients.admin
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}


object SpeedTester {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
    import spark.implicits._
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    val zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
    val topicName = "testTopic"
    val numPartitions = 8
    val replicationFactor = 1
    val topicConfig = new Properties
    val isSecureKafkaCluster = false
    val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster)
    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)

    // Create producer for topic testTopic and actually push values to the topic
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9592")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val TOPIC = "testTopic"
    for (i <- 1 to 50) {
      val record = new ProducerRecord(TOPIC, "key", s"hello $i")
      producer.send(record)
    }

    val record = new ProducerRecord(TOPIC, "key", "the end" + new java.util.Date)
    producer.send(record)
    producer.flush()
    producer.close()
  }
}
scala apache-kafka packages
1个回答
0
投票
我知道这为时已晚,但对于其他将寻找相同问题的人-
© www.soinside.com 2019 - 2024. All rights reserved.