Reputation: 153
I'm using a single-node Databricks cluster.
I need to write partitions from different dataframes in zstd compressed .csv files and upload them to DBFS. What I would like to do is to run the compression process on multiple threads with different parts of the dataframe (the dataframe is filtered by some criteria). What I tried so far is this (Python):
import os
from multiprocessing.pool import ThreadPool
from typing import List

class FileCreation:
    def __init__(self):
        # ...
        # init the dataframe and file path
        pass

    def create_files(self, df, country, partitions):
        df.coalesce(partitions).write.csv(
            os.path.join(self.storage_path, f"country={country}"),
            compression="org.apache.hadoop.io.compress.ZStandardCodec",
            header=True,
            mode="overwrite",
        )

    def process(self) -> None:
        # returns a list like ["Germany", "Netherlands", ...] depending on the countries present in the dataset
        countries: List[str] = self.get_country_codes(self.df)
        self.run(countries)

    def run(self, countries):
        with ThreadPool(processes=32) as pool:
            return pool.map(self.process_country, countries)

    def process_country(self, country):
        df = self.filter(country)  # filters the dataframe by country
        partitions = self.get_number_of_necessary_partitions(df)  # returns the number of partitions based on a defined number of rows per file
        self.create_files(df, country, partitions)

file_creation = FileCreation(...)
file_creation.process()
The script works and some files are uploaded, but at some point the Spark cluster stops unexpectedly. I should mention that the data comes with 4 partitions by default, but I need to reduce that based on a calculated number of partitions. The job needs to run for approximately one hour. I had a look at the logs but found nothing helpful. I read that zstd compression can have problems with multiple threads. Any idea what I can do? Thanks!
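In case it matters, get_number_of_necessary_partitions does roughly the following (a simplified sketch; the actual rows-per-file threshold is just a placeholder here):

import math

def get_number_of_necessary_partitions(self, df):
    # placeholder for the configured maximum number of rows per output file
    rows_per_file = 1_000_000
    row_count = df.count()  # triggers a Spark job to count the filtered rows
    # at least one partition, otherwise enough partitions to stay under rows_per_file
    return max(1, math.ceil(row_count / rows_per_file))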
Upvotes: 0
Views: 477