
How to execute a SQL query against ElasticSearch (using org.elasticsearch.spark.sql format)?


In my case, I am running the ElasticSearch and Spark instances as Docker containers within Google Cloud. After reading and searching everywhere, I came across this: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html. The ES Spark connector has a setting called es.nodes.wan.only which makes it work in cloud environments such as Google Cloud or AWS. I had exactly the same problem as the OP (printSchema worked, but any aggregates did not), and this one setting fixed it for me!

You can set it when initialising your SparkConf, like this:

val conf = new SparkConf().setAppName("MySparkModel").set("es.nodes.wan.only", "true")
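
If it helps, here is a rough sketch of how that setting can fit into a complete read followed by an aggregate (the kind of operation that failed for me before setting it). Treat it as untested; the host, port, index and type names below are placeholders, not values from my setup:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("MySparkModel")
  .set("es.nodes", "your-es-host")      // placeholder: address of your ES node(s)
  .set("es.port", "9200")
  .set("es.nodes.wan.only", "true")     // talk only to the declared nodes (cloud/Docker setups)

val spark = SparkSession.builder().config(conf).getOrCreate()

// Load an index through the connector and run an aggregate over it
val df = spark.read.format("org.elasticsearch.spark.sql").load("your_index/your_type")
df.count()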


DISCLAIMER: I've got no experience with ElasticSearch, so mistakes are quite possible.

I use the very latest unreleased Spark version 2.1.0-SNAPSHOT that I built this morning.

$ ./bin/spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
Branch master
Compiled by user jacek on 2016-12-02T05:06:30Z
Revision a5f02b00291e0a22429a3dca81f12cf6d38fea0b
Url https://github.com/apache/spark.git
Type --help for more information.

I use ElasticSearch 5.0.2 that I installed following the Installation Steps in the official documentation.

I ran an ES instance using ./bin/elasticsearch, with no changes to the default configuration.

$ curl http://localhost:9200/
{
  "name" : "5PW1rOj",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "nqHBMN7JTw2j8_FD0FZpDg",
  "version" : {
    "number" : "5.0.2",
    "build_hash" : "f6b4951",
    "build_date" : "2016-11-24T10:07:18.101Z",
    "build_snapshot" : false,
    "lucene_version" : "6.2.1"
  },
  "tagline" : "You Know, for Search"
}

I found the elasticsearch-hadoop module for Spark. They have just released 5.0.2 so (after some reading about how to work with Spark and ES) I ran spark-shell as follows:

$ ./bin/spark-shell --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.0.2

Please note that I knew and still know nothing about ElasticSearch.

When spark-shell was up I executed the following:

scala> import org.elasticsearch.spark.sql._
import org.elasticsearch.spark.sql._

scala> (0 to 10).toDF.saveToEs("spark/helloworld")

I did notice that the Spark job executed properly, as I could see in the logs of ElasticSearch:

[2016-12-02T21:23:02,628][INFO ][o.e.c.m.MetaDataMappingService] [5PW1rOj] [spark/jXB6Km6xSjuHxitxjj6Ebw] create_mapping [helloworld]

After playing with ES for some time I ended up with the following index (with a few types):

$ http http://localhost:9200/spark
HTTP/1.1 200 OK
content-encoding: gzip
content-type: application/json; charset=UTF-8
transfer-encoding: chunked

{
    "spark": {
        "aliases": {},
        "mappings": {
            "hello": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            },
            "hello2": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            },
            "helloworld": {
                "properties": {
                    "value": {
                        "type": "long"
                    }
                }
            }
        },
        "settings": {
            "index": {
                "creation_date": "1480709570254",
                "number_of_replicas": "1",
                "number_of_shards": "5",
                "provided_name": "spark",
                "uuid": "jXB6Km6xSjuHxitxjj6Ebw",
                "version": {
                    "created": "5000299"
                }
            }
        }
    }
}

I could also look up a document in spark/helloworld by id using the following query (this particular id happened not to exist):

$ http http://localhost:9200/spark/helloworld/7
HTTP/1.1 404 Not Found
content-encoding: gzip
content-type: application/json; charset=UTF-8
transfer-encoding: chunked

{
    "_id": "7",
    "_index": "spark",
    "_type": "helloworld",
    "found": false
}

And last but not least, time for Spark's web UI showing the Spark job for the ElasticSearch query.

Spark's web UI with a job for ElasticSearch

It appears that it all works out of the box, with no changes to ElasticSearch or the module.

Ah, I almost forgot about count:

scala> spark.read.format("es").load("spark/helloworld").count
res4: Long = 22

count in Spark's web UI
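
Since the question asks specifically about SQL, here is a small sketch (not part of my original session, so treat it as untested) of registering the ES-backed DataFrame as a temporary view and querying it with Spark SQL; the view name and the WHERE clause are just examples:

scala> val df = spark.read.format("es").load("spark/helloworld")
scala> df.createOrReplaceTempView("helloworld")
scala> spark.sql("SELECT value FROM helloworld WHERE value > 5").show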


The question is quite old, but I will describe the approach that I used to work with ES 5.0 and Spark 2.0, for future reference. I think the ES-Hadoop documentation is a bit unclear about which artifact and API to use.

I used org.elasticsearch:elasticsearch-spark-20_2.11:5.0 and the following code:

// add to your class imports
import org.elasticsearch.spark.sql._

// Use Spark 2.0 SparkSession object to provide your config
val sparkSession = SparkSession.builder().config(...).getOrCreate()

// Optional step, imports things like $"column"
import sparkSession.implicits._

// Specify your index and type in ES
val df = sparkSession.esDF("index/type")

// Perform an action
df.count()
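
If you want to run an actual SQL query against that DataFrame (which is what the question asks for), one way is to register it as a temporary view first; a short sketch, where myIndex and the query itself are placeholders:

// Register the ES-backed DataFrame as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("myIndex")
val result = sparkSession.sql("SELECT * FROM myIndex LIMIT 10")
result.show()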

I assume the situation is quite similar for Spark 1.6 with some minor changes. In particular, you should use SQLContext or HiveContext instead of SparkSession.
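
For illustration only, a rough sketch of what the Spark 1.6 variant might look like (I have not verified this against 1.6; the app name and index/type are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

val conf = new SparkConf().setAppName("EsSpark16")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// the org.elasticsearch.spark.sql._ import adds esDF to SQLContext
val df = sqlContext.esDF("index/type")
df.count()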