Yana

Reputation: 87

How to pass dataframe to pyspark parallel operation?

I'm trying to filter the data frame by salary values and then save each filtered result as a CSV file using PySpark.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [('James','M',30),('Anna','F',41),
        ('Robert','M',62),('yana','M',30)
]
columns = ["firstname","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
spark.sparkContext.parallelize([30,41,62]).foreach(lambda x: df.filter(df.salary == x).show())

Running this code raises:

_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

(EDITED)

In addition, I need the file writes to happen concurrently while Spark filters the data frame:

spark.sparkContext.parallelize([30,41,62]).foreach(lambda x: df.filter(df.salary == x).write.save(f'/path-to-drive-location/{x}'))

Upvotes: 0

Views: 695

Answers (1)

pltc

Reputation: 6082

You don't want to deal with RDDs and foreach or map here: a DataFrame (and the SparkSession behind it) cannot be serialized to the executors, which is exactly why your foreach call fails with the pickling error. Instead, you can just create another dataframe holding the salaries and perform an inner join (or any other join you want).

salaries = [30, 41, 62]

a = spark.createDataFrame([(s,) for s in salaries], ['salary'])
a.show()
+------+
|salary|
+------+
|    30|
|    41|
|    62|
+------+

df2 = df.join(a, on=['salary'])
df2.show()
+------+---------+------+
|salary|firstname|gender|
+------+---------+------+
|    30|    James|     M|
|    30|     yana|     M|
|    41|     Anna|     F|
|    62|   Robert|     M|
+------+---------+------+

df2.write.csv('/path/to/location')
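
If, per your edit, each salary needs to end up in its own output location, one option is to let Spark split the output for you with partitionBy instead of looping yourself. A minimal sketch, reusing the placeholder path above:

# Writes one subdirectory per distinct salary value,
# e.g. /path/to/location/salary=30/, /path/to/location/salary=41/, ...
df2.write.partitionBy('salary').csv('/path/to/location', header=True)

The partitions are written out in parallel by the executors, so you get the concurrency you asked for without touching foreach.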

Upvotes: 1
