
Converting EPOCH to Date in Elasticsearch Spark


Let's consider the DataFrame example from your question:

scala> val df = workset.select("EVTExit")
// df: org.apache.spark.sql.DataFrame = [EVTExit: date]

scala> df.printSchema
// root
//  |-- EVTExit: date (nullable = true)

You need to cast the column to a string and disable es.mapping.date.rich, which is true by default.

This parameter defines whether to create a rich Date-like object for Date fields in Elasticsearch or return them as primitives (String or long). The actual object type depends on the library used; a notable exception is Map/Reduce, which provides no built-in Date object, so LongWritable and Text are returned regardless of this setting.

I agree this is counter-intuitive, but for now it is the only solution if you don't want Elasticsearch to convert the value into a long. It is actually quite painful.

scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string"))
// df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string]

scala> df2.show
// +----------+----------+
// |   EVTExit| EVTExit_1|
// +----------+----------+
// |2014-06-03|2014-06-03|
// |      null|      null|
// |2012-10-23|2012-10-23|
// |2014-06-03|2014-06-03|
// |2015-11-05|2015-11-05|
// +----------+----------+

Now you can write your data to Elasticsearch:

scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")
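As a side note, if you don't want to repeat the option on every write, the es.* settings can also be set once on the SparkConf. This is only a minimal sketch for a standalone application (the app name is made up, and the index "workset/workset1" is just the one from this demo), not something I ran against this exact dataset:

// Sketch: set es.mapping.date.rich once on the SparkConf instead of per write.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("es-date-demo")
  .set("es.mapping.date.rich", "false")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// DataFrames written through org.elasticsearch.spark.sql should now pick up the setting.
df2.write.format("org.elasticsearch.spark.sql").save("workset/workset1")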

Now let's check what's on ES. First, let's see the mapping:

$ curl -XGET localhost:9200/workset?pretty=true
{
  "workset" : {
    "aliases" : { },
    "mappings" : {
      "workset1" : {
        "properties" : {
          "EVTExit" : {
            "type" : "long"
          },
          "EVTExit_1" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1475063310916",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "i3Rb014sSziCmYm9LyIc5A",
        "version" : {
          "created" : "2040099"
        }
      }
    },
    "warmers" : { }
  }
}

It seems like we have our dates. Now let's check the contents:

$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "workset",
      "_type" : "workset1",
      "_id" : "AVdwn-vFWzMbysX5OjMA",
      "_score" : 1.0,
      "_source" : {
        "EVTExit" : 1401746400000,
        "EVTExit_1" : "2014-06-03"
      }
    } ]
  }
}
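For completeness, here is a sketch of reading the index back into Spark and recovering a proper date from the epoch-millis long, in case you already have data stored that way. It assumes the same spark-shell session, and the column and index names are just the ones from this demo:

import org.apache.spark.sql.functions.from_unixtime
import sqlContext.implicits._

val readBack = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .load("workset/workset1")

// EVTExit comes back as an epoch-millis long; from_unixtime expects seconds,
// so divide by 1000 before casting the result to a date.
val restored = readBack.withColumn(
  "EVTExit_date",
  from_unixtime(($"EVTExit" / 1000).cast("long")).cast("date"))

restored.select("EVTExit", "EVTExit_1", "EVTExit_date").show(1)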

Note 1: I kept both fields for demonstration purposes, but I think you get the point.
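If you only want a single field in the index, a small variant of the same idea (again just a sketch, reusing the column names from above) is to drop the original date column and rename the casted one before writing:

// Sketch: keep only one column by dropping the original date column and
// renaming the string version back to EVTExit before the write.
val df3 = df2.drop("EVTExit").withColumnRenamed("EVTExit_1", "EVTExit")
df3.write.format("org.elasticsearch.spark.sql")
  .option("es.mapping.date.rich", "false")
  .save("workset/workset1")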

Note 2: Tested with Elasticsearch 2.4, Spark 1.6.2, Scala 2.10, and elasticsearch-spark 2.3.2 inside spark-shell:

$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2

Note 3: The same solution with PySpark:

from pyspark.sql.functions import col

df2 = df.withColumn("EVTExit_1", col("EVTExit").cast("string"))
df2.write.format("org.elasticsearch.spark.sql") \
   .option("es.mapping.date.rich", "false").save("workset/workset1")