
How to query an Elasticsearch index using Pyspark and Dataframes


Below is how I do it.

General environment settings and command:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

Code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q = """{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "titanic/passenger",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

You can also define the DataFrame columns explicitly, as sketched below. Refer here for more info.
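For example, here is a minimal sketch of naming the columns yourself. It assumes the (doc_id, doc) pairs produced by es_rdd above; "label" comes from the query, while "Age" is just a hypothetical field name, so substitute whatever your documents actually contain:

# es_rdd yields (document_id, document_dict) pairs.
# Pull out the fields you care about and name the columns explicitly.
rows = es_rdd.map(lambda kv: (kv[0], kv[1].get("label"), kv[1].get("Age")))
df = sqlContext.createDataFrame(rows, ["doc_id", "label", "age"])
df.printSchema()
df.show(5)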

Hope that it helps!


I am running my code on an Amazon EMR cluster using pyspark. The way I made it work was by following these steps:

1) Add this bootstrap action when creating the cluster (to set up a localhost Elasticsearch server):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb

2) I ran these commands to populate the Elasticsearch database with some data:

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{
  "title": "The Godfather",
  "director": "Francis Ford Coppola",
  "year": 1972
}'

You can also run other curl commands if you wish, like:

curl -XGET "http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}"

3) I started pyspark with the following parameters:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar

I had previously downloaded the Elasticsearch Python client.
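If you prefer that client over curl for loading data, a minimal sketch (assuming the same localhost:9200 server and movies/movie index as above, with the elasticsearch package installed) could look like this:

from elasticsearch import Elasticsearch

# Connect to the local Elasticsearch server set up by the bootstrap action.
es = Elasticsearch(["http://localhost:9200"])

# Index another sample movie document (the id and field values are just examples).
es.index(index="movies", doc_type="movie", id=2, body={
    "title": "Apocalypse Now",
    "director": "Francis Ford Coppola",
    "year": 1979
})

# Quick sanity check that the documents are searchable.
print(es.search(index="movies", body={"query": {"match_all": {}}}))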

4) I ran the following code:

from pyspark import SparkConf
from pyspark.sql import SQLContext

# sc and sqlContext are already provided by the pyspark shell started in step 3.
q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "movies/movie",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

Then I finally got a successful result from the command.
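If you want to go a step further and query the result with Spark SQL, a minimal sketch (assuming the es_rdd from step 4 and the title/year fields indexed in step 2) might look like this:

# Each element of es_rdd is a (document_id, document_dict) pair.
movies = es_rdd.map(lambda kv: (kv[1].get("title"), kv[1].get("year")))
df = sqlContext.createDataFrame(movies, ["title", "year"])

# Register the DataFrame so it can be queried with SQL.
df.registerTempTable("movies")
sqlContext.sql("SELECT title FROM movies WHERE year > 1970").show()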


I faced an issue similar to this when getting geo-filtered data into a PySpark DataFrame. I am using elasticsearch-spark-20_2.11-5.2.2.jar with Spark 2.1.1 and ES 5.2. I was able to load the data into a DataFrame by specifying my query as an option while creating the DataFrame.

My geo-query:

q ="""{  "query": {        "bool" : {            "must" : {                "match_all" : {}            },           "filter" : {                "geo_distance" : {                    "distance" : "100km",                    "location" : {                        "lat" : 35.825,                        "lon" : -87.99                    }                }            }        }    }}"""

I used the following command to load the data into a DataFrame:

spark_df = spark.read.format("es").option("es.query", q).load("index_name")
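In case the connection settings need to be spelled out as well, a slightly more explicit sketch (the es.nodes and es.port values here are assumptions, so adjust them to your cluster) would be:

spark_df = (spark.read
    .format("org.elasticsearch.spark.sql")  # long form of the "es" data source
    .option("es.nodes", "localhost")        # assumption: ES reachable on localhost
    .option("es.port", "9200")
    .option("es.query", q)
    .load("index_name"))

spark_df.printSchema()
spark_df.show(5)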