sopana
sopana

Reputation: 375

How to read a column from Pyspark RDD and apply UDF on it?

I'm creating a DF by reading a csv file in Pyspark and then converting into RDD to apply UDF. It throws an error while applying the UDF.

Here's my code snippet -

# My UDF definition
def my_udf(string_array):
    // some code //
    return float_var

spark.udf.register("my_udf", my_udf, FloatType())

#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")

rdd = read_data.rdd

get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])

Sample data in read_data DF -

[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]

The schema of the DF created by reading from CSV file -

print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))

I get following error while applying UDF at the get_df line -

Traceback (most recent call last): File "", line 1, in File "/usr/lib/spark/python/pyspark/sql/session.py", line 58, in toDF return sparkSession.createDataFrame(self, schema, sampleRatio) File "/usr/lib/spark/python/pyspark/sql/session.py", line 746, in createDataFrame rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) File "/usr/lib/spark/python/pyspark/sql/session.py", line 390, in _createFromRDD struct = self._inferSchema(rdd, samplingRatio, names=schema) File "/usr/lib/spark/python/pyspark/sql/session.py", line 377, in _inferSchema raise ValueError("Some of types cannot be determined by the " ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

Can anyone please help me in passing the array (datatype is string) to the UDF?

Upvotes: 0

Views: 4910

Answers (1)

E.ZY.
E.ZY.

Reputation: 725

Two things:

  1. if convert DF to RDD you don't need to register my_udf as a udf. If you register udf, you directly apply to df like read_data.withColumn("col3", my_udf(F.col("col3")))

  2. the problem you encountered is at toDF step, that you dont specify the schema of the new DF when converted from RDD and spark is trying to infer type from sample data, but in your case, the implicit type hint is not working. I will manually create the schema and pass into the toDF like this

from pyspark.sql.types import StringType, FloatType, StructField, StructType
get_schema = StructType(
[StructField('col1', StringType(), True),
 StructField('col2', StringType(), True)
 StructField('col3', FloatType(), True)]
)
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(get_schema)

Upvotes: 1

Related Questions