
Does Spark not support arraylist when writing to elasticsearch?


A bit late to the game, but this is the solution we came up with after running into this yesterday. Add 'es.input.json': 'true' to your conf, and then call json.dumps() on your data.

Modifying your example, this would look like:

import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
json_rdd = rdd.map(json.dumps)
json_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "mboyd/mboydtype",
        "es.input.json": "true"
    })
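Depending on your Spark version, saveAsNewAPIHadoopFile may expect an RDD of key-value pairs (another answer below mentions the 2-tuple requirement). If the snippet above complains about the shape of your records, a minimal variant is to pair every JSON string with a throwaway key and reuse the same conf:

    import json

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
    # Pair each JSON string with a dummy key; with "es.input.json": "true"
    # the connector ignores the key and indexes the JSON value.
    json_rdd = rdd.map(json.dumps).map(lambda doc: ('key', doc))
    # ...then call saveAsNewAPIHadoopFile on json_rdd exactly as above.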


Just had this problem, and the solution turned out to be converting all lists to tuples; converting to JSON does the same. A sketch of the tuple approach is below.
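If you'd rather go the tuple route than JSON, a minimal sketch of the conversion (to_tuples is a hypothetical helper written just for illustration, not part of any library) could look like this:

    def to_tuples(value):
        # Recursively replace Python lists with tuples so the connector's
        # writable conversion never sees a list object.
        if isinstance(value, dict):
            return {k: to_tuples(v) for k, v in value.items()}
        if isinstance(value, list):
            return tuple(to_tuples(v) for v in value)
        return value

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
    tuple_rdd = rdd.map(to_tuples)  # {"key1": ("val1", "val2")}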


I feel there are a few points missing from the other answers, such as the fact that you have to return a 2-tuple from your RDD (I don't know why) and that you also need the Elasticsearch Hadoop jar file to make it work. So I'll describe the whole process I had to follow to get it working.

  1. Download the Elasticsearch Hadoop jar file. You can download it from the central Maven repository (the latest version should work in most cases; check their official requirements README for more details).

  2. Create a file run.py with the following minimal code snippet for the demonstration.

    import json
    from pyspark import SparkContext, SparkConf

    conf = SparkConf().setAppName('demo').setMaster('local')
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
    # Serialize each record to JSON and pair it with a dummy key: the connector
    # needs a 2-tuple but ignores the key when "es.input.json" is "true".
    final_rdd = rdd.map(json.dumps).map(lambda x: ('key', x))
    final_rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes": "<server-ip>",
            "es.port": "9200",
            "es.resource": "index_name/doc_type_name",
            "es.input.json": "true"
        })
  3. Run your Spark job with the following command (a quick read-back check is sketched after this list):

        ./bin/spark-submit --jars /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar --driver-class-path /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar --master yarn /path/to/your/run/file/run.py
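If you want to verify the write, a minimal read-back sketch (assuming the same SparkContext and the placeholder server/index values from run.py) uses the connector's EsInputFormat:

    # Read the documents back as (doc_id, field_dict) pairs.
    docs = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={
            "es.nodes": "<server-ip>",
            "es.port": "9200",
            "es.resource": "index_name/doc_type_name"
        })
    print(docs.take(1))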

HTH!