
PySpark / Spark Window Function First/ Last Issue


It is not incorrect. Your window definition is just not what you think it is.

If you provide an ORDER BY clause, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:

from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

# ORDER BY without an explicit frame -> RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
w = Window.partitionBy('Dept').orderBy('Age')

df = spark.createDataFrame(
    [(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
    ("Age", "Dept", "ID"))

df.select(
    "*",
    first('ID').over(w).alias("first_id"),
    last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]

This means that the window function never looks ahead and the last row in the frame is the current row.
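You can see this directly by showing the result of the query above instead of its plan. With the default frame, last() sees nothing past the current row, so last_id should simply echo ID for each row (output sketched from the frame semantics, given that all Age values here are distinct):

df.select(
    "*",
    first('ID').over(w).alias("first_id"),
    last('ID').over(w).alias("last_id")
).show()

+---+--------+---+--------+-------+
|Age|    Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine|  4|       4|      4|
| 41|medicine|  5|       4|      5|
| 55|medicine|  7|       4|      7|
+---+--------+---+--------+-------+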

You should redefine the window as:

# Explicit frame covering the whole partition
w_uf = (Window
    .partitionBy('Dept')
    .orderBy('Age')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = df.select(
    "*",
    first('ID').over(w_uf).alias("first_id"),
    last('ID').over(w_uf).alias("last_id"))
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age|    Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine|  4|       4|      7|
| 41|medicine|  5|       4|      7|
| 55|medicine|  7|       4|      7|
+---+--------+---+--------+-------+
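If you only need the last value, a minimal alternative sketch avoids the unbounded-following frame altogether: reverse the ordering and reuse first(), since the default frame always contains the first row of the partition (w is the ascending window defined above; w_desc is a name introduced here for illustration):

from pyspark.sql.functions import col

w_desc = Window.partitionBy('Dept').orderBy(col('Age').desc())

df.select(
    "*",
    first('ID').over(w).alias("first_id"),      # first row by ascending Age
    first('ID').over(w_desc).alias("last_id")   # first row by descending Age
).show()

This should yield the same first_id / last_id columns without any window having to look ahead of the current row.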