diego

Reputation: 177

Spark trying to avoid use of collect()

I'm having some trouble trying to improve the performance of some Python code that reads a huge amount of data (really big) from Databricks.

I filter it by doing something like this:

    data_filtered = (
        data
        .filter(
            (data["id"] == event['id']) &
            (data["Topic"].isin(topics))
        )
    )

but this data is still quite big. For some instances I need to declare my SparkSession with settings like these:

    {
        "spark.kryoserializer.buffer.max": "2047m",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "1",
        "spark.dynamicAllocation.maxExecutors": "100",
        "spark.dynamicAllocation.initialExecutors": "50",
        "spark.executor.memory": "16g",
        "spark.driver.memory": "16g",
        "spark.executor.cores": "4",
        "spark.driver.cores": "4",
        "spark.sql.shuffle.partitions": "750",
        "spark.sql.catalogImplementation": "hive",
        "spark.driver.maxResultSize": "4g"
    }
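For context, a config dict like that is typically applied when building the session, roughly like this (simplified sketch; the app name is just a placeholder, and on Databricks the same keys can also go in the cluster's Spark config):

    from pyspark.sql import SparkSession

    conf = {
        "spark.kryoserializer.buffer.max": "2047m",
        "spark.driver.maxResultSize": "4g",
        # ... the rest of the keys from the dict above
    }

    builder = SparkSession.builder.appName("my-job")  # placeholder name
    for key, value in conf.items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()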

Even that sometimes hasn't been enough, and the problem always comes from the .collect() I'm doing on this filtered data:

data_collected = data_filtered.rdd.map(lambda row: row.asDict()).collect()

I need the data like this (or I don't know another way to get it) because I loop over the data, building a dictionary from every row. I have tried other methods, for example foreach/toLocalIterator (maybe wrongly), but I ran into a problem, I think because one column has to be converted with json.loads() when I .append() it to my final dictionary. Roughly what I was attempting is sketched below.
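This is a simplified version of the toLocalIterator() attempt; the JSON column name "payload" here is just a placeholder:

    import json

    # toLocalIterator() streams one partition at a time to the driver,
    # instead of materializing everything at once like collect()
    rows_as_dicts = []
    for row in data_filtered.toLocalIterator():
        d = row.asDict()
        d["payload"] = json.loads(d["payload"])  # placeholder JSON column
        rows_as_dicts.append(d)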

Thanks!

Upvotes: 1

Views: 44

Answers (1)

Matt Andruff

Reputation: 5135

Stop using collect(). Collect is not a big data tool; it's a "fits on my computer" tool.

If you need to call a function on every row, the big data tool for that is a User Defined Function (UDF). The performance won't be great, but at least you get the job done. (It doesn't perform well because it ships the data back and forth to the Python interpreter one row at a time.)
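A rough sketch of that idea (the column name "payload" and the flat string-to-string JSON layout are assumptions, not from your question):

    import json
    from pyspark.sql.functions import udf
    from pyspark.sql.types import MapType, StringType

    # Runs in the Python worker for each row; nothing is collected to the driver
    @udf(returnType=MapType(StringType(), StringType()))
    def parse_payload(s):
        return json.loads(s) if s is not None else None

    data_transformed = data_filtered.withColumn("payload_map", parse_payload("payload"))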

I'm not sure why you are creating a dictionary for each row. You can access each row via a select statement, so why create a dictionary? Perhaps consider exploring other options using spark/pyspark tools instead of python tools.
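For example, if the dictionary only exists to hold a parsed JSON column, from_json keeps the whole thing inside Spark (the column and field names below are made up for illustration):

    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType

    # Assumed schema for the JSON string column
    schema = StructType([
        StructField("a", StringType()),
        StructField("b", StringType()),
    ])

    parsed = data_filtered.withColumn("payload_struct", from_json(col("payload"), schema))
    # Fields are then reachable with a plain select; no driver-side dictionaries needed
    parsed.select("id", "Topic", "payload_struct.a", "payload_struct.b").show()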

Upvotes: 0
