Bookamp

Reputation: 682

Spark RDD.map use within a spark dataframe withColumn method

I have the following code:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType
import math

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.master('local').getOrCreate()


schema = StructType([
    StructField("INDEX", IntegerType(), True),
    StructField("SYMBOL", StringType(), True),
    StructField("DATETIMETS", StringType(), True),
    StructField("PRICE", DoubleType(), True),
    StructField("SIZE", IntegerType(), True),
])

df = spark\
    .createDataFrame(
        data=[(0,'A','2002-12-02 9:30:20',19.75,30200),
              (1,'A','2002-12-02 9:31:20',19.75,30200),
              (8,'A','2004-12-02 10:36:20',1.0,30200),
              (9,'A','2006-12-02 22:41:20',20.0,30200),
              (10,'A','2006-12-02 22:42:20',40.0,30200)],
        schema=schema)

I then do some computations without using Spark. This works fine.

def without_spark(price):
    first_summation = sum(map(lambda n: math.sqrt(price), range(1, 10)))
    return first_summation

u_without_spark = udf(without_spark, DoubleType())

df.withColumn("NEW_COL", u_without_spark('PRICE')).show()

[Screenshot: output of the version without Spark]

The following code, which uses RDD parallelization, however, does not work.

def with_spark(price):
    rdd = sc.parallelize(1, 10)
    first_summation = rdd.map(lambda n: math.sqrt(price))
    return first_summation.sum()

u_with_spark = udf(with_spark, DoubleType())

df.withColumn("NEW_COL", u_with_spark('PRICE')).show()

[Screenshot: Spark error traceback]

Is what I am trying to do not possible? Is there a faster way to do this?

Thanks for your help

Upvotes: 1

Views: 3143

Answers (2)

Ramesh Maharjan

Reputation: 41957

I then do some computations without using spark

When you created the dataframe you used SparkSession, so you are already using Spark. udf and withColumn are Spark dataframe APIs used to transform the dataframe.

Dataframes are distributed in nature, i.e. all transformations on a dataframe are executed on the worker nodes. So the udf applied via the withColumn transformation runs entirely on the worker nodes. The SparkContext (sc) you created lives on the driver node and cannot be used inside a transformation.

Is what I am trying to do not possible? Is there a faster way to do this?

Your second implementation is wrong because you are trying to access the SparkContext from within a transformation.

Your first method seems to be working fine and already uses Spark, so I guess you don't need to look for alternatives.
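
That said, the summation in your udf is just nine identical sqrt(PRICE) terms, so if you ever do want it faster, a minimal sketch using the built-in sqrt Column function instead of a Python UDF (assuming the df from your question) would be:

from pyspark.sql.functions import sqrt

# range(1, 10) has nine elements, so the sum collapses to 9 * sqrt(PRICE);
# built-in Column functions keep the computation in the JVM, with no Python UDF overhead.
df.withColumn("NEW_COL", sqrt("PRICE") * 9).show()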

Upvotes: 1

Assaf Mendelson

Reputation: 13001

You cannot call any RDD methods from within a UDF.

A UDF runs on the workers, whereas RDD and dataframe operations can only be invoked from the driver, so they are not allowed inside a UDF.

It seems as if your goal is a UDAF (User Defined Aggregate Function). This cannot be done directly from pyspark. You have two options: either use collect_list and then apply a UDF to the resulting array, or write the UDAF in Scala and wrap it for pyspark.
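
A minimal sketch of the first option, assuming the df from the question and grouping by SYMBOL (the grouping key is an assumption; pick whatever key fits your data):

import math
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import DoubleType

# Collect all PRICE values per SYMBOL into an array column, then apply a plain
# Python UDF to that array; the UDF uses no SparkContext or RDD calls.
sum_sqrt = udf(lambda prices: float(sum(math.sqrt(p) for p in prices)), DoubleType())

df.groupBy('SYMBOL') \
  .agg(collect_list('PRICE').alias('PRICES')) \
  .withColumn('SQRT_SUM', sum_sqrt('PRICES')) \
  .show()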

Upvotes: 1
