How to write data to Elasticsearch from PySpark?
I was having a similar issue, and here's how I managed to solve it. First, I used a DataFrame instead of an RDD.
Once the data is in a DataFrame:

from pyspark.sql import SQLContext

df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
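For completeness, here is a minimal sketch of getting from an RDD to a DataFrame before the write. The SparkSession named spark, the sample records, and the field names are assumptions for illustration, not part of the original answer:

from pyspark.sql import Row

# Hypothetical RDD of dicts; in practice this would be your parsed log data.
rdd = spark.sparkContext.parallelize([
    {"timestamp": "2016-01-12T10:00:00", "message": "example error"},
])

# Convert each dict to a Row so Spark can infer a schema.
df = rdd.map(lambda d: Row(**d)).toDF()

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logstash-2016.01.12/errors") \
    .save()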
Much like the accepted answer, I was in the same boat, attempting to write out the data as an RDD. The answer above is really close, but there are many configuration options that also matter: unless your node is the default localhost, it will not work as written.
A DataFrame is the way to go: much cleaner and simpler. If you are using the pyspark shell, add the path to the elasticsearch-hadoop jar when you start the shell.
From the CLI, start the shell with:
$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar
You do not necessarily need the following line:
from pyspark.sql import SQLContext
Once you have your DataFrame, you simply need the following (plus any additional options):
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "<index/type>").option("es.nodes", "<enter node address or name>").save()
If the index/type you specify doesn't already exist in Elasticsearch, it will be created.
You can add additional options, which can be found here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
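Putting it together, here is a hedged example of the full write. The host name, the es.port option, and the append save mode are placeholders and optional extras I've added, not values from the original answer:

df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "logstash-2016.01.12/errors") \
    .option("es.nodes", "es-node.example.com") \
    .option("es.port", "9200") \
    .mode("append") \
    .save()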
You can map the RDD before writing it to Elasticsearch:
import json

errors.map(json.dumps)\
    .map(lambda x: ('key', x))
But be sure to set the es.input.json option to true in the conf:

conf = {"es.resource": "logstash-2016.01.12/errors", "es.input.json": "true"}
In addition, this solves the "java.util.HashMap cannot be used" error if you're facing it.
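For reference, the full RDD write typically goes through saveAsNewAPIHadoopFile. This is a sketch, assuming errors is an RDD of dicts, the elasticsearch-hadoop jar is on the classpath, and the node address is a placeholder:

import json

conf = {
    "es.resource": "logstash-2016.01.12/errors",
    "es.nodes": "es-node.example.com",   # placeholder node address
    "es.input.json": "true",             # the values below are JSON strings
}

errors.map(json.dumps) \
    .map(lambda x: ('key', x)) \
    .saveAsNewAPIHadoopFile(
        path="-",  # unused by EsOutputFormat, but required by the API
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=conf)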