Pyspark: show histogram of a data frame column Pyspark: show histogram of a data frame column python python

Pyspark: show histogram of a data frame column


Unfortunately I don't think that there's a clean plot() or hist() function in the PySpark Dataframes API, but I'm hoping that things will eventually go in that direction.

For the time being, you could compute the histogram in Spark, and plot the computed histogram as a bar chart. Example:

import pandas as pdimport pyspark.sql as sparksql# Let's use UCLA's college admission datasetfile_name = "https://stats.idre.ucla.edu/stat/data/binary.csv"# Creating a pandas dataframe from Sample Datadf_pd = pd.read_csv(file_name)sql_context = sparksql.SQLcontext(sc)# Creating a Spark DataFrame from a pandas dataframedf_spark = sql_context.createDataFrame(df_pd)df_spark.show(5)

This is what the data looks like:

Out[]:    +-----+---+----+----+          |admit|gre| gpa|rank|          +-----+---+----+----+          |    0|380|3.61|   3|          |    1|660|3.67|   3|          |    1|800| 4.0|   1|          |    1|640|3.19|   4|          |    0|520|2.93|   4|          +-----+---+----+----+          only showing top 5 rows# This is what we wantdf_pandas.hist('gre');

Histogram when plotted in using df_pandas.hist()

# Doing the heavy lifting in Spark. We could leverage the `histogram` function from the RDD apigre_histogram = df_spark.select('gre').rdd.flatMap(lambda x: x).histogram(11)# Loading the Computed Histogram into a Pandas Dataframe for plottingpd.DataFrame(    list(zip(*gre_histogram)),     columns=['bin', 'frequency']).set_index(    'bin').plot(kind='bar');

Histogram computed by using RDD.histogram()


You can now use the pyspark_dist_explore package to leverage the matplotlib hist function for Spark DataFrames:

from pyspark_dist_explore import histimport matplotlib.pyplot as pltfig, ax = plt.subplots()hist(ax, my_df.select('field_1'), bins = 20, color=['red'])

This library uses the rdd histogram function to calculate bin values.


Another solution, without the need for extra imports,which should also be efficient; First, use window partition:

import pyspark.sql.functions as Fimport pyspark.sql as SQLwin = SQL.Window.partitionBy('column_of_values')

Then all you need it to use count aggregation partitioned by the window:

df.select(F.count('column_of_values').over(win).alias('histogram'))

The aggregative operators happens on each partition of the cluster, and does not require an extra round-trip to the host.