Transfer Scala case class to JsValue in rdd.map func but Task not serializable


I ran the code below, which is similar to yours, and it works for me:

    // Package matches the `import domain.utils.CustomFunctions` used by the DAO further down.
    package domain.utils

    import models.Info
    import org.apache.spark.rdd.RDD
    import play.api.libs.json.Json
    import domain.utils.Implicits._

    class CustomFunctions(rdd: RDD[Info]) {
      def save() = {
        rdd.map(i => Json.toJson(i).toString).saveAsTextFile("/home/training/so-123")
      }
    }

Wrote the corresponding Implicits:

    package domain.utils

    import play.api.libs.json.{JsObject, Json, Writes}
    import models.Info

    class Implicits {
      implicit val InfoWrites = new Writes[Info] {
        def writes(i: Info): JsObject = Json.obj(
          "key1" -> i.key1,
          "key2" -> i.key2,
          "key3" -> i.key3
        )
      }
    }

    object Implicits extends Implicits
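A likely reason this layout avoids "Task not serializable" is that the implicit `Writes` is reached through the singleton `object Implicits`, so the function passed to `rdd.map` does not have to capture a non-serializable enclosing instance. The sketch below illustrates that general effect with plain Java serialization and no Spark or Play dependency. All names in it (`ClosureCaptureDemo`, `isSerializable`, `InstanceFormatter`, `ObjectFormatter`) are hypothetical stand-ins, not part of the code in this answer, and it assumes Scala 2.12 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical helper: true if Java serialization accepts the value.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Stand-in for a non-serializable class that owns the formatter state.
  class InstanceFormatter(prefix: String) {
    // `prefix` is really `this.prefix`, so the function captures `this`.
    def format: String => String = s => prefix + s
  }

  // Stand-in for `object Implicits`: members are reached through the singleton.
  object ObjectFormatter {
    val prefix = "info:"
    // Nothing from an enclosing instance is captured here.
    def format: String => String = s => ObjectFormatter.prefix + s
  }

  // The instance-bound function drags in the non-serializable InstanceFormatter.
  def instanceOk: Boolean = isSerializable(new InstanceFormatter("info:").format)

  // The object-bound function serializes without capturing anything.
  def objectOk: Boolean = isSerializable(ObjectFormatter.format)
}
```

Under these assumptions `instanceOk` should be false and `objectOk` true. If your original code defines the implicit as a member of a class instance that is not `Serializable`, moving it into an object (as above) or marking that class `Serializable` are the usual ways out.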

Created the model Info:

    package models

    case class Info(key1: String, key2: String, key3: String)

Created a SparkOperationsDao that builds the SparkContext and runs the job:

    package dao

    import domain.utils.CustomFunctions
    import models.Info
    import org.apache.spark.{SparkConf, SparkContext}

    class SparkOperationsDao {
      val conf: SparkConf = new SparkConf().setAppName("driverTrack").setMaster("local")
      val sc = new SparkContext(conf)

      def writeToElastic() = {
        val sample = List(Info("name1", "city1", "123"), Info("name2", "city2", "234"))
        val rdd = sc.parallelize(sample)
        val converter = new CustomFunctions(rdd)
        converter.save()
      }
    }

    object SparkOperationsDao extends SparkOperationsDao

Run the App:

    import dao.SparkOperationsDao

    object RapidTests extends App {
      SparkOperationsDao.writeToElastic()
      //.collect.foreach(println)
    }