我是新来的火花流。我试图做从卡夫卡获取数据,并与蜂巢table.i我不知道该怎么办的火花流(而不是结构化数据流)JOIN加入一些练习。这里是我的代码
val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "dofff2.dl.uk.feefr.com:8002",
"security.protocol" -> "SASL_PLAINTEXT",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("csvstream")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val strmk = stream.map(record => (record.value,record.timestamp))
现在我想在蜂巢表中的一个做加盟。在火花结构化数据流,我可以直接调用spark.table(“表NANME”),并做一些加盟,但在火花流我怎么能做到这一点,因为根据其RDD一切。有人能帮我吗 ?
你需要变换。
事情是这样的要求:
val dataset: RDD[String, String] = ... // From Hive
val windowedStream = stream.window(Seconds(20))... // From dStream
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
从手册:
变换操作(及其变型等transformWith一起)允许在DSTREAM施加任意RDD到RDD功能。它可以用来申请未在DSTREAM API暴露任何RDD操作。例如,在与另一数据集的数据流接合每一批的功能不直接露出的DSTREAM的API中。但是,您可以轻松地使用变换来做到这一点。这使得非常强大的可能性。
这样的例子可以在这里找到:How to join a DStream with a non-stream file?
下面的指南帮助:https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html