Reputation: 85
I'm brand new to PySpark (and really to Python as well). I'm trying to count the distinct values in each column (not distinct combinations of columns). I want the answer to this SQL statement:
sqlStatement = "Select Count(Distinct C1) AS C1, Count(Distinct C2) AS C2, ..., Count(Distinct CN) AS CN From myTable"
distinct_count = spark.sql(sqlStatement).collect()
That takes forever (16 hours) on an 8-node cluster (see configuration below). The dataset is 100 GB with 400 columns. I don't see a way to express this with DataFrame primitives like:
df.agg(countDistinct('C1', 'C2', ..., 'CN'))
as that will give me the distinct count of the (C1, ..., CN) combinations, not a count per column. There must be a way to make this fast.
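To illustrate the difference with made-up column names (just a sketch of what I mean):
from pyspark.sql.functions import countDistinct
# counts distinct (C1, C2) pairs -- not what I want
df.agg(countDistinct('C1', 'C2')).show()
# one distinct count per column -- what I actually want, though I don't know if it runs any faster
df.agg(countDistinct('C1').alias('C1'), countDistinct('C2').alias('C2')).show()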
Master node: Standard (1 master, N workers)
Machine type: n1-highmem-8 (8 vCPU, 52.0 GB memory)
Primary disk size: 500 GB

Worker nodes: 8
Machine type: n1-highmem-4 (4 vCPU, 26.0 GB memory)
Primary disk size: 500 GB
Local SSDs: 1
Upvotes: 1
Views: 4049
Reputation: 99
Note that you are using the .collect() method, which returns all elements of the dataset to the driver; this may cause the driver to run out of memory. See this link for an explanation.
You can see the plan Spark will execute by running .explain() on your query:
myquery = spark.sql(sqlStatement)
myquery.explain()  # prints the physical plan Spark will run
You could ease the problem by splitting your query into several smaller queries so that you do not compute the distinct count for every column at once. This reduces the amount of data processed in each pass.
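As a rough sketch (the table name "myTable", the batch size, and collecting one Row per batch are assumptions on my part), you could loop over the columns in batches and merge the partial results:
from pyspark.sql.functions import countDistinct

df = spark.table("myTable")   # assumption: your data is available as the table "myTable"
batch_size = 50               # assumption: tune this to your cluster
distinct_counts = {}
for i in range(0, len(df.columns), batch_size):
    batch = df.columns[i:i + batch_size]
    # one countDistinct per column in this batch, each aliased back to its column name
    row = df.agg(*[countDistinct(c).alias(c) for c in batch]).collect()[0]
    distinct_counts.update(row.asDict())
print(distinct_counts)
Each .collect() here only brings back a single row of counts, so the result itself stays small on the driver.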
Upvotes: 1