
Pyspark DataFrame UDF on Text Column


Your dataset isn't clean. 985 lines split on '\t' into only one value:

>>> from operator import add
>>> lines = sc.textFile("classified_tweets.txt")
>>> parts = lines.map(lambda l: l.split("\t"))
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect()
[(2, 149195), (1, 985)]
>>> parts.filter(lambda l: len(l) == 1).take(5)
[['"show me the money!”  at what point do you start trying to monetize your #startup? tweet us with #startuplife.'], ['a good pitch can mean money in the bank for your #startup. see how body language plays a key role:  (via: ajalumnify)'], ['100+ apps in five years? @2359media did it using microsoft #azure:  #azureapps'], ['does buying better coffee make you a better leader? little things can make a big difference:  (via: @jmbrandonbb)'], ['.@msftventures graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']]

So change your code to:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip()))
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
>>> df.show(5)
+--------------------+--------------+---------+
|               tweet|classification|    dummy|
+--------------------+--------------+---------+
|rt @jiffyclub: wi...|        python|dummyData|
|rt @arnicas: ipyt...|        python|dummyData|
|rt @treycausey: i...|        python|dummyData|
|what's my best op...|        python|dummyData|
|rt @raymondh: #py...|        python|dummyData|
+--------------------+--------------+---------+
only showing top 5 rows
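
If you would rather keep the 985 malformed lines than drop them, one option is to pad short rows with a null classification instead of filtering them out. A minimal sketch, assuming a null classification is acceptable for those rows (the explicit schema avoids type-inference problems with the None values):

from pyspark.sql.types import StructType, StructField, StringType

# Keep every line: rows without a tab get a null classification.
schema = StructType([
    StructField("tweet", StringType(), True),
    StructField("classification", StringType(), True),
])
padded = parts.map(lambda p: (p[0], p[1].strip()) if len(p) == 2 else (p[0], None))
training_df = sqlContext.createDataFrame(padded, schema)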


I think you're misdiagnosing the problem: you may have simplified your lambda for the purposes of this question and, in doing so, hidden the real bug.

Your stack trace reads:

File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>IndexError: list index out of range

When I run this code, it works fine:

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification")

def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())

df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show()

+-----+--------------+---------+
|tweet|classification|    dummy|
+-----+--------------+---------+
|  foo|           bar|dummyData|
+-----+--------------+---------+

Are you sure there isn't some other bug in your dummy_function_udf? What is the 'real' UDF you are using, apart from this sample version?
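
If the real UDF indexes into the result of a split or a tokenizer, it's worth guarding against malformed input so a single bad row doesn't kill the job. A minimal sketch, assuming the UDF splits its input (the guard is illustrative, not your actual logic):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def safe_function(data_str):
    # Return None instead of raising on null or delimiter-free input.
    if data_str is None:
        return None
    parts = data_str.split("\t")
    return parts[1] if len(parts) > 1 else parts[0]

safe_function_udf = udf(safe_function, StringType())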


The one below works with Spark 2:

import hashlib

from pyspark.sql.types import StringType

def customencoding(s):
    # MD5-hash the input string and return the hex digest.
    m = hashlib.md5()
    m.update(s.encode('utf-8'))
    d = m.hexdigest()
    return d

spark.udf.register("udf_customhashing32adadf", customencoding, StringType())

spark.sql("SELECT udf_customhashing32adadf('test') as rowid").show(10, False)

You can implement it in the same way.
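
For completeness, the same function can also be wrapped for the DataFrame API rather than registered for SQL. A minimal sketch, assuming a DataFrame df with a tweet column (the column name is taken from the question):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the same Python function as a DataFrame-API UDF.
customencoding_udf = udf(customencoding, StringType())

df = df.withColumn("rowid", customencoding_udf(df["tweet"]))
df.show(5, False)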