PySpark - how to backfill a DataFrame?
Backfilling a distributed dataset is not as easy as with a pandas (local) DataFrame - you cannot be sure that the value to fill with exists in the same partition. I would use `crossJoin` with windowing. For example, given this DataFrame:
```python
df = spark.createDataFrame([
    ('2017-01-01', None),
    ('2017-01-02', 'B'),
    ('2017-01-03', None),
    ('2017-01-04', None),
    ('2017-01-05', 'E'),
    ('2017-01-06', None),
    ('2017-01-07', 'G')],
    ['date', 'value'])
df.show()
```

```
+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01| null|
|2017-01-02|    B|
|2017-01-03| null|
|2017-01-04| null|
|2017-01-05|    E|
|2017-01-06| null|
|2017-01-07|    G|
+----------+-----+
```
The code would be:
```python
from pyspark.sql.functions import col, coalesce, row_number
from pyspark.sql.window import Window

df.alias('a').crossJoin(df.alias('b')) \
    .where((col('b.date') >= col('a.date'))
           & (col('a.value').isNotNull() | col('b.value').isNotNull())) \
    .withColumn('rn', row_number().over(Window.partitionBy('a.date').orderBy('b.date'))) \
    .where(col('rn') == 1) \
    .select('a.date', coalesce('a.value', 'b.value').alias('value')) \
    .orderBy('a.date') \
    .show()
```

```
+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01|    B|
|2017-01-02|    B|
|2017-01-03|    E|
|2017-01-04|    E|
|2017-01-05|    E|
|2017-01-06|    G|
|2017-01-07|    G|
+----------+-----+
```
The `last` and `first` functions, with their `ignorenulls=True` flag, can be combined with `rowsBetween` windowing. To backfill, we select the first non-null value between the current row and the end of the frame. To forward-fill, we select the last non-null value between the beginning of the frame and the current row.
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
import sys

# Backfill: take the first non-null value from the current row forward.
df.withColumn(
    'value',
    F.first(F.col('value'), ignorenulls=True)
        .over(W.orderBy('date').rowsBetween(0, sys.maxsize))
)
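For the forward-fill case described above, a minimal sketch under the same assumptions (the same `df` and its `value` column) mirrors the backfill code, with `last` over a frame from the beginning up to the current row:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
import sys

# Forward-fill: take the most recent non-null value up to the current row.
df.withColumn(
    'value',
    F.last(F.col('value'), ignorenulls=True)
        .over(W.orderBy('date').rowsBetween(-sys.maxsize, 0))
)
```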
Source on filling gaps in Spark: https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9