PySpark / Spark Window Function First/Last Issue
It is not incorrect. Your window definition is just not what you think it is.
If you provide an ORDER BY clause, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

spark = SparkSession.builder.getOrCreate()

# ORDER BY without an explicit frame -> default RANGE frame
w = Window.partitionBy('Dept').orderBy('Age')

df = spark.createDataFrame(
    [(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
    ("Age", "Dept", "ID")
)

df.select(
    "*",
    first('ID').over(w).alias("first_id"),
    last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
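This means that the window function never looks ahead: the frame ends at the current row (plus any peer rows with the same Age value), so last returns the current row's ID instead of the last ID in the partition.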
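To make the default frame concrete, here is a small pure-Python model (not Spark code) of what first and last see under RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW inside a single partition. The helper name `range_to_current_row` is hypothetical; it assumes the rows are already sorted by the ordering key and, like a RANGE frame, treats rows that tie on the key (peers) as part of the current row's frame.

```python
from typing import List, Tuple


def range_to_current_row(rows: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Hypothetical model of first/last over
    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.

    rows: (order_key, value) pairs for ONE partition, sorted by order_key.
    Returns a (first_value, last_value) pair for each row.
    """
    out = []
    for key, _ in rows:
        # RANGE frame: every row whose key is <= the current key,
        # which includes peers that tie with the current row.
        frame = [v for k, v in rows if k <= key]
        out.append((frame[0], frame[-1]))
    return out


# The question's "medicine" partition as (Age, ID), sorted by Age.
print(range_to_current_row([(38, 4), (41, 5), (55, 7)]))
# [(4, 4), (4, 5), (4, 7)]  -> last_id just tracks the current row

# Two rows tied on Age 38 share one frame, so both get the same last value.
print(range_to_current_row([(38, 4), (38, 9), (41, 5)]))
# [(4, 9), (4, 9), (4, 5)]
```

The model resolves ties by list order; in Spark the relative order of peers is not guaranteed, so which peer's value last returns for tied rows can vary.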
To let first and last see the whole partition, redefine the window with an explicit frame:
w_uf = (Window
        .partitionBy('Dept')
        .orderBy('Age')
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = df.select(
    "*",
    first('ID').over(w_uf).alias("first_id"),
    last('ID').over(w_uf).alias("last_id")
)
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age|    Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine|  4|       4|      7|
| 41|medicine|  5|       4|      7|
| 55|medicine|  7|       4|      7|
+---+--------+---+--------+-------+
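For comparison, a pure-Python model (again not Spark code) of the unbounded ROWS frame: each Dept partition is sorted by Age and every row sees the entire partition, so first_id and last_id are constant within it. The function name `full_frame_first_last` is hypothetical, and sorting partitions by name is only there to make the model's output deterministic; Spark does not guarantee output row order.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, str, int]  # (Age, Dept, ID), mirroring the question's columns


def full_frame_first_last(rows: List[Row]) -> List[Tuple[int, str, int, int, int]]:
    """Hypothetical model of first/last over
    partitionBy(Dept).orderBy(Age).rowsBetween(unboundedPreceding, unboundedFollowing).
    """
    partitions: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)  # partition by Dept

    out = []
    for dept in sorted(partitions):
        part = sorted(partitions[dept], key=lambda r: r[0])  # order by Age
        # The frame is the whole partition, so every row sees the same
        # first and last ID regardless of its own position.
        first_id, last_id = part[0][2], part[-1][2]
        for age, d, id_ in part:
            out.append((age, d, id_, first_id, last_id))
    return out


data = [(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)]
for r in full_frame_first_last(data):
    print(r)
# (38, 'medicine', 4, 4, 7)
# (41, 'medicine', 5, 4, 7)
# (55, 'medicine', 7, 4, 7)
```

These rows match the result.show() table above.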