Does Spark not support ArrayList when writing to Elasticsearch?
A bit late to the game, but this is the solution we came up with after running into this yesterday: add 'es.input.json': 'true' to your conf, and then run json.dumps() on your data.
Modifying your example, this would look like:
    import json

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
    json_rdd = rdd.map(json.dumps)
    json_rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes": "localhost",
            "es.port": "9200",
            "es.resource": "mboyd/mboydtype",
            "es.input.json": "true"
        }
    )
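If you want to sanity-check the write afterwards, you can query the index back. This check is mine, not part of the original answer; a minimal sketch using only the standard library, assuming Python 3 and the same localhost:9200 node and mboyd/mboydtype resource as in the conf above:

    import json
    from urllib.request import urlopen

    # Fetch a few documents back from the index written above.
    with urlopen('http://localhost:9200/mboyd/mboydtype/_search') as resp:
        print(json.dumps(json.load(resp), indent=2))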
Just had this problem, and the solution I landed on was converting all lists to tuples before writing. Converting to JSON does the same thing.
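A minimal sketch of that approach, using the same RDD as in the question; the to_tuples helper is hypothetical, not from any library:

    def to_tuples(value):
        # Recursively replace lists with tuples in nested structures,
        # leaving dict keys and scalar values untouched.
        if isinstance(value, list):
            return tuple(to_tuples(v) for v in value)
        if isinstance(value, dict):
            return {k: to_tuples(v) for k, v in value.items()}
        return value

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
    tuple_rdd = rdd.map(to_tuples)  # {"key1": ("val1", "val2")}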
I feel there are a few points missing in the other answers, like the fact that you'll have to return a 2-tuple from your RDD (presumably because saveAsNewAPIHadoopFile expects an RDD of key-value pairs), and that you'll also need the Elasticsearch Hadoop jar file to make it work. So I'll write out the whole process I had to follow to make it work.
Download the Elasticsearch Hadoop jar file. You can download it from the Maven Central repository (the latest version should work in most cases; check their official requirements README for details).
Create a file run.py with the following minimal code snippet for the demonstration:

    import json

    import pymongo_spark
    # pymongo_spark comes from the mongo-hadoop connector; it is only
    # needed if you read from MongoDB and can be dropped otherwise.
    pymongo_spark.activate()

    from pyspark import SparkContext, SparkConf

    conf = SparkConf().setAppName('demo').setMaster('local')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])

    # Serialize each record to JSON, then wrap it in a 2-tuple:
    # the key is a placeholder and the JSON string is the value.
    final_rdd = rdd.map(json.dumps).map(lambda x: ('key', x))
    final_rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes" : "<server-ip>",
            "es.port" : "9200",
            "es.resource" : "index_name/doc_type_name",
            "es.input.json": "true"
        }
    )
Run your Spark job with the following command:

    ./bin/spark-submit \
        --jars /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar \
        --driver-class-path /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar \
        --master yarn \
        /path/to/your/run/file/run.py
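Once the job finishes, you can read the documents back through the same connector to confirm the round trip. This verification step is not part of the original process; it's a sketch under the same assumptions (same jar on the classpath, same <server-ip> and index_name/doc_type_name placeholders):

    # Read the index back through es-hadoop to verify the write.
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes" : "<server-ip>",
            "es.port" : "9200",
            "es.resource" : "index_name/doc_type_name"
        }
    )
    # Each element is a (doc_id, document) pair.
    print(es_rdd.take(1))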
HTH!