Converting EPOCH to Date in Elasticsearch Spark

Let's consider the DataFrame example from your question:

scala> val df = …("EVTExit")
// df: org.apache.spark.sql.DataFrame = [EVTExit: date]

scala> df.printSchema
// root
//  |-- EVTExit: date (nullable = true)

You need to cast the column to a string and disable es.mapping.date.rich, which is true by default.

This parameter defines whether to create a rich Date-like object for Date fields in Elasticsearch or to return them as primitives (String or long). The actual object type depends on the library used; a notable exception is Map/Reduce, which provides no built-in Date object, so LongWritable and Text are returned regardless of this setting.

I agree this is counterintuitive, but for now it is the only solution if you don't want Elasticsearch to store the date in long format. It is actually quite painful.

scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string"))
// df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string]

scala> df2.show
// +----------+----------+
// |   EVTExit| EVTExit_1|
// +----------+----------+
// |2014-06-03|2014-06-03|
// |      null|      null|
// |2012-10-23|2012-10-23|
// |2014-06-03|2014-06-03|
// |2015-11-05|2015-11-05|
// +----------+----------+

Now you can write your data to Elasticsearch:

scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")

Now let's check what's in ES. First, let's look at the mapping:

$ curl -XGET localhost:9200/workset?pretty=true
{
  "workset" : {
    "aliases" : { },
    "mappings" : {
      "workset1" : {
        "properties" : {
          "EVTExit" : {
            "type" : "long"
          },
          "EVTExit_1" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1475063310916",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "i3Rb014sSziCmYm9LyIc5A",
        "version" : {
          "created" : "2040099"
        }
      }
    },
    "warmers" : { }
  }
}
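The same check can be scripted instead of read by eye. This is a minimal sketch using only the Python standard library. It assumes the mapping response has been captured as a string (trimmed here to the relevant part), and field_types is a hypothetical helper, not part of any Elasticsearch client:

```python
import json

# Trimmed copy of the mapping response for the "workset" index
# (Elasticsearch 2.x layout: index -> mappings -> type -> properties).
MAPPING_RESPONSE = """
{
  "workset": {
    "mappings": {
      "workset1": {
        "properties": {
          "EVTExit": {"type": "long"},
          "EVTExit_1": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          }
        }
      }
    }
  }
}
"""


def field_types(mapping_json, index, doc_type):
    """Hypothetical helper: map each field name to its Elasticsearch type."""
    mapping = json.loads(mapping_json)
    properties = mapping[index]["mappings"][doc_type]["properties"]
    return {name: spec["type"] for name, spec in properties.items()}


types = field_types(MAPPING_RESPONSE, "workset", "workset1")
for name, es_type in sorted(types.items()):
    print(name, "->", es_type)
# EVTExit -> long
# EVTExit_1 -> date
```

The column that was cast to a string is the one Elasticsearch detected as a date, while the original date column ended up as a long.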

It looks like we have our dates. Now let's check the contents:

$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "workset",
      "_type" : "workset1",
      "_id" : "AVdwn-vFWzMbysX5OjMA",
      "_score" : 1.0,
      "_source" : {
        "EVTExit" : 1401746400000,
        "EVTExit_1" : "2014-06-03"
      }
    } ]
  }
}
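To see what the long stored in EVTExit stands for, it can be converted back by hand. This is a minimal sketch using only the Python standard library. epoch_millis_to_date is a hypothetical helper, and the UTC+2 offset is an assumption about the time zone of the machine that wrote the data, chosen because it reproduces the date in EVTExit_1:

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_date(millis, utc_offset_hours=0):
    """Hypothetical helper: epoch milliseconds -> ISO date at a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(millis / 1000, tz=tz).date().isoformat()


evt_exit = 1401746400000  # value of EVTExit in the search hit

print(epoch_millis_to_date(evt_exit))     # 2014-06-02 (calendar date in UTC)
print(epoch_millis_to_date(evt_exit, 2))  # 2014-06-03 (assumed UTC+2 offset)
```

The long is an epoch timestamp in milliseconds, so the calendar date it maps to depends on the time zone used to interpret it. The string column carries no such ambiguity.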

Note 1: I kept both fields for demonstration purposes, but I think you get the point.

Note 2: Tested with Elasticsearch 2.4, Spark 1.6.2, Scala 2.10 and elasticsearch-spark 2.3.2 inside spark-shell:

$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2

Note 3: The same solution with pyspark:

from pyspark.sql.functions import col

df2 = df.withColumn("EVTExit_1", col("EVTExit").cast("string"))
df2.write.format("org.elasticsearch.spark.sql") \
   .option("es.mapping.date.rich", "false").save("workset/workset1")