Converting JavaRDD to DataFrame in Spark java Converting JavaRDD to DataFrame in Spark java hadoop hadoop

Converting JavaRDD to DataFrame in Spark java


Imports:

import java.io.Serializable;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;

Create a POJO class for URL. I'd recommend you to write for Log line which consists of url, date, time, method, target,.. etc as members

public static class Url implements Serializable {  private String value;  public String getValue() {    return value;  }  public void setValue(String value) {    this.value = value;  }}  

Create an RDD of Url objects from a text file

JavaRDD<Url> urlsRDD = spark.read()  .textFile("/Users/karuturi/Downloads/log.txt")  .javaRDD()  .map(new Function<String, Url>() {    @Override    public Url call(String line) throws Exception {      String[] parts = line.split("\\t");      Url url = new Url();      url.setValue(parts[0].replaceAll("[", ""));      return url;    }  });

Create DataFrame from RDD

Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);

RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6


You can do something like (I am converting on the fly from scala so excuse any typos):

import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {    @Override    public Row call(String record) throws Exception {        return RowFactory.create(record());    }}// now you wish to create the target schema. This is basically a list of// fields (each field would be a column) which you are adding to a StructTypeList<StructField> fields = new ArrayList<>();StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);fields.add(field);StructType schema = DataTypes.createStructType(fields);// now you can create the dataframe:DataFrame df= sqlContext.createDataFrame(rowRDD, schema);    

A couple additional notes:

  • Why are you flatmaping when you are only taking the first element? You could have simply done:

    JavaRDD<String> urlrdd=diskfile.flatMap(line -> line.split("\t")[0]);

  • I assume in real life you would want to remove the '[' from the url (you can easily do this in the map).

  • If you are moving to spark 2.0 or later then instead of sqlContext you should be using spark session (spark).

  • You can create a single dataframe with all columns. You can do this by adding all fields to the schema (i.e. instead of just doing a single add to the fields add all of them). Instead of using urlrdd, use diskfile and do the split inside the "public Row call" creation. This would be something like this:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() { @override public Row call(String record) throws Exception { String[] recs = record.split("\t") return RowFactory.create(recs[0], recs[1], ...); }});

  • You can create it directly: Just use

    sqlContext.read.option("sep","\t").csv.load(filename,schema)


Just flatmap your data according to 7 column table and use code snippet below

String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"};List<String> tableColumns = Arrays.asList(columns);StrucType schema = createSchema(tableColumns);    public StructType createSchema(List<String> tableColumns){        List<StructField> fields  = new ArrayList<StructField>();        for(String column : tableColumns){                         fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));                    }        return DataTypes.createStructType(fields);    }sqlContext.createDataFrame(urlRDD, schema);