
Spark SQL window function with complex condition


Spark >= 3.2

Recent Spark releases provide native support for session windows in both batch and structured streaming queries (see SPARK-10816 and its sub-tasks, especially SPARK-34893).

The official documentation provides a nice usage example.
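
For completeness, here is a minimal sketch (not part of the original answer) of what the built-in session_window function can look like on the dataset used below; the 5-day gap and the cast of login_date to a timestamp are assumptions:

import org.apache.spark.sql.functions.{col, min, session_window}

// Sketch only (Spark >= 3.2): session_window expects a timestamp column,
// so the string login_date is cast first; the gap duration mirrors the
// 5-day rule used in the pre-3.2 solution below.
val sessionized32 = df
  .withColumn("login_ts", col("login_date").cast("timestamp"))
  .groupBy(col("user_name"), session_window(col("login_ts"), "5 days"))
  .agg(min("login_date").as("became_active"))

Note that this returns one row per session rather than one row per login, so the output shape differs from the per-row result produced by the approach below.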

Spark < 3.2

Here is the trick. Import a bunch of functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

Define windows:

val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")

Find the points where new sessions start:

val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

Find the earliest date per session:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

With the dataset defined as:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

The result is:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
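
Since the question is about Spark SQL, the same sessionization can also be expressed as a raw SQL query. The following is a rough sketch (not part of the original answer) of that translation, assuming df has been registered as a temporary view named logins and spark is the active SparkSession:

df.createOrReplaceTempView("logins")

// Sketch only: the CTE names and view name are illustrative.
val sqlResult = spark.sql("""
  WITH flagged AS (
    SELECT *,
           CAST(COALESCE(
             DATEDIFF(login_date,
                      LAG(login_date, 1) OVER (PARTITION BY user_name ORDER BY login_date)),
             0) > 5 AS BIGINT) AS new_session
    FROM logins
  ),
  sessionized AS (
    SELECT *,
           SUM(new_session) OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM flagged
  )
  SELECT user_name, login_date,
         MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM sessionized
""")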


Refactoring the other answer to work with PySpark

In PySpark you can do it as below.

Create the data frame:

df = sqlContext.createDataFrame([
    ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
    ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
    ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
    ("SirChillingtonIV", "2012-08-11")
], ("user_name", "login_date"))

The above code creates a data frame like the one below:

+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
|SirChillingtonIV|2012-01-14|
|SirChillingtonIV|2012-08-11|
+----------------+----------+

Now we first want to find out whether the gap between consecutive login_date values is more than 5 days.

For this, do as below.

Necessary imports

from pyspark.sql import functions as f
from pyspark.sql import Window

# defining window partitions
login_window = Window.partitionBy("user_name").orderBy("login_date")
session_window = Window.partitionBy("user_name", "session")

session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))

When we run the above line of code, the datediff is NULL for each user's first login (because lag has no previous row there), and the coalesce function replaces that NULL with 0, so no new session is flagged.

+----------------+----------+-------+
|       user_name|login_date|session|
+----------------+----------+-------+
|  OprahWinfreyJr|2012-01-10|      0|
|SirChillingtonIV|2012-01-04|      0|
|SirChillingtonIV|2012-01-11|      1|
|SirChillingtonIV|2012-01-14|      1|
|SirChillingtonIV|2012-08-11|      2|
|Booooooo99900098|2012-01-04|      0|
|Booooooo99900098|2012-01-06|      0|
+----------------+----------+-------+

# add the became_active column by finding the min login_date over each window
# partitioned by user_name and the session column created in the step above
final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04|
|SirChillingtonIV|2012-01-11|   2012-01-11|
|SirChillingtonIV|2012-01-14|   2012-01-11|
|SirChillingtonIV|2012-08-11|   2012-08-11|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+