我是ScalaSpark的新手,我有case类的RDD。
case class Info(key1 : String, key2 : String, key3 : String)
我想把RDD[Info]转成RDD[JsString]并保存到ElasticSearch,我使用play.api.libs并定义了写转换器。
implicit val InfoWrites = new Writes[Info]{
def writes(i : Info): JsObject = Json.obj(
"key1" -> i.key1,
"key2" -> i.key2,
"key3" -> i.key3
)
}
然后我定义了隐式类来使用save func。
implicit class Saver(rdd : RDD[Info]) {
def save() : Unit = {
rdd.map{ i => Json.toJson(i).toString }.saveJsonToEs("resource"))
}
}
所以我可以用以下方法保存RDD[Info]
infoRDD.save()
但我一直觉得 "任务不可序列化" 错用 rdd.map()中的Json.toJson()。
我也尝试定义可序列化的对象,就像这样。
object jsonUtils extends Serializable{
def toJsString(i : Info) : String = {
Json.toJson(i).toString()
}
}
rdd.map{ i => jsonUtils.toJsString(i) }
但一直得到错误 "任务不可序列化 "如何改变代码?谢谢你!
我运行了下面的代码,和你的代码类似,对我来说是可行的。
import models.Info
import org.apache.spark.rdd.RDD
import play.api.libs.json.Json
import domain.utils.Implicits._
class CustomFunctions(rdd : RDD[Info]) {
def save() = {
rdd.map(i => Json.toJson(i).toString ).saveAsTextFile("/home/training/so-123")
}
}
写了相应的代码 Implicits
:
package domain.utils
import play.api.libs.json.{JsObject, Json, Writes}
import models.Info
class Implicits {
implicit val InfoWrites = new Writes[Info]{
def writes(i : Info): JsObject = Json.obj(
"key1" -> i.key1,
"key2" -> i.key2,
"key3" -> i.key3
)
}
}
object Implicits extends Implicits
创建模型 Info
:
package models
case class Info(key1 : String, key2 : String, key3 : String)
创建了一个 SparkOperationsDao
组成并创建火花上下文。
package dao
import domain.utils.CustomFunctions
import models.Info
import org.apache.spark.{SparkConf, SparkContext}
class SparkOperationsDao {
val conf:SparkConf = new SparkConf().setAppName("driverTrack").setMaster("local")
val sc = new SparkContext(conf)
def writeToElastic() = {
val sample = List(Info("name1", "city1", "123"), Info("name2", "city2", "234"))
val rdd = sc.parallelize(sample)
val converter = new CustomFunctions(rdd)
converter.save()
}
}
object SparkOperationsDao extends SparkOperationsDao
运行应用程序。
import dao.SparkOperationsDao
object RapidTests extends App {
SparkOperationsDao.writeToElastic()
//.collect.foreach(println)
}