
Create Spark Dataframe from SQL Query


I found the answer here: Bulk data migration through Spark SQL

The dbtable option can be any query wrapped in parentheses with an alias. So in my case, I need to do this:

```scala
// The subquery must be wrapped in parentheses and given an alias (foo),
// because Spark places the dbtable value in the FROM clause of its own SELECT.
val query = """(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID
  from Dialog as d
  join DialogLine as dl on dl.DialogID=d.DialogID
  join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
  join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
  join WordRoot as wr on wr.WordRootID=wi.WordRootID
  where d.InSite=1 and dl.Active=1
  limit 100) foo"""

// Note: the reader is obtained with sqlContext.read, not sqlContext directly
val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable", query).
  load()
```

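As expected, loading each table as its own DataFrame and joining them in Spark was very inefficient: Spark has to fetch each table over JDBC and perform the joins itself, whereas the subquery lets MySQL do the joins and the limit before any rows are transferred.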
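If you build such queries in several places, a small helper can take care of the parentheses and the alias. This is a minimal sketch; the object `JdbcSubquery`, the method `asDbtable`, and the default alias `q` are hypothetical names, not part of Spark. It also drops a trailing semicolon, which would be a syntax error inside the parentheses.

```scala
object JdbcSubquery {
  // Hypothetical helper: turns a plain SELECT into a derived table that can be
  // passed as the JDBC "dbtable" option (parenthesized query plus alias).
  def asDbtable(query: String, alias: String = "q"): String = {
    // A trailing semicolon is not allowed inside the parentheses, so strip it.
    val body = query.trim.stripSuffix(";").trim
    s"($body) $alias"
  }
}
```

Usage would then look like `option("dbtable", JdbcSubquery.asDbtable(selectText, "foo"))`, where `selectText` is the SELECT statement without the surrounding parentheses.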


If your tables are already registered in your SQLContext, you can simply use the sql method:

val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")


TL;DR: just create a view in your database.

Details: I have a table t_city in my Postgres database, on which I create a view:

```sql
create view v_city_3500 as
    select asciiname, country, population, elevation
    from t_city
    where elevation > 3500
      and population > 100000;

select * from v_city_3500;
```

```
 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834
```

In the spark-shell:

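```scala
// Spark 1.x style; in Spark 2.x+ the spark-shell also provides a SparkSession as `spark`
val sx = new org.apache.spark.sql.SQLContext(sc)

// Tell Spark which JDBC driver class to load
val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")

// Credentials are passed in the URL here
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"

// A view is read exactly like a table; Postgres evaluates the view's filter
val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)
```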

Result:

```
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
```
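The URL above embeds the user name and password. JDBC drivers also accept them as the standard `user` and `password` connection properties, so one option is to collect everything in the `Properties` object handed to `read.jdbc`. This is a sketch under that assumption; `JdbcProps` and `connectionProps` are hypothetical names.

```scala
import java.util.Properties

object JdbcProps {
  // Hypothetical helper: gathers the connection properties for read.jdbc so
  // credentials do not have to appear in the JDBC URL.
  def connectionProps(driver: String, user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("driver", driver)     // driver class Spark should load
    props.setProperty("user", user)         // standard JDBC connection property
    props.setProperty("password", password) // standard JDBC connection property
    props
  }
}
```

In the spark-shell this could be used as `sx.read.jdbc("jdbc:postgresql://buya/dmn", "v_city_3500", JdbcProps.connectionProps("org.postgresql.Driver", "dmn", "dmn"))`.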