在rdd.map func中把Scala case类转移到JsValue,但Task不可序列化。

问题描述 投票:1回答:1

我是ScalaSpark的新手,我有case类的RDD。

case class Info(key1 : String, key2 : String, key3 : String)

我想把RDD[Info]转成RDD[JsString]并保存到ElasticSearch,我使用play.api.libs并定义了写转换器。

implicit val InfoWrites = new Writes[Info]{
   def writes(i : Info): JsObject = Json.obj( 
     "key1" -> i.key1,
     "key2" -> i.key2,
     "key3" -> i.key3
   )
}

然后我定义了隐式类来使用save func。

implicit class Saver(rdd : RDD[Info]) {
    def save() : Unit = {
       rdd.map{ i => Json.toJson(i).toString }.saveJsonToEs("resource"))
    }
}

所以我可以用以下方法保存RDD[Info]

infoRDD.save()

但我一直觉得 "任务不可序列化" 错用 rdd.map()中的Json.toJson()。

我也尝试定义可序列化的对象,就像这样。

object jsonUtils extends Serializable{
   def toJsString(i : Info) : String = {
       Json.toJson(i).toString()
   }
}
rdd.map{ i => jsonUtils.toJsString(i) }  

但一直得到错误 "任务不可序列化 "如何改变代码?谢谢你!

scala apache-spark elasticsearch rdd
1个回答
2
投票

我运行了下面的代码,和你的代码类似,对我来说是可行的。

import models.Info
import org.apache.spark.rdd.RDD
import play.api.libs.json.Json
import domain.utils.Implicits._

class CustomFunctions(rdd : RDD[Info]) {
  def save() = {
    rdd.map(i => Json.toJson(i).toString ).saveAsTextFile("/home/training/so-123")
  }

}

写了相应的代码 Implicits:

package domain.utils

import play.api.libs.json.{JsObject, Json, Writes}
import models.Info

class Implicits {
  implicit val InfoWrites = new Writes[Info]{
    def writes(i : Info): JsObject = Json.obj(
      "key1" -> i.key1,
      "key2" -> i.key2,
      "key3" -> i.key3
    )
  }

}

object Implicits extends  Implicits

创建模型 Info:

package models

case class Info(key1 : String, key2 : String, key3 : String)

创建了一个 SparkOperationsDao 组成并创建火花上下文。

package dao

import domain.utils.CustomFunctions
import models.Info
import org.apache.spark.{SparkConf, SparkContext}


class SparkOperationsDao {
  val conf:SparkConf = new SparkConf().setAppName("driverTrack").setMaster("local")
  val sc = new SparkContext(conf)

  def writeToElastic() = {
    val sample = List(Info("name1", "city1", "123"), Info("name2", "city2", "234"))
    val rdd = sc.parallelize(sample)
    val converter = new CustomFunctions(rdd)
    converter.save()
  }

}

object SparkOperationsDao extends SparkOperationsDao

运行应用程序。

import dao.SparkOperationsDao

object RapidTests extends App {
  SparkOperationsDao.writeToElastic()
    //.collect.foreach(println)

}
© www.soinside.com 2019 - 2024. All rights reserved.