Reputation: 15
I constructed a Spark DataFrame with the following structure:
root
 |-- tickers: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
I want to save each object in price into a separate JSON file, with each file using the corresponding name string as its filename. Is there a way to implement this in a Python environment?
The most relevant solution I found is to repartition the DataFrame into as many partitions as it has "rows" and use .write.csv() (see https://stackoverflow.com/a/49890590/6158414). But this doesn't fit my need to save "rows" into separate files with different filenames.
To give more context: I'm using Spark to call an API and retrieve data in parallel. Each "row" in the Spark DataFrame is the result of a query for a unique value of tickers. The last step of my process is to save each query result separately. I would also appreciate it if someone has a better way to do this.
Thanks a lot!
Upvotes: 1
Views: 5620
Reputation: 1912
You can write a Spark UDF to save each object / element to a different CSV file.
Below is an example that writes each row to a separate file. In your case, you just need to modify the UDF to traverse the elements of the price column and write each one to a separate file.
>>> import csv
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>>
>>> # Named `rows` rather than `list` to avoid shadowing the built-in.
>>> rows = [("1", "name1"), ("2", "name2"), ("3", "name3"), ("4", "name4")]
>>>
>>> df = spark.createDataFrame(rows, ["id", "name"])
>>>
>>> df.show()
+---+-----+
| id| name|
+---+-----+
|  1|name1|
|  2|name2|
|  3|name3|
|  4|name4|
+---+-----+
>>> # UDF that takes two columns, writes them to a CSV file, and returns a status string.
>>> def writeToCsv(x, y):
...     fileName = x + '.csv'  # Modify the file name; add a path if required.
...     # The with-statement closes the file even if writing fails.
...     with open(fileName, 'w') as myFile:
...         writer = csv.writer(myFile)
...         # Header row followed by the single data row.
...         writer.writerows([["id", "name"], [x, y]])
...     return "SAVED"
...
>>> # Register the UDF with a StringType return type.
>>> save_udf = udf(writeToCsv, StringType())
>>> # Apply the UDF to each row. Spark evaluates it lazily, so the files are
>>> # only written once an action such as show() runs.
>>> out_df = df.withColumn("processed", save_udf(df.id, df.name))
>>>
>>> # Check if all the rows are processed successfully.
>>> out_df.show()
+---+-----+---------+
| id| name|processed|
+---+-----+---------+
|  1|name1|    SAVED|
|  2|name2|    SAVED|
|  3|name3|    SAVED|
|  4|name4|    SAVED|
+---+-----+---------+
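For the schema in the question, the same idea could be adapted to JSON roughly as follows. This is only a minimal sketch, not a tested Spark job: `write_price_json`, `out_dir`, and the sample values are hypothetical names and made-up data, and it assumes the price column reaches Python as a list of nested dicts (which is how PySpark normally hands array and map columns to Python code).

```python
import json
import os
import tempfile


def write_price_json(name, price, out_dir="."):
    """Write one row's `price` value to <out_dir>/<name>.json and return the path."""
    os.makedirs(out_dir, exist_ok=True)
    # Assumption: path separators should not end up inside the file name.
    safe_name = str(name).replace("/", "_").replace(os.sep, "_")
    path = os.path.join(out_dir, safe_name + ".json")
    with open(path, "w") as f:
        # `price` is a list of dicts of dicts of strings, so json can serialise it directly.
        json.dump(price, f)
    return path


# Local check with made-up data shaped like the `price` column.
sample_price = [{"day1": {"open": "1.0", "close": "1.1"}}]
demo_dir = tempfile.mkdtemp()
saved_path = write_price_json("name1", sample_price, out_dir=demo_dir)
```

With a real DataFrame, the function could be called once per row, for example with df.foreach(lambda row: write_price_json(row["name"], row["price"], "/some/output/dir")), where the output directory is a placeholder. Either way, the files are written on whichever machine executes the row, so on a cluster the target directory would need to be a shared mount for the files to end up in one place.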
Upvotes: 8