Python Spark DataFrame to Elasticsearch
For starters, saveAsNewAPIHadoopFile expects an RDD of (key, value) pairs, and in your case this may happen only accidentally. The same thing applies to the value format you declare.
I am not familiar with Elastic, but based on the arguments alone you should probably try something like this:
kpi1.rdd.map(lambda row: (None, row.asDict())).saveAsNewAPIHadoopFile(...)
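To make the expected input shape concrete, the same (None, row.asDict()) transformation can be sketched in plain Python, with a namedtuple standing in for a Spark Row (the field names here are invented for illustration):

```python
from collections import namedtuple

# Stand-in for a Spark Row; namedtuple's _asdict() plays the role of
# pyspark.sql.Row.asDict()
Row = namedtuple("Row", ["kpi", "value"])

def as_pair(row):
    # saveAsNewAPIHadoopFile consumes (key, value) pairs; the key is
    # unused when indexing into Elasticsearch, so None is a placeholder
    return (None, dict(row._asdict()))

rows = [Row("latency", 12.5), Row("errors", 3)]
pairs = [as_pair(r) for r in rows]
# Each element is now a (None, dict) pair, the shape the Hadoop output
# format expects
```

The real job is done by kpi1.rdd.map with row.asDict() in the lambda; this sketch only shows the resulting structure.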
Since Elasticsearch-Hadoop provides a SQL data source, you should also be able to skip that and save the data directly:
df.write.format("org.elasticsearch.spark.sql").save(...)
As zero323 said, the easiest way to load a DataFrame from PySpark into Elasticsearch is with the method
df.write.format("org.elasticsearch.spark.sql").save("index/type")
You can use something like this:
df.write.mode('overwrite') \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", '%s/%s' % (conf['index'], conf['doc_type'])) \
    .option("es.nodes", conf['host']) \
    .option("es.port", conf['port']) \
    .save()
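Since the writer options are just a string-to-string mapping, it can help to assemble them in one place and pass them via DataFrameWriter.options(). A minimal sketch, assuming the same conf dict as above (build_es_options is a hypothetical helper, not part of any library):

```python
def build_es_options(conf):
    # Map a plain config dict onto the option keys understood by the
    # elasticsearch-spark connector
    return {
        "es.resource": "%s/%s" % (conf["index"], conf["doc_type"]),
        "es.nodes": conf["host"],
        "es.port": conf["port"],
    }

conf = {"index": "kpi", "doc_type": "doc", "host": "localhost", "port": "9200"}
options = build_es_options(conf)

# Applied to a DataFrame writer (requires a running Spark session and
# Elasticsearch cluster, so shown here as a comment):
# df.write.mode("overwrite") \
#     .format("org.elasticsearch.spark.sql") \
#     .options(**options) \
#     .save()
```

Keeping the option keys in one dict makes it easier to swap hosts or indices between environments without touching the write call itself.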