
Use Apache Spark efficiently to push data to elasticsearch


Not a complete answer but still a bit long for a comment. There are a few tips I would like to suggest.

It's not clear, but I assume your concern here is execution time. As suggested in the comments, you can improve performance by adding more nodes/executors to the cluster. A gzip file is not splittable, so Spark loads it into a single partition; split the input into files of a reasonable size, or repartition after reading. (Not too small, as that makes processing slow; not too big, or the executors will run OOM.)
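A minimal sketch of what that looks like in practice (the path and partition count are placeholders, not from the question):

    // A single .gz file is not splittable, so Spark reads it into one partition
    val raw = spark.read.text("/data/input/big_dump.xml.gz")
    println(raw.rdd.getNumPartitions) // typically 1 for a single gzip file

    // Spread the rows across the cluster before doing any heavy parsing
    val distributed = raw.repartition(200)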

Parquet is a good file format when working with Spark. If you can, convert your XML to Parquet; it is highly compressed and lightweight.
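If the XML can be parsed row-wise, a hedged sketch using the spark-xml package (the rowTag and paths are assumptions about your data):

    // Requires the spark-xml package (com.databricks:spark-xml) on the classpath
    val xmlDF = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "record") // replace with the element that marks one record
      .load("/data/input/big_dump.xml.gz")

    // Write once as Parquet and run all later loads from this copy
    xmlDF.write.mode("overwrite").parquet("/data/staging/dump_parquet")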

Reading your comments: coalesce does not do a full shuffle. The coalesce algorithm changes the number of partitions by moving data from some partitions into existing ones, so it obviously cannot increase the number of partitions. Use repartition instead; the operation is costly, but it can increase the number of partitions. Check this for more details: https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
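A quick way to see the difference (df here is any DataFrame; the numbers are arbitrary):

    val df200 = df.repartition(200) // full shuffle, can increase the partition count
    println(df200.rdd.getNumPartitions) // 200

    val df50 = df200.coalesce(50) // no full shuffle, can only decrease
    println(df50.rdd.getNumPartitions) // 50

    // coalescing to a larger number does not increase the partition count
    println(df50.coalesce(500).rdd.getNumPartitions) // still 50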


Here are some tips from my side.

Read the data in Parquet (or whatever format you have) and repartition it as per your need. Data conversion can consume time, so read it in Spark first and then process it. Try to create the mapping and format the data before starting the load; this makes debugging easier in case of a complex mapping.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("PushToES")
      .enableHiveSupport()
      .getOrCreate()

    val batchSizeInMB = 4 // change it as you need
    val batchRetryCount = 3
    val batchWriteRetryWait = "10s"
    val batchEntries = 10
    val enableSSL = true
    val wanOnly = true
    val enableIdempotentInserts = true
    val esNodes = Seq("yourNode1", "yourNode2", "yourNode3")
    val port = 9200 // your ES REST port

    var esConfig = Map[String, String]()
    esConfig = esConfig + ("es.nodes" -> esNodes.mkString(","))
    esConfig = esConfig + ("es.port" -> port.toString)
    esConfig = esConfig + ("es.batch.size.bytes" -> (batchSizeInMB * 1024 * 1024).toString)
    esConfig = esConfig + ("es.batch.size.entries" -> batchEntries.toString)
    esConfig = esConfig + ("es.batch.write.retry.count" -> batchRetryCount.toString)
    esConfig = esConfig + ("es.batch.write.retry.wait" -> batchWriteRetryWait)
    esConfig = esConfig + ("es.batch.write.refresh" -> "false")
    if (enableSSL) {
      esConfig = esConfig + ("es.net.ssl" -> "true")
      esConfig = esConfig + ("es.net.ssl.keystore.location" -> "identity.jks")
      esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed" -> "true")
    }
    if (wanOnly) {
      esConfig = esConfig + ("es.nodes.wan.only" -> "true")
    }
    // This helps if some task fails, so data won't be duplicated
    if (enableIdempotentInserts) {
      esConfig = esConfig + ("es.mapping.id" -> "your_primary_key_column")
    }

    val df = spark.read.parquet("/path/to/your/data") // suppose you created it using Parquet or any other format

Data is actually inserted at the executor level, not at the driver level. Try giving only 2-4 cores to each executor so that not too many connections are open at the same time. You can vary the document size or batch entries as you see fit; please read about those settings.
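One way to cap that is to size the executors when you build the session; a minimal sketch with placeholder numbers (these settings must be applied before the SparkContext is created):

    // Each writing core holds its own connection to ES, so small executors
    // (2-4 cores) with more instances keep the connection count manageable
    val spark = SparkSession
      .builder()
      .appName("PushToES")
      .config("spark.executor.cores", "3")      // 2-4 cores per executor
      .config("spark.executor.instances", "20") // scale out with more executors
      .enableHiveSupport()
      .getOrCreate()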

Write the data in chunks; this will help you when loading large datasets in the future. Try creating the index mapping before loading the data, and prefer lightly nested data even though ES gives you that functionality. Also, try to keep some primary key in your data.

    import org.apache.spark.sql.functions.{ceil, rand}
    import org.elasticsearch.spark.sql._ // adds saveToEs on DataFrame
    import spark.implicits._

    // Salt every row with a bucket number so the load can be done in chunks
    val dfToInsert = df.withColumn("salt", ceil(rand() * 10).cast("int")).persist()

    for (i <- 0 to 10) {
      val start = System.currentTimeMillis
      val finalDF = dfToInsert.filter($"salt" === i)
      val counts = finalDF.count()
      println(s"count of records in chunk $i -> $counts")
      finalDF.drop("salt").saveToEs("indexName", esConfig)
      val totalTime = System.currentTimeMillis - start
      println(s"ended loading data for chunk $i. Total time taken in seconds: ${totalTime / 1000}")
    }

Try to give an alias to your final index and update that alias on each run, as you would not want to disturb your production server at the time of the load.
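One possible pattern, assuming you manage the alias yourself (the index and alias names are hypothetical):

    // Write each run into a fresh, versioned index instead of the live one
    val runIndex = s"myindex_v${System.currentTimeMillis}"
    dfToInsert.drop("salt").saveToEs(runIndex, esConfig)

    // Then repoint the alias used by production queries, e.g. via the
    // Elasticsearch _aliases REST endpoint:
    // POST /_aliases
    // { "actions": [
    //     { "remove": { "index": "myindex_v_old", "alias": "myindex" } },
    //     { "add":    { "index": "myindex_v_new", "alias": "myindex" } } ] }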

Memory

This cannot be generic, but just to give you a kick start:

Keep 10-40 executors as per your data size or budget. Keep each executor at 8-16 GB with 5 GB of overhead (this can vary, as your documents can be large or small in size). If needed, keep maxResultSize at 8 GB. The driver can have 5 cores and 30 GB of RAM.
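Translated into spark-submit flags, a starting point could look like this (class and jar names are placeholders; tune the numbers per run):

    # class/jar names are placeholders
    spark-submit \
      --class your.package.PushToES \
      --num-executors 20 \
      --executor-cores 3 \
      --executor-memory 12g \
      --conf spark.executor.memoryOverhead=5g \
      --driver-cores 5 \
      --driver-memory 30g \
      --conf spark.driver.maxResultSize=8g \
      push-to-es.jar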

Important Things.

  • You need to keep the config in a variable, as you may change it per index.

  • Insertion happens on the executors, not on the driver, so try to keep fewer connections open while writing. Each core will open one connection.

  • Document insertion can be batched by entry count or by document size. Change it based on what you learn from multiple runs.

  • Try to make your solution robust; it should be able to handle data of any size. Both reading and writing can be tuned, but try to format your data as per the document mapping before starting the load. This helps with debugging if the document data is a little complex and nested.

  • The memory for spark-submit can also be tuned as you learn from running jobs. Just look at the insertion time while varying memory and batch size.

  • The most important thing is design. If you are using ES, then create your mapping while keeping the end queries and requirements in mind.