How do you perform a union of two DataFrames with different numbers of columns in Spark?
In Scala you just have to append all missing columns as nulls.
import org.apache.spark.sql.functions._

// Let df1 and df2 be the DataFrames to merge.
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union of both column sets

// For each name in allCols: select the real column when this frame has it,
// otherwise add it as a null literal under that name.
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map {
    case x if myCols.contains(x) => col(x)
    case x                       => lit(null).as(x)
  }
}

// `union` resolves by position, which is safe here because both sides are
// projected through the same `total` ordering. (`unionAll` is deprecated
// since Spark 2.0; `union` is its exact replacement and also keeps duplicates.)
df1.select(expr(cols1, total): _*)
  .union(df2.select(expr(cols2, total): _*))
  .show()

// +---+--------+---------+-------+
// |age|children|education| income|
// +---+--------+---------+-------+
// | 50|       2|     null|   null|
// | 34|       4|     null|   null|
// | 26|    null|     true|60000.0|
// | 32|    null|    false|35000.0|
// +---+--------+---------+-------+
Update
Both temporary DataFrames will have the same column order, because we map over total in both cases.
// Preview each side after projection, before the union; both selects map
// over `total`, so the column order is identical.
df1.select(expr(cols1, total): _*).show()
df2.select(expr(cols2, total): _*).show()

// +---+--------+---------+------+
// |age|children|education|income|
// +---+--------+---------+------+
// | 50|       2|     null|  null|
// | 34|       4|     null|  null|
// +---+--------+---------+------+
//
// +---+--------+---------+-------+
// |age|children|education| income|
// +---+--------+---------+-------+
// | 26|    null|     true|60000.0|
// | 32|    null|    false|35000.0|
// +---+--------+---------+-------+
Spark 3.1+
# Spark 3.1+: unionByName resolves columns by name (not position) and, with
# allowMissingColumns=True, fills columns absent from either side with nulls.
df = df1.unionByName(df2, allowMissingColumns=True)
Test results:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# First frame: columns A/B/C (no D/E).
rows_left = [
    (1, '2016-08-29', 1, 2, 3),
    (2, '2016-08-29', 1, 2, 3),
    (3, '2016-08-29', 1, 2, 3),
]
df1 = spark.createDataFrame(rows_left, ['code', 'date', 'A', 'B', 'C'])

# Second frame: columns B/C/D/E (no A).
rows_right = [
    (5, '2016-08-29', 1, 2, 3, 4),
    (6, '2016-08-29', 1, 2, 3, 4),
    (7, '2016-08-29', 1, 2, 3, 4),
]
df2 = spark.createDataFrame(rows_right, ['code', 'date', 'B', 'C', 'D', 'E'])

# Columns are matched by name; those missing on either side become null.
df = df1.unionByName(df2, allowMissingColumns=True)
df.show()

# +----+----------+----+---+---+----+----+
# |code|      date|   A|  B|  C|   D|   E|
# +----+----------+----+---+---+----+----+
# |   1|2016-08-29|   1|  2|  3|null|null|
# |   2|2016-08-29|   1|  2|  3|null|null|
# |   3|2016-08-29|   1|  2|  3|null|null|
# |   5|2016-08-29|null|  1|  2|   3|   4|
# |   6|2016-08-29|null|  1|  2|   3|   4|
# |   7|2016-08-29|null|  1|  2|   3|   4|
# +----+----------+----+---+---+----+----+
Here is the code for Python 3 using PySpark:
from pyspark.sql.functions import lit


def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """Return df projected in columns_order_list order, with null literals
    standing in for the columns listed in df_missing_fields."""
    if not df_missing_fields:
        # No gap: the frame already has every requested column.
        return df.select(columns_order_list)
    selection = [
        name if name not in df_missing_fields else lit(None).alias(name)
        for name in columns_order_list
    ]
    return df.select(selection)


def __add_missing_columns(df, missing_column_names):
    """Append each missing column to df as a null literal, after its own columns."""
    null_columns = [lit(None).alias(name) for name in missing_column_names]
    return df.select(df.schema.names + null_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols,
                           right_list_miss_cols):
    """Union the two frames, with the column order dictated by left_df."""
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(
        right_df, left_df_all_cols.schema.names, right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """Union two DataFrames; any column missing from one side is appended
    as nulls so the schemas line up.

    Raises ValueError if either argument is None.
    """
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')

    # Same names in the same order: a plain positional union suffices.
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)

    # Different columns: diff the two name sets in each direction.
    left_names = set(left_df.schema.names)
    right_names = set(right_df.schema.names)
    right_list_miss_cols = list(left_names - right_names)
    left_list_miss_cols = list(right_names - left_names)
    return __order_and_union_d_fs(
        left_df, right_df, left_list_miss_cols, right_list_miss_cols)