Reputation: 87
I'm trying to filter a data frame by salary values and then save each result as a CSV file using PySpark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [('James', 'M', 30), ('Anna', 'F', 41),
        ('Robert', 'M', 62), ('yana', 'M', 30)]
columns = ["firstname", "gender", "salary"]

df = spark.createDataFrame(data=data, schema=columns)
df.show()

# this line raises the PicklingError shown below
spark.sparkContext.parallelize([30, 41, 62]).foreach(lambda x: df.filter(df.salary == x).show())
When I run this code, it raises:
_pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
EDIT: Additionally, I need the file writes to happen concurrently while Spark filters the data frame:
spark.sparkContext.parallelize([30, 41, 62]).foreach(lambda x: df.filter(df.salary == x).write.save(f'/path-to-drive-location/{x}'))
Upvotes: 0
Views: 695
Reputation: 6082
You don't want to deal with RDDs and foreach or map here: the lambda captures df, which holds a reference to the SparkSession, and that cannot be pickled and shipped to executors, which is why you get the PicklingError. You can just create another dataframe and perform an inner join (or any other join you want).
salaries = [30, 41, 62]
a = spark.createDataFrame([(s,) for s in salaries], ['salary'])
a.show()
+------+
|salary|
+------+
| 30|
| 41|
| 62|
+------+
df2 = df.join(a, on=['salary'])
df2.show()
+------+---------+------+
|salary|firstname|gender|
+------+---------+------+
| 30| James| M|
| 30| yana| M|
| 41| Anna| F|
| 62| Robert| M|
+------+---------+------+
df2.write.csv('/path/to/location')
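If you need a separate CSV output per salary value (as your foreach attempt suggests), a minimal sketch is to partition the write by that column; Spark then creates one subdirectory per distinct salary under the output path (the path below is just a placeholder):

df2.write.partitionBy('salary').csv('/path/to/location')

Note that the salary column is encoded in the directory names (e.g. salary=30/), so it no longer appears inside the CSV files themselves.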
Upvotes: 1