Write data from pyspark to ElasticSearch
I had the same problem.
After reading this article, I found the answer!!!
You have to convert to PythonRDD
Type like this:
>>> type(df)<class 'pyspark.sql.dataframe.DataFrame'>>>> type(df.rdd)<class 'pyspark.rdd.RDD'>>>> df.rdd.saveAsNewAPIHadoopFile(...) # Got the same error message>>> df.printSchema() # My schemaroot |-- id: string (nullable = true) ...# Let's convert to PythonRDD>>> python_rdd = df.map(lambda item: ('key', {... 'id': item['id'], ...... }))>>> python_rddPythonRDD[42] at RDD at PythonRDD.scala:43>>> python_rdd.saveAsNewAPIHadoopFile(...) # Now, success
saveAsNewAPIHadoopFile is in RDD ,
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
I guess this line should be
es_df_pf.rdd.saveAsNewAPIHadoopFile