
Converting Pandas DataFrame to Spark DataFrame


I am not sure if this question is still relevant to the current version of pySpark, but here is the solution I worked out a couple of weeks after posting this question. The code is rather ugly and possibly inefficient, but I am posting it here due to the continued interest in this question:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
from py4j.protocol import Py4JJavaError

myConf = SparkConf(loadDefaults=True)
sc = SparkContext(conf=myConf)
hc = HiveContext(sc)

def chunks(lst, k):
    """Yield roughly k chunks of close to equal size."""
    n = max(1, len(lst) / k)    # integer division under Python 2
    for i in range(0, len(lst), n):
        yield lst[i: i + n]

def reconstruct_rdd(lst, num_parts):
    """Parallelize the list chunk by chunk and union the pieces into one RDD."""
    for part, partition in enumerate(chunks(lst, num_parts)):
        print "Partition ", part, " started..."
        # partition is a list of lists (one inner list per row)
        if part == 0:
            prime_rdd = sc.parallelize(partition)
        else:
            second_rdd = sc.parallelize(partition)
            prime_rdd = prime_rdd.union(second_rdd)
        print "Partition ", part, " complete!"
    return prime_rdd

def build_col_name_list(len_cols):
    """Recreate the default column names (_1, _2, ...) Spark assigns to an RDD-based DataFrame."""
    name_lst = []
    for i in range(1, len_cols + 1):
        name_lst.append("_" + str(i))
    return name_lst

def set_spark_df_header(header, sdf):
    """Rename the default _1.._n columns to the supplied header names."""
    oldColumns = build_col_name_list(len(sdf.columns))
    newColumns = header
    sdf = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sdf)
    return sdf

def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
    """Convert a pandas DataFrame to a Spark DataFrame, rebuilding the data in chunks if the direct path fails."""
    try:
        sdf = hc.createDataFrame(pdf)
    except ValueError:
        lst = pdf.values.tolist()    # need a list of lists to parallelize
        try:
            rdd = sc.parallelize(lst)
        except Py4JJavaError:
            rdd = reconstruct_rdd(lst, num_of_parts)
        sdf = hc.createDataFrame(rdd)
        sdf = set_spark_df_header(sdf_header, sdf)
    return sdf
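
If it helps, here is a minimal usage sketch. The pdf DataFrame and the choice of two partitions are made up for illustration, and it assumes the sc, hc, and functions defined above are available:

import pandas as pd

# Hypothetical input: a small pandas DataFrame standing in for a real matrix
pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

# Reuse the pandas column names as the Spark DataFrame header and allow the
# data to be rebuilt in 2 chunks if the direct conversion path fails
sdf = convert_pdf_matrix_to_sdf(pdf, list(pdf.columns), 2)
sdf.show()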


to_sparse(fill_value=0) is basically obsolete. Just use the standard variant

sqlContext.createDataFrame(pd.DataFrame(csc_mat.todense()))

and as long as the types are compatible, you'll be fine.
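
For instance, a quick sketch of that approach; the 2x2 csc_mat and the column names here are invented, and sqlContext is assumed to be an existing SQLContext (or HiveContext):

import pandas as pd
from scipy.sparse import csc_matrix

# Made-up sparse matrix standing in for the real csc_mat
csc_mat = csc_matrix([[1.0, 0.0], [0.0, 2.0]])

# Densify, wrap in pandas with string column names, then hand to Spark
pdf = pd.DataFrame(csc_mat.todense(), columns=["a", "b"])
sdf = sqlContext.createDataFrame(pdf)
sdf.show()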