
Merge Spark output CSV files with a single header


You can work around this as follows:

  1. Create a new DataFrame (headerDF) containing only the header names.
  2. Union it with the DataFrame (dataDF) containing the data.
  3. Write the unioned DataFrame to disk with option("header", "false").
  4. Merge the partition files (part-0000*.csv) using Hadoop's FileUtil.

This way, none of the partitions contains a header except the single partition holding the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

  import org.apache.spark.sql.{Row, SaveMode}
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import scala.collection.JavaConverters._

  // dataFrame is the data to save on disk
  // cast all columns to String so the header row fits the schema
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  // create a new data frame containing only the header names
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  // merge the header names with the data
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  // use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true,
    sparkSession.sparkContext.hadoopConfiguration, null)


  1. Build the header from the DataFrame schema (val header = dataDF.schema.fieldNames.reduce(_ + "," + _)).
  2. Create a file containing that header on DSEFS.
  3. Append all the (headerless) partition files to the file from step 2 using the Hadoop FileSystem API (see the sketch below).
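
A minimal sketch of steps 2 and 3 with the Hadoop FileSystem API could look like the following; mergeWithHeader, partsFolder, and targetFile are made-up names, the header string is assumed to come from step 1, and FileSystem.get resolves to whatever default filesystem the configuration points at:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

def mergeWithHeader(partsFolder: String, targetFile: String, header: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)

  // step 2: create the target file and write the header line first
  val out = fs.create(new Path(targetFile), true)
  out.write((header + "\n").getBytes("UTF-8"))

  // step 3: copy every headerless partition file (part-*) into it, in name order
  val parts = fs.globStatus(new Path(partsFolder, "part-*")).sortBy(_.getPath.getName)
  parts.foreach { part =>
    val in = fs.open(part.getPath)
    try IOUtils.copyBytes(in, out, conf, false) // false: keep `out` open between parts
    finally in.close()
  }
  out.close()
}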


To merge files in a folder into one file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
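
A call might look like this; both paths are placeholders for wherever Spark wrote its part files and where you want the merged result:

merge("/output/spark-parts", "/output/merged.csv")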

If you want to merge all files into one file, but still inside an output folder (note that this forces all the data into a single partition on one worker):

dataFrame
  .coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(out)

Another solution is to use the previous approach (writing to a single partition) and then move the one part file inside the output folder to another path, giving it the desired CSV file name:

import java.io.File
import org.apache.spark.sql.DataFrame

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
  val tmpDir = "tmpDir"
  df.repartition(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", header.toString)
    .option("delimiter", sep)
    .save(tmpDir)
  // move the single part file out of the temporary folder and clean up
  val dir = new File(tmpDir)
  val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
  new File(tmpCsvFile).renameTo(new File(fileName))
  dir.listFiles.foreach(f => f.delete)
  dir.delete
}
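
A call could look like this, with dataFrame and the output path as placeholders; since the function works with java.io.File, the paths refer to the local filesystem:

df2csv(dataFrame, "result.csv", header = true)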