mommomonthewind
mommomonthewind

Reputation: 4650

Applying a function in each row of a big PySpark dataframe?

I have a big dataframe (~30M rows). I have a function f. The business of f is to run through each row, check some logics and feed the outputs into a dictionary. The function needs to be performed row by row.

I tried:

dic = dict() for row in df.rdd.collect(): f(row, dic)

But I always meet the error OOM. I set the memory of Docker to 8GB.

How can I effectively perform the business?

Upvotes: 13

Views: 23031

Answers (2)

Prem
Prem

Reputation: 11985

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType, MapType

#sample data
df = sc.parallelize([
    ['a', 'b'],
    ['c', 'd'],
    ['e', 'f']
]).toDF(('col1', 'col2'))

#add logic to create dictionary element using rows of the dataframe    
def add_to_dict(l):
    d = {}
    d[l[0]] = l[1]
    return d
add_to_dict_udf = udf(add_to_dict, MapType(StringType(), StringType()))
#struct is used to pass rows of dataframe
df = df.withColumn("dictionary_item", add_to_dict_udf(struct([df[x] for x in df.columns])))
df.show()

#list of dictionary elements
dictionary_list = [i[0] for i in df.select('dictionary_item').collect()]
print dictionary_list

Output is:

[{u'a': u'b'}, {u'c': u'd'}, {u'e': u'f'}]

Upvotes: 9

akoeltringer
akoeltringer

Reputation: 1721

By using collect you pull all the data out of the Spark Executors into your Driver. You really should avoid this, as it makes using Spark pointless (you could just use plain python in that case).

What could you do:

Upvotes: 5

Related Questions