Reputation: 21
I have a spark data frame of three columns and 1000 rows. (Integer, Integer and DateType). I have a separate python function which takes each value of every row and does some processing. How to pass these three values iteration and collect the output to a dataframe
Upvotes: 2
Views: 2186
Reputation: 680
The following example makes use of two bits that you were probably missing:
You'll see a call to DataFrame.rdd.map() and RDD.toDF(). These are the methods that facilitate the shift between these two representations.
from pyspark import SparkConf, SparkContext, HiveContext
from datetime import datetime, timedelta
# boring setup
sconf = SparkConf()
sconf.setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=sconf)
hc = HiveContext(sc)
# define your transformation functions
def process_column_a(val):
return val * 2
def process_column_b(val):
return val * 3
def process_column_c(val):
return val + timedelta(days=1)
# this wrapper isn't required but makes calling the transformations easier
def process_row(val_a, val_b, val_c):
return (process_column_a(val_a),
process_column_b(val_b),
process_column_c(val_c))
# mocking up some data in the shape you specified
data = ((i, -i, datetime.now() + timedelta(days=i)) for i in range(1000))
initial_dataframe = hc.createDataFrame(data, ["col_a", "col_b", "col_c"])
# call the processing functions in a map over an rdd representation of the data
processed_rdd = initial_dataframe.rdd.map(lambda x: process_row(*x))
# convert the processed rdd back to a dataframe
finished_df = processed_rdd.toDF(initial_dataframe.columns)
# examine the result
finished_df.show()
Upvotes: 2