ABHINAV MAZUMDAR

Reputation: 21

Spark Dataframe Pass Row rdd to a Python Function

I have a Spark DataFrame with three columns (Integer, Integer, and DateType) and 1000 rows. I have a separate Python function that takes the value of each column in a row and does some processing. How do I pass these three values to the function for every row and collect the output into a DataFrame?

Upvotes: 2

Views: 2186

Answers (1)

dayman

Reputation: 680

The following example makes use of two bits that you were probably missing:

  • DataFrames can be represented as RDDs of row objects
  • It is possible to move between the DataFrame and RDD representation of your data at will

You'll see calls to DataFrame.rdd.map() and RDD.toDF(); these are the methods that facilitate the shift between the two representations.
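In miniature, the round trip looks like this (a sketch, assuming df is any existing DataFrame with two numeric columns):

# DataFrame -> RDD of Row objects, apply plain Python logic, then back to a DataFrame
doubled_rdd = df.rdd.map(lambda row: (row[0] * 2, row[1] * 2))
doubled_df = doubled_rdd.toDF(df.columns)

The full example below applies the same pattern to the three-column case you described.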

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from datetime import datetime, timedelta

# boring setup
sconf = SparkConf()
sconf.setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=sconf)
hc = HiveContext(sc)


# define your transformation functions
def process_column_a(val):
    return val * 2

def process_column_b(val):
    return val * 3

def process_column_c(val):
    return val + timedelta(days=1)

# this wrapper isn't required but makes calling the transformations easier
def process_row(val_a, val_b, val_c):
    return (process_column_a(val_a), 
            process_column_b(val_b), 
            process_column_c(val_c))


# mocking up some data in the shape you specified
data = ((i, -i, datetime.now() + timedelta(days=i)) for i in range(1000))
initial_dataframe = hc.createDataFrame(data, ["col_a", "col_b", "col_c"])

# call the processing functions in a map over an rdd representation of the data
processed_rdd = initial_dataframe.rdd.map(lambda x: process_row(*x))

# convert the processed rdd back to a dataframe
finished_df = processed_rdd.toDF(initial_dataframe.columns)

# examine the result
finished_df.show()
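One caveat worth knowing: RDD.toDF() infers the column types by sampling the data. If you want the output schema pinned down explicitly, you can pass a StructType instead of a list of column names. A minimal sketch (the field names here just mirror the mocked-up columns above; datetime values map to TimestampType):

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

explicit_schema = StructType([
    StructField("col_a", IntegerType(), nullable=False),
    StructField("col_b", IntegerType(), nullable=False),
    StructField("col_c", TimestampType(), nullable=False),
])

# same conversion, but with the schema stated up front instead of inferred
finished_df = processed_rdd.toDF(explicit_schema)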

Upvotes: 2
