
Converting Pandas dataframe into Spark dataframe error


I wrote this script; it worked for my 10 pandas DataFrames:

from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()  # float64 is double precision; FloatType is only 32-bit
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given a pandas DataFrame, return a Spark DataFrame.
# Assumes a sqlContext already exists (as in the PySpark shell).
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

You can see it also in this gist

With this you just have to call spark_df = pandas_to_spark(pandas_df)


Type-related errors can be avoided by imposing a schema as follows:

Note: a text file (test.csv) was created from the original data (as above), with hypothetical column names inserted ("col1", "col2", ..., "col25").

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")

Contents of the pandas DataFrame:

       col1     col2    col3    col4    col5    col6    col7    col8   ...
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

Next, create the schema:

from pyspark.sql.types import *

mySchema = StructType([
    StructField("col1", LongType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", IntegerType(), True),
    StructField("col5", StringType(), True),
    StructField("col6", StringType(), True),
    StructField("col7", IntegerType(), True),
    StructField("col8", IntegerType(), True),
    StructField("col9", IntegerType(), True),
    StructField("col10", IntegerType(), True),
    StructField("col11", StringType(), True),
    StructField("col12", StringType(), True),
    StructField("col13", IntegerType(), True),
    StructField("col14", IntegerType(), True),
    StructField("col15", IntegerType(), True),
    StructField("col16", IntegerType(), True),
    StructField("col17", IntegerType(), True),
    StructField("col18", IntegerType(), True),
    StructField("col19", IntegerType(), True),
    StructField("col20", IntegerType(), True),
    StructField("col21", IntegerType(), True),
    StructField("col22", IntegerType(), True),
    StructField("col23", IntegerType(), True),
    StructField("col24", IntegerType(), True),
    StructField("col25", IntegerType(), True),
])

Note: the third argument, True, means the field is nullable.
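Typing out 25 StructField entries by hand is easy to get wrong. As a rough sketch of an alternative: createDataFrame also accepts a datatype string for its schema argument (for example "a: string, b: int"), so the same layout can be generated from a default type plus a few overrides. The helper name build_schema_string and its parameters are made up for this illustration; they are not part of PySpark.

```python
# Hypothetical helper (not part of PySpark): builds a schema string such as
# "col1: bigint, col2: int, ..." from a default type plus per-column overrides.
def build_schema_string(n_cols, default="int", overrides=None, prefix="col"):
    overrides = overrides or {}
    fields = []
    for i in range(1, n_cols + 1):
        name = f"{prefix}{i}"
        # Use the override for this column if there is one, else the default.
        fields.append(f"{name}: {overrides.get(name, default)}")
    return ", ".join(fields)


# Same layout as the hand-written schema: col1 is a long (bigint), four
# columns are strings, and everything else is an int.
schema_string = build_schema_string(
    25,
    overrides={
        "col1": "bigint",
        "col5": "string",
        "col6": "string",
        "col11": "string",
        "col12": "string",
    },
)

# Assumed usage, with `spark` and `pdDF` defined as in the answer:
# df = spark.createDataFrame(pdDF, schema=schema_string)
```

This only works when the column names follow a regular pattern like col1..col25; for irregular names, an explicit StructType is probably clearer.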

Create the PySpark DataFrame:

df = spark.createDataFrame(pdDF,schema=mySchema)

Confirm that the pandas DataFrame is now a PySpark DataFrame:

type(df)

output:

pyspark.sql.dataframe.DataFrame

Aside:

To address Kate's comment below - to impose a general (String) schema you can do the following:

df=spark.createDataFrame(pdDF.astype(str)) 


You need to make sure the columns of your pandas DataFrame are appropriate for the type Spark is inferring. If the info() output for your pandas DataFrame lists something like:

df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

and you're getting that error, try:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

Now, make sure .astype(str) is actually the type you want those columns to be. Basically, when Spark tries to infer the type from Python objects, it looks at some of the data and makes a guess; if that guess doesn't hold for all the data in the column(s) being converted from pandas to Spark, the conversion fails.
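To find out which columns are likely to trip up the inference before casting anything, one option is to look for columns that hold more than one Python type. The following is only a sketch: find_mixed_type_columns is a hypothetical helper written for this illustration, and the sample data is made up.

```python
def find_mixed_type_columns(columns):
    """Return {column name: sorted type names} for columns with more than one Python type."""
    mixed = {}
    for name, values in columns.items():
        type_names = {type(v).__name__ for v in values}
        if len(type_names) > 1:
            mixed[name] = sorted(type_names)
    return mixed


# Made-up sample data: "SomeCol" mixes strings with a float NaN, which is how
# a missing value typically shows up in a pandas object column.
sample = {
    "SomeCol": ["OK", float("nan"), "PA"],
    "Col2": ["a", "b", "c"],
}
mixed = find_mixed_type_columns(sample)
print(mixed)  # {'SomeCol': ['float', 'str']}

# Assumed usage with a pandas DataFrame named df:
# find_mixed_type_columns(df.to_dict("list"))
```

Columns reported here are the ones to clean up or cast explicitly (for example with .astype(str), if a string really is what you want).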