How to write data in Elasticsearch from Pyspark?

I was having a similar issue and here's how I managed to solve it. First, I used a DataFrame rather than an RDD.

Once the data is in a DataFrame:

from pyspark.sql import SQLContext

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logstash-2016.01.12/errors") \
    .save()
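For context, here is a minimal end-to-end sketch of that approach; the SparkContext setup and the sample records are assumptions for illustration:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="es-writer")
sqlContext = SQLContext(sc)

# Hypothetical sample records standing in for the errors data
records = [{"level": "ERROR", "message": "disk full"},
           {"level": "ERROR", "message": "timeout"}]

# Build a DataFrame from the records (an RDD of dicts works the same way)
df = sqlContext.createDataFrame(records)

# Write to the logstash-2016.01.12 index under type 'errors'
df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logstash-2016.01.12/errors") \
    .save()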


Like the author of the currently accepted answer, I was in the same boat, attempting to write out the data as an RDD. That answer is close, but there are many configuration options that are also helpful, and unless your node is running on the default localhost, it will not work as written.

A DataFrame is the way to go; it is much cleaner and simpler. If you are using the pyspark shell, add the path to the elasticsearch-hadoop jar when you start the shell.

From the CLI, start the shell using:

$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar
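If you would rather not repeat connection options in every write, elasticsearch-hadoop also resolves settings passed with a spark. prefix at startup; for example (the node address and port here are placeholders):

$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar \
    --conf spark.es.nodes=es-node.example.com \
    --conf spark.es.port=9200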

You do not necessarily need the following line:

from pyspark.sql import SQLContext

Once you have your DataFrame, you simply need the following, plus any additional options:

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "<index/type>") \
    .option("es.nodes", "<enter node address or name>") \
    .save()

If the index/type you specify doesn't already exist in Elasticsearch, it will be created.

You can add further options, which are listed here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
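For instance, a write that sets a few of those options might look like the sketch below; the node address, port, and the es.mapping.id field name are placeholders, and es.nodes.wan.only is handy when the cluster is only reachable through a public address:

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logstash-2016.01.12/errors") \
    .option("es.nodes", "es-node.example.com") \
    .option("es.port", "9200") \
    .option("es.nodes.wan.only", "true") \
    .option("es.mapping.id", "event_id") \
    .save()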


You can map the RDD before writing it to Elasticsearch:

import json

errors.map(json.dumps) \
    .map(lambda x: ('key', x))

But be sure to set the es.input.json option to "true" in the conf:

conf = {"es.resource": "logstash-2016.01.12/errors", "es.input.json": "true"}

This also solves the "java.util.HashMap cannot be used" error if you're running into it.
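Putting the RDD route together, here is a minimal sketch using saveAsNewAPIHadoopFile with the connector's EsOutputFormat; errors is assumed to be an RDD of dicts, and the node address is a placeholder:

import json

conf = {
    "es.resource": "logstash-2016.01.12/errors",
    "es.nodes": "es-node.example.com",  # placeholder node address
    "es.input.json": "true"
}

# Serialize each record to JSON and pair it with a dummy key,
# since the output format expects (key, value) tuples
docs = errors.map(json.dumps).map(lambda x: ('key', x))

docs.saveAsNewAPIHadoopFile(
    path='-',  # required by the API but ignored by EsOutputFormat
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)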