Reputation: 682
I have the following code:
from pyspark import *;
from pyspark.sql import *;
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType
import math;
sc = SparkContext.getOrCreate();
spark = SparkSession.builder.master('local').getOrCreate();
schema = StructType([
    StructField("INDEX", IntegerType(), True),
    StructField("SYMBOL", StringType(), True),
    StructField("DATETIMETS", StringType(), True),
    StructField("PRICE", DoubleType(), True),
    StructField("SIZE", IntegerType(), True),
])
df = spark\
    .createDataFrame(
        data=[(0,'A','2002-12-02 9:30:20',19.75,30200),
              (1,'A','2002-12-02 9:31:20',19.75,30200),
              (8,'A','2004-12-02 10:36:20',1.0,30200),
              (9,'A','2006-12-02 22:41:20',20.0,30200),
              (10,'A','2006-12-02 22:42:20',40.0,30200)],
        schema=schema);
I then do some computations without using spark. This works fine.
def without_spark(price):
    first_summation = sum(map(lambda n: math.sqrt(price), range(1,10)));
    return first_summation;
u_without_spark = udf(without_spark, DoubleType())
df.withColumn("NEW_COL", u_without_spark('PRICE')).show()
The following code, which uses RDD parallelization, however, does not.
def with_spark(price):
    rdd = sc.parallelize(1, 10)
    first_summation = rdd.map(lambda n: math.sqrt(price));
    return first_summation.sum();
u_with_spark = udf(with_spark, DoubleType())
df.withColumn("NEW_COL", u_with_spark('PRICE')).show()
Is what I am trying to do not possible? Is there a faster way to do this?
Thanks for your help
Upvotes: 1
Views: 3143
Reputation: 41957
I then do some computations without using spark
When you created the dataframe, you used SparkSession, so you are already using Spark. udf and withColumn are Spark dataframe APIs used to transform a dataframe.
Dataframes are distributed in nature, i.e. all transformations on a dataframe are executed on the worker nodes. So the udf applied through the withColumn transformation runs entirely on the worker nodes. You created the sparkContext (sc) on the driver node, and it cannot be used inside transformations.
Is what I am trying to do not possible? Is there a faster way to do this?
Your second implementation is wrong because you are trying to access the sparkContext from within a transformation.
Your first method seems to be working fine and is already using Spark, so I guess you don't need to look for alternatives.
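For illustration only, a minimal sketch of what this implies: the second function with the SparkContext call removed, doing the per-row summation in plain Python inside the UDF (essentially a restatement of your first method):
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math
# Sketch: the per-row work stays in plain Python; no SparkContext is touched,
# so the function can safely run on the worker nodes. df is the dataframe
# created in the question.
def with_plain_python(price):
    return sum(math.sqrt(price) for _ in range(1, 10))
u_with_plain_python = udf(with_plain_python, DoubleType())
df.withColumn("NEW_COL", u_with_plain_python('PRICE')).show()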
Upvotes: 1
Reputation: 13001
You cannot call any RDD methods from within a UDF.
A UDF runs on the workers, whereas RDD and dataframe operations can only be initiated from the driver, so they are not allowed inside a UDF.
It seems as if your goal is a UDAF (User Defined Aggregate Function). This cannot be done from pyspark. You have two options: either use collect_list and then apply a UDF to the resulting array, or write the UDAF in Scala and wrap it for pyspark.
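For the first option, a minimal sketch of the collect_list route (the grouping key and the aggregate computed here, a per-symbol sum of square roots, are only illustrative assumptions, not something from your code):
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import DoubleType
import math
# Sketch: collect all prices per symbol into an array column, then apply a
# plain Python UDF to that array to compute the aggregate.
def sum_of_sqrts(prices):
    return float(sum(math.sqrt(p) for p in prices))
u_sum_of_sqrts = udf(sum_of_sqrts, DoubleType())
df.groupBy("SYMBOL") \
    .agg(collect_list("PRICE").alias("PRICES")) \
    .withColumn("SUM_SQRT_PRICE", u_sum_of_sqrts("PRICES")) \
    .show()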
Upvotes: 1