How to perform union on two DataFrames with different amounts of columns in Spark? [python]

How to perform union on two DataFrames with different amounts of columns in spark?


In Scala you just have to append all missing columns as nulls.

// Append every missing column as a null literal, then select both frames
// in the same column order before the union.
import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

// For every name in allCols: keep the real column when this frame has it,
// otherwise substitute a null literal aliased to that name.
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

// +---+--------+---------+-------+
// |age|children|education| income|
// +---+--------+---------+-------+
// | 50|       2|     null|   null|
// | 34|       4|     null|   null|
// | 26|    null|     true|60000.0|
// | 32|    null|    false|35000.0|
// +---+--------+---------+-------+

Update

Both temporal DataFrames will have the same order of columns, because we are mapping through total in both cases.

// Each intermediate frame individually, to show they share one column order
// (both selects map over `total`).
df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

// +---+--------+---------+------+
// |age|children|education|income|
// +---+--------+---------+------+
// | 50|       2|     null|  null|
// | 34|       4|     null|  null|
// +---+--------+---------+------+
//
// +---+--------+---------+-------+
// |age|children|education| income|
// +---+--------+---------+-------+
// | 26|    null|     true|60000.0|
// | 32|    null|    false|35000.0|
// +---+--------+---------+-------+


Spark 3.1+

# Spark 3.1+: unionByName matches columns by name (not position), and
# allowMissingColumns=True fills columns absent from one side with nulls.
df = df1.unionByName(df2, allowMissingColumns=True)

Test results:

# Demonstration of unionByName(..., allowMissingColumns=True) on two frames
# with overlapping but unequal column sets.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1 = [(1, '2016-08-29', 1, 2, 3),
         (2, '2016-08-29', 1, 2, 3),
         (3, '2016-08-29', 1, 2, 3)]
df1 = spark.createDataFrame(data1, ['code', 'date', 'A', 'B', 'C'])

data2 = [(5, '2016-08-29', 1, 2, 3, 4),
         (6, '2016-08-29', 1, 2, 3, 4),
         (7, '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code', 'date', 'B', 'C', 'D', 'E'])

df = df1.unionByName(df2, allowMissingColumns=True)
df.show()

#     +----+----------+----+---+---+----+----+
#     |code|      date|   A|  B|  C|   D|   E|
#     +----+----------+----+---+---+----+----+
#     |   1|2016-08-29|   1|  2|  3|null|null|
#     |   2|2016-08-29|   1|  2|  3|null|null|
#     |   3|2016-08-29|   1|  2|  3|null|null|
#     |   5|2016-08-29|null|  1|  2|   3|   4|
#     |   6|2016-08-29|null|  1|  2|   3|   4|
#     |   7|2016-08-29|null|  1|  2|   3|   4|
#     +----+----------+----+---+---+----+----+


Here is the code for Python 3 using PySpark:

from pyspark.sql.functions import lit


def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """Return df selected in columns_order_list order, with a null literal
    substituted for every column named in df_missing_fields."""
    if not df_missing_fields:
        # Nothing missing: a plain ordered select is enough.
        return df.select(columns_order_list)
    selection = [
        name if name not in df_missing_fields else lit(None).alias(name)
        for name in columns_order_list
    ]
    return df.select(selection)


def __add_missing_columns(df, missing_column_names):
    """Append each name in missing_column_names to df as a null column,
    after all of df's existing columns."""
    null_columns = [lit(None).alias(name) for name in missing_column_names]
    return df.select(df.schema.names + null_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """Union the two frames, with the combined column order dictated by left_df."""
    left_complete = __add_missing_columns(left_df, left_list_miss_cols)
    right_complete = __order_df_and_add_missing_cols(
        right_df, left_complete.schema.names, right_list_miss_cols)
    return left_complete.union(right_complete)


def union_d_fs(left_df, right_df):
    """Union two DataFrames; any column present on only one side is
    appended to the other side as nulls.

    Raises ValueError when either argument is None.
    """
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')

    # Same names in the same order: a regular positional union suffices.
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)

    # Different columns: diff the name sets in both directions.
    left_names = set(left_df.schema.names)
    right_names = set(right_df.schema.names)
    right_list_miss_cols = list(left_names - right_names)   # missing on the right
    left_list_miss_cols = list(right_names - left_names)    # missing on the left
    return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)