Python spark Dataframe to Elasticsearch Python spark Dataframe to Elasticsearch elasticsearch elasticsearch

Python spark Dataframe to Elasticsearch


For starters saveAsNewAPIHadoopFile expects a RDD of (key, value) pairs and in your case this may happen only accidentally. The same thing applies to the value format you declare.

I am not familiar with Elastic but just based on the arguments you should probably try something similar to this:

kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...)

Since Elastic-Hadoop provide SQL Data Source you should be also able to skip that and save data directly:

df.write.format("org.elasticsearch.spark.sql").save(...)


As zero323 said, the easiest way to load a Dataframe from PySpark to Elasticsearch is with the method

Dataframe.write.format("org.elasticsearch.spark.sql").save("index/type")  


You can use something like this:

df.write.mode('overwrite').format("org.elasticsearch.spark.sql").option("es.resource", '%s/%s' % (conf['index'], conf['doc_type'])).option("es.nodes", conf['host']).option("es.port", conf['port']).save()