How to query an Elasticsearch index using PySpark and DataFrames
Below is how I do it.
General environment settings and command:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar
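Depending on your Spark version, passing the connector with --jars (as the EMR answer further down does) may also work; the jar path here mirrors the one above:

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --jars /home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar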
Code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Query: only documents that have a "label" field
q = """{
  "query": {
    "filtered": {
      "filter": {
        "exists": { "field": "label" }
      },
      "query": { "match_all": {} }
    }
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "titanic/passenger",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()
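Each element of es_rdd is a (document id, document dict) pair, so a quick sanity check (a sketch against the titanic index above) looks like:

# Inspect one record to see the shape of the data
first_id, first_doc = es_rdd.first()
print(first_id)          # the Elasticsearch document id
print(first_doc.keys())  # the indexed fields, including "label"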
You can also define the DataFrame columns explicitly instead of letting Spark infer them; see the sketch below.
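A minimal sketch, assuming the titanic/passenger documents above (field names other than "label" are hypothetical):

from pyspark.sql import Row

# Build Rows with explicit column names from the (id, dict) pairs
rows = es_rdd.map(lambda kv: Row(doc_id=kv[0], label=kv[1].get("label")))
df = sqlContext.createDataFrame(rows)
df.printSchema()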
Hope it helps!
I run my code in an Amazon EMR cluster using PySpark. I made it work by following these steps:
1) Add this bootstrap action at cluster creation (to set up a localhost Elasticsearch server):
s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb
2) I ran these commands to populate the Elasticsearch database with some data:
curl -XPUT "http://localhost:9200/movies/movie/1" -d'
{
   "title": "The Godfather",
   "director": "Francis Ford Coppola",
   "year": 1972
}'
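To confirm the document landed, a GET against the same endpoint works:

curl -XGET "http://localhost:9200/movies/movie/1?pretty"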
You can also run other curl commands if you wish, like:
curl -XGET "http://localhost:9200/_search?pretty=true&q=*:*"
3) I started pyspark with the following parameters:
pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar
I had previously downloaded the elasticsearch-hadoop connector jar referenced above.
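For reference, one way to fetch it, assuming the URL follows Maven Central's standard layout for the org.elasticsearch:elasticsearch-hadoop artifact:

wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/5.5.1/elasticsearch-hadoop-5.5.1.jar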
4) I ran the following code:
from pyspark import SparkConf
from pyspark.sql import SQLContext

# sc and sqlContext are already provided by the pyspark shell
q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "movies/movie",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()
Then I finally got a successful result from the command.
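If you want named columns rather than raw (id, dict) pairs, here is a sketch using the movie fields indexed in step 2:

# Drop the document ids and build a DataFrame with explicit column names
movies = es_rdd.map(lambda kv: (kv[1].get("title"),
                                kv[1].get("director"),
                                kv[1].get("year")))
movies_df = sqlContext.createDataFrame(movies, ["title", "director", "year"])
movies_df.show()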
I faced a similar issue getting geo-filtered data into a PySpark DataFrame. I am using elasticsearch-spark-20_2.11-5.2.2.jar with Spark 2.1.1 and ES 5.2. I was able to load the data into a DataFrame by specifying my query as an option when creating the DataFrame.
My geo-query:
q ="""{ "query": { "bool" : { "must" : { "match_all" : {} }, "filter" : { "geo_distance" : { "distance" : "100km", "location" : { "lat" : 35.825, "lon" : -87.99 } } } } }}"""
I used the following command to load the data into a DataFrame:
spark_df = spark.read.format("es").option("es.query", q).load("index_name")
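Once loaded, it behaves like any other DataFrame; a usage sketch ("location" is the geo field from the query above, other columns depend on your index mapping):

spark_df.printSchema()
spark_df.select("location").show(5)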