如何做到从卡夫卡的火花流流数据JOIN

问题描述 投票:0回答:1

我是新来的火花流。我试图做从卡夫卡获取数据,并与蜂巢table.i我不知道该怎么办的火花流(而不是结构化数据流)JOIN加入一些练习。这里是我的代码

   val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))   

   val kafkaParams = Map[String, Object](
   "bootstrap.servers" -> "dofff2.dl.uk.feefr.com:8002",
   "security.protocol" -> "SASL_PLAINTEXT",
   "key.deserializer" -> classOf[StringDeserializer],
   "value.deserializer" -> classOf[StringDeserializer],
   "group.id" -> "1",
   "auto.offset.reset" -> "latest",
   "enable.auto.commit" -> (false: java.lang.Boolean)
   )

   val topics = Array("csvstream")
   val stream = KafkaUtils.createDirectStream[String, String](
   ssc,
   PreferConsistent,
   Subscribe[String, String](topics, kafkaParams)
   )

   val strmk = stream.map(record => (record.value,record.timestamp))

现在我想在蜂巢表中的一个做加盟。在火花结构化数据流,我可以直接调用spark.table(“表NANME”),并做一些加盟,但在火花流我怎么能做到这一点,因为根据其RDD一切。有人能帮我吗 ?

apache-spark spark-streaming
1个回答
0
投票

你需要变换。

事情是这样的要求:

val dataset: RDD[String, String] = ... // From Hive
val windowedStream = stream.window(Seconds(20))... // From dStream
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

从手册:

变换操作(及其变型等transformWith一起)允许在DSTREAM施加任意RDD到RDD功能。它可以用来申请未在DSTREAM API暴露任何RDD操作。例如,在与另一数据集的数据流接合每一批的功能不直接露出的DSTREAM的API中。但是,您可以轻松地使用变换来做到这一点。这使得非常强大的可能性。

这样的例子可以在这里找到:How to join a DStream with a non-stream file?

下面的指南帮助:https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html

© www.soinside.com 2019 - 2024. All rights reserved.