PySpark DataFrame UDF on Text Column
Your dataset isn't clean: 985 lines yield only one value when split on '\t':
    >>> from operator import add
    >>> lines = sc.textFile("classified_tweets.txt")
    >>> parts = lines.map(lambda l: l.split("\t"))
    >>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect()
    [(2, 149195), (1, 985)]
    >>> parts.filter(lambda l: len(l) == 1).take(5)
    [['"show me the money!” at what point do you start trying to monetize your #startup? tweet us with #startuplife.'],
     ['a good pitch can mean money in the bank for your #startup. see how body language plays a key role: (via: ajalumnify)'],
     ['100+ apps in five years? @2359media did it using microsoft #azure: #azureapps'],
     ['does buying better coffee make you a better leader? little things can make a big difference: (via: @jmbrandonbb)'],
     ['.@msftventures graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']]
So change your code to keep only the lines that split into exactly two fields:
    >>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip()))
    >>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
    >>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
    >>> df.show(5)
    +--------------------+--------------+---------+
    |               tweet|classification|    dummy|
    +--------------------+--------------+---------+
    |rt @jiffyclub: wi...|        python|dummyData|
    |rt @arnicas: ipyt...|        python|dummyData|
    |rt @treycausey: i...|        python|dummyData|
    |what's my best op...|        python|dummyData|
    |rt @raymondh: #py...|        python|dummyData|
    +--------------------+--------------+---------+
    only showing top 5 rows
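The same filtering logic can be checked in plain Python before running it on the full dataset. This is only a minimal sketch: `parse_line` is a hypothetical helper that is not part of the original code, and the sample lines are made up for illustration.

```python
def parse_line(line):
    """Split a tab-separated line into (tweet, classification).

    Returns None for malformed lines that do not split into exactly two fields.
    """
    fields = line.split("\t")
    if len(fields) != 2:
        return None
    return fields[0], fields[1].strip()


# Made-up sample lines: the second one has no tab, so it is malformed.
sample_lines = [
    "rt @someone: python is fun\tpython\n",
    "a tweet with no label",
]

parsed = [parse_line(line) for line in sample_lines]
clean = [p for p in parsed if p is not None]
print(clean)
```

If this helper suits your data, it could presumably be applied to the RDD as `lines.map(parse_line).filter(lambda p: p is not None)`, which avoids indexing `p[1]` on a one-element list.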
I think you're misdefining the problem. You may have simplified your lambda for the purposes of this question and, in doing so, hidden the real problem.
Your stack trace reads:

    File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
    IndexError: list index out of range
When I run this code it works fine:
    import pyspark
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql.functions import udf

    # sqlContext is assumed to exist already, as it does in the pyspark shell
    training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification")

    def dummy_function(data_str):
        cleaned_str = 'dummyData'
        return cleaned_str

    dummy_function_udf = udf(dummy_function, StringType())

    df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
    df.show()

    +-----+--------------+---------+
    |tweet|classification|    dummy|
    +-----+--------------+---------+
    |  foo|           bar|dummyData|
    +-----+--------------+---------+
Are you sure there isn't some other bug in your dummy_function_udf? What is the 'real' UDF you are using, apart from this sample version?
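One way to rule the UDF in or out is to call the underlying Python function directly, outside Spark, on a few representative inputs. A minimal sketch using the `dummy_function` from the snippet above; the sample inputs are made up, and `None` is included because null column values reach a Python UDF as `None`.

```python
def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str


# Made-up inputs: a normal tweet, an empty string, and a null value.
samples = ["rt @someone: a normal tweet", "", None]

results = [dummy_function(s) for s in samples]
print(results)
```

If the plain function handles every input without raising, the exception is more likely coming from an earlier step, such as the lambda named in the stack trace.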
The following works with Spark 2:
    import hashlib
    import uuid
    import datetime
    from pyspark.sql.types import StringType

    def customencoding(s):
        m = hashlib.md5()
        m.update(s.encode('utf-8'))
        d = m.hexdigest()
        return d

    spark.udf.register("udf_customhashing32adadf", customencoding, StringType())
    spark.sql("SELECT udf_customhashing32adadf('test') as rowid").show(10, False)
You can implement it in the same way.
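Because the registered function is ordinary Python, it can be sanity-checked without a Spark session. A small sketch that reuses the `customencoding` function from the snippet above, condensed to one expression but intended to behave the same way:

```python
import hashlib


def customencoding(s):
    # MD5 hex digest of the UTF-8 encoded input string
    return hashlib.md5(s.encode('utf-8')).hexdigest()


digest = customencoding('test')
print(digest)       # a 32-character hexadecimal string
print(len(digest))
```

Checking the function this way first makes it easier to tell a bug in the Python logic apart from a problem with UDF registration or the SQL call.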