Parse CSV as DataFrame/DataSet with Apache Spark and Java
Procedure
Create a class (schema) to encapsulate your structure. It is not required for approach B, but it makes the code easier to read if you are using Java.

    public class Record implements Serializable {
        String department;
        String designation;
        long costToCompany;
        String state;
        // constructor, getters and setters
    }
Load the CSV (or JSON) file

    JavaSparkContext sc; // assumed to be initialized elsewhere
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

    // Map over the RDD of lines that was loaded above
    JavaRDD<Record> rdd_records = data.map(
        new Function<String, Record>() {
            public Record call(String line) throws Exception {
                // Here you can use JSON
                // Gson gson = new Gson();
                // gson.fromJson(line, Record.class);
                String[] fields = line.split(",");
                // costToCompany is a long, so parse the trimmed field
                Record sd = new Record(fields[0], fields[1],
                    Long.parseLong(fields[2].trim()), fields[3]);
                return sd;
            }
        });
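The `line.split(",")` call above assumes every line has four well-formed fields. As a minimal sketch of a more defensive parser (plain Java, no Spark), the hypothetical `RecordParser.parse` below returns an empty `Optional` for malformed lines instead of throwing. The class names, the `ParsedRecord` holder and the skip-on-error policy are assumptions for illustration, not part of the original answer, and quoted fields containing commas are not handled.

```java
import java.util.Optional;

// Hypothetical stand-in for the Record class above, kept minimal for the sketch.
final class ParsedRecord {
    final String department;
    final String designation;
    final long costToCompany;
    final String state;

    ParsedRecord(String department, String designation, long costToCompany, String state) {
        this.department = department;
        this.designation = designation;
        this.costToCompany = costToCompany;
        this.state = state;
    }
}

final class RecordParser {
    private RecordParser() {}

    // Returns Optional.empty() for lines that do not have exactly four fields
    // or whose third field is not a valid long (for example a header row).
    static Optional<ParsedRecord> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != 4) {
            return Optional.empty();
        }
        try {
            long cost = Long.parseLong(fields[2].trim());
            return Optional.of(new ParsedRecord(
                    fields[0].trim(), fields[1].trim(), cost, fields[3].trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

In the Spark job, something like this could be called from the `map` function and followed by a `filter` on `isPresent()`, so that header rows and broken lines are skipped rather than failing the task.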
At this point you have 2 approaches:
A. SparkSQL
Register a table (using the schema class you defined)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
Query the table with your desired group-by query

    JavaSchemaRDD res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
        + "from record_table "
        + "group by department, designation, state");
Here you would also be able to do any other query you desire, using a SQL approach
B. Spark
Mapping using a composite key: Department, Designation, State
    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
        rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                // Key: the three grouping fields; value: (costToCompany, 1)
                Tuple2<String, Tuple2<Long, Integer>> t2 =
                    new Tuple2<String, Tuple2<Long, Integer>>(
                        record.department + record.designation + record.state,
                        new Tuple2<Long, Integer>(record.costToCompany, 1)
                    );
                return t2;
            }
        });
reduceByKey using the composite key, summing the costToCompany column and accumulating the number of records by key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
        records_JPRDD.reduceByKey(
            new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
                public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                        Tuple2<Long, Integer> v2) throws Exception {
                    return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
                }
            });
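To see what the composite key and the `reduceByKey` function compute, here is a plain-Java sketch (no Spark) that applies the same (sum, count) merge to a few in-memory rows. The `CompositeKeyDemo` class, its sample rows and the `|` separator are assumptions for illustration. A separator is used here because plain concatenation can make distinct field combinations collide (for example "ab" + "c" and "a" + "bc").

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CompositeKeyDemo {
    private CompositeKeyDemo() {}

    // Builds the composite key; the separator keeps ("ab", "c") and ("a", "bc") distinct.
    static String key(String department, String designation, String state) {
        return department + "|" + designation + "|" + state;
    }

    // Same merge as the reduceByKey function: add the sums, add the counts.
    static long[] merge(long[] v1, long[] v2) {
        return new long[] { v1[0] + v2[0], v1[1] + v2[1] };
    }

    // Each row is {department, designation, costToCompany, state}.
    static Map<String, long[]> aggregate(String[][] rows) {
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String[] row : rows) {
            long[] value = { Long.parseLong(row[2]), 1L }; // (cost, 1), as in mapToPair
            totals.merge(key(row[0], row[1], row[3]), value, CompositeKeyDemo::merge);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"Sales", "Trainee", "12000", "UP"},
            {"Sales", "Lead", "32000", "AP"},
            {"Sales", "Trainee", "8000", "UP"},
        };
        aggregate(rows).forEach((k, v) ->
            System.out.println(k + " -> sum=" + v[0] + ", count=" + v[1]));
    }
}
```

Because the merge only adds sums and counts, it is associative and commutative, which is what lets Spark combine partial results from different partitions in any order.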
A CSV file can be parsed with Spark's built-in CSV reader. On a successful read it returns a DataFrame/Dataset, on top of which you can easily apply SQL-like operations.
Using Spark 2.x (and above) with Java
Create a SparkSession object (aka spark)

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL Example")
        .getOrCreate();
Create Schema for Row with StructType
    import org.apache.spark.sql.types.StructType;

    StructType schema = new StructType()
        .add("department", "string")
        .add("designation", "string")
        .add("ctc", "long")
        .add("state", "string");
Create dataframe from CSV file and apply schema to it
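    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> df = spark.read()
        .option("mode", "DROPMALFORMED") // drop rows that do not match the schema
        .schema(schema)
        .csv("hdfs://path/input.csv");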
See the Spark documentation for more options on reading data from a CSV file.
Now we can aggregate the data in 2 ways
1. SQL way
Register the DataFrame as a temporary view so that SQL operations can be run against it
df.createOrReplaceTempView("employee");
Run SQL query on registered dataframe
    Dataset<Row> sqlResult = spark.sql(
        "SELECT department, designation, state, SUM(ctc), COUNT(department)"
            + " FROM employee GROUP BY department, designation, state");

    sqlResult.show(); // for testing

We can even execute SQL directly on the CSV file with Spark SQL, without creating a table.
2. Object chaining or Programming or Java-like way
Add the necessary imports for the SQL functions

    import static org.apache.spark.sql.functions.count;
    import static org.apache.spark.sql.functions.sum;
Use groupBy and agg on the dataframe/dataset to perform count and sum on the data
    Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
        .agg(sum("ctc"), count("department"));
    // After Spark 1.6 columns mentioned in group by will be added to result by default

    dfResult.show(); // for testing
Dependent libraries

    "org.apache.spark" % "spark-core_2.11" % "2.0.0"
    "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
The following might not be entirely correct, but it should give you some idea of how to juggle data. It's not pretty, should be replaced with case classes etc, but as a quick example of how to use the spark api, I hope it's enough :)
    val rawlines = sc.textFile("hdfs://.../*.csv")
    case class Employee(dep: String, des: String, cost: Double, state: String)
    val employees = rawlines
      .map(_.split(",")) /*or use a proper CSV parser*/
      .map(row => Employee(row(0), row(1), row(2).toDouble, row(3)))

    // the 1 is the amount of employees (which is obviously 1 per line)
    val keyVals = employees.map(em => ((em.dep, em.des, em.state), (1, em.cost)))

    val results = keyVals.reduceByKey { (a, b) =>
      (a._1 + b._1, a._2 + b._2) // (a.count + b.count, a.cost + b.cost)
    }

    // debug output
    results.take(100).foreach(println)

    results
      .map(keyval => someThingToFormatAsCsvStringOrWhatever)
      .saveAsTextFile("hdfs://.../results")
Or you can use SparkSQL:
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD-to-SchemaRDD conversion (Spark 1.0-1.2)

    // case classes can easily be registered as tables
    employees.registerAsTable("employees")

    val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
      from employees
      group by dep,des,state""")