Create Spark Dataframe from SQL Query
I found the answer in "Bulk data migration through Spark SQL".
The dbtable option can be any query wrapped in parentheses with an alias. So in my case, I need to do this:
// The whole SELECT is wrapped in parentheses and aliased as "foo",
// so the database treats it as a derived table.
val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID
   from Dialog as d
   join DialogLine as dl on dl.DialogID = d.DialogID
   join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID = dl.DialogLineID
   join WordInstance as wi on wi.WordInstanceID = dlwim.WordInstanceID
   join WordRoot as wr on wr.WordRootID = wi.WordRootID
   where d.InSite = 1 and dl.Active = 1
   limit 100) foo
"""

// Note the .read: format() is a method of the DataFrameReader, not of SQLContext.
val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable", query).
  load()
As expected, loading each table as its own Dataframe and joining them in Spark was very inefficient.
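The wrapping step can be factored into a small helper. This is only a sketch: the JdbcSubquery object and the asDerivedTable function are names made up for illustration and are not part of Spark. The alias matters because the JDBC source places the dbtable value in the FROM clause of the statements it generates, and MySQL requires every derived table to have an alias.

```scala
object JdbcSubquery {
  // Hypothetical helper (not a Spark API): turns a plain SELECT into a
  // "(select ...) alias" string suitable for the "dbtable" option.
  def asDerivedTable(query: String, alias: String): String = {
    // Remove surrounding whitespace and one trailing semicolon, since a
    // semicolon inside the parentheses would be a syntax error.
    val cleaned = query.trim.stripSuffix(";").trim
    s"($cleaned) $alias"
  }

  def main(args: Array[String]): Unit = {
    val dbtable = asDerivedTable(
      "select DialogLineID from DialogLine where Active=1 limit 100;", "foo")
    // prints: (select DialogLineID from DialogLine where Active=1 limit 100) foo
    println(dbtable)
  }
}
```

The result would then be passed as option("dbtable", dbtable) in the reader shown above.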
If your tables are already registered in your SQLContext, you can simply use the sql method.
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
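A minimal sketch of what "registered" means here, assuming Spark 1.x and the SQLContext API used in this thread. FOO and BAR are filled with made-up in-memory rows, and the id join column is hypothetical, since the ON clauses above are elided. In practice each DataFrame would come from a JDBC read instead. RegisteredTablesSketch and joinRegistered are illustrative names only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object RegisteredTablesSketch {
  // Registers two tiny DataFrames as temporary tables and joins them with SQL.
  def joinRegistered(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._ // enables .toDF on local Seqs

    // Made-up sample rows standing in for tables loaded over JDBC.
    val foo = Seq((1, "x1"), (2, "x2")).toDF("id", "X")
    val bar = Seq((1, "y1"), (3, "y3")).toDF("id", "Y")

    // After this, the names FOO and BAR are visible to sqlContext.sql.
    foo.registerTempTable("FOO")
    bar.registerTempTable("BAR")

    sqlContext.sql("SELECT a.X, b.Y FROM FOO AS a JOIN BAR AS b ON a.id = b.id")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("registered-tables-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      joinRegistered(new SQLContext(sc)).show()
    } finally {
      sc.stop()
    }
  }
}
```

Keep in mind that with tables registered this way the join is executed by Spark, not by the database. From Spark 2.0 on, registerTempTable is deprecated in favor of createOrReplaceTempView.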
TL;DR: just create a view in your database.
Detail: I have a table t_city in my Postgres database, on which I create a view:
create view v_city_3500 as
  select asciiname, country, population, elevation
  from t_city
  where elevation > 3500 and population > 100000;

select * from v_city_3500;

 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834
In the spark-shell:
val sx = new org.apache.spark.sql.SQLContext(sc)
var props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
val city_df = sx.read.jdbc(url = url, table = "t_city", props)
val city_3500_df = sx.read.jdbc(url = url, table = "v_city_3500", props)
Result:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6