
Best way to get the max value in a Spark dataframe column


>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
+-----+--------------------+--------+----------+-----------+

>row1 = df1.agg({"x": "max"}).collect()[0]
>print(row1)
Row(max(x)=110.33613)
>print(row1["max(x)"])
110.33613

This answer is almost the same as method 3 in the benchmark below, but it seems the .asDict() call in method 3 can be removed.
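For instance (a minimal sketch, assuming the df1 from the question), the Row returned by collect() can be indexed by field name directly:

row1 = df1.groupby().max('x').collect()[0]
print(row1['max(x)'])           # direct Row indexing, no .asDict() needed
print(row1.asDict()['max(x)'])  # method 3's original form; same result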


The max value for a particular column of a dataframe can be obtained with:

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
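An equivalent form uses pyspark.sql.functions.max explicitly; a minimal sketch, with "your-column" kept as the placeholder column name from above:

import pyspark.sql.functions as F

your_max_value = df.agg(F.max("your-column")).collect()[0][0]
# .first() returns the same single Row as .collect()[0]:
your_max_value = df.agg(F.max("your-column")).first()[0]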


Remark: Spark is intended for Big Data, i.e. distributed computing. The example DataFrame here is very small, so the ordering of the timings on real-life data can differ from what this small example shows.

Slowest: Method_1, because .describe("A") calculates min, max, mean, stddev, and count (5 calculations over the whole column).

Medium: Method_4, because the .rdd conversion (DataFrame-to-RDD transformation) slows down the process.

Fastest: Method_3 ~ Method_2 ~ Method_5, because the logic is very similar, so Spark's Catalyst optimizer follows nearly the same plan with a minimal number of operations (get the max of a particular column, collect a single-value dataframe); .asDict() adds a little extra time to methods 2 and 3 compared with method 5.

import time

import pandas as pd
import numpy as np  # only needed for the commented-out bigger DataFrame

# assumes an active SparkSession named `spark` (as in the pyspark shell)
time_dict = {}

dfff = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
# --  For a bigger/realistic dataframe, just uncomment the following 3 lines
# lst = list(np.random.normal(0.0, 100.0, 100000))
# pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
# dfff = spark.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1'] = tac1 - tic1
print(max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2'] = tac2 - tic2
print(max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3'] = tac3 - tic3
print(max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4'] = tac4 - tic4
print(max_val)

tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5'] = tac5 - tic5
print(max_val)

print(time_dict)

Results on an edge node of a cluster, in milliseconds (ms):

small DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

bigger DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
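If several statistics are needed at once, a single agg() pass computes only the aggregates you ask for, rather than the five that describe() produces in Method_1; a minimal sketch using the dfff DataFrame from the benchmark above:

import pyspark.sql.functions as F

# one job computes all three aggregates over column A
stats = dfff.agg(F.max("A").alias("max_A"),
                 F.min("A").alias("min_A"),
                 F.mean("A").alias("mean_A")).first()
print(stats["max_A"], stats["min_A"], stats["mean_A"])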