Reputation: 25366
I wanted to convert the Spark DataFrame to an RDD using the code below:
from pyspark.mllib.clustering import KMeans
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
The detailed error message is:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-a19a1763d3ac> in <module>()
1 from pyspark.mllib.clustering import KMeans
2 spark_df = sqlContext.createDataFrame(pandas_df)
----> 3 rdd = spark_df.map(lambda data: Vectors.dense([float(c) for c in data]))
4 model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")
/home/edamame/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in __getattr__(self, name)
842 if name not in self.columns:
843 raise AttributeError(
--> 844 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
845 jc = self._jdf.apply(name)
846 return Column(jc)
AttributeError: 'DataFrame' object has no attribute 'map'
Does anyone know what I did wrong here? Thanks!
Upvotes: 53
Views: 162483
Reputation: 6999
You can use df.rdd.map(), as DataFrame does not have map or flatMap, but be aware of the implications of using df.rdd:
Converting to an RDD breaks the DataFrame lineage: there is no predicate pushdown, no column pruning, no SQL plan, and the PySpark transformations are less efficient.
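Applied to the snippet from the question, that means going through .rdd before calling map. A minimal sketch (it assumes the pyspark.mllib.linalg Vectors import, which the question's snippet does not show):
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors
spark_df = sqlContext.createDataFrame(pandas_df)
# DataFrame has no map(); map over the underlying RDD instead
rdd = spark_df.rdd.map(lambda row: Vectors.dense([float(c) for c in row]))
model = KMeans.train(rdd, 2, maxIterations=10, runs=30, initializationMode="random")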
What should you do instead?
Keep in mind that the high-level DataFrame API is equipped with many alternatives. First, you can use select or selectExpr.
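For instance, a projection or a computed column that might otherwise push you toward map can usually be expressed directly. A sketch using a hypothetical age column:
from pyspark.sql.functions import col
# both lines add 1 to a column without leaving the DataFrame API
df.select((col("age") + 1).alias("age_plus_one")).show()
df.selectExpr("age + 1 AS age_plus_one").show()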
Another example is using explode instead of flatMap (which exists on RDDs):
df.select($"name", explode($"knownLanguages")).show(false)
Result:
+-------+------+
|name |col |
+-------+------+
|James |Java |
|James |Scala |
|Michael|Spark |
|Michael|Java |
|Michael|null |
|Robert |CSharp|
|Robert | |
+-------+------+
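Since the question uses PySpark, the same idea with pyspark.sql.functions.explode might look like this (a sketch assuming the name and knownLanguages columns from the Scala example above):
from pyspark.sql.functions import explode
# one output row per element of the knownLanguages array
df.select("name", explode("knownLanguages").alias("language")).show(truncate=False)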
You can also use withColumn or a UDF, depending on the use case, or another option in the DataFrame API.
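A sketch of the withColumn/UDF route, again with a hypothetical name column:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# wrap arbitrary Python logic in a UDF instead of mapping over an RDD
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df.withColumn("name_upper", upper_udf(df["name"])).show()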
Upvotes: 1
Reputation: 11573
You can't map a DataFrame, but you can convert the DataFrame to an RDD and map that by calling spark_df.rdd.map(). Prior to Spark 2.0, spark_df.map aliased to spark_df.rdd.map(). With Spark 2.0, you must explicitly call .rdd first.
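In other words (a minimal sketch; what you do with each row depends on your schema):
# Spark 1.x: this delegated to the underlying RDD
# mapped = spark_df.map(lambda row: [float(c) for c in row])
# Spark 2.0+: go through .rdd explicitly
mapped = spark_df.rdd.map(lambda row: [float(c) for c in row])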
Upvotes: 111