user3049941
user3049941

Reputation: 137

Checking DataFrame has records in PySpark

This is my first time working with either Python or Spark, I'm a Java developer. So I don't know how is the best way to solve it here.

I'm working with:

I have a PySpark script, this script executes different queries and creates temporary views, until it finally executes a final queries using/joining the different temporary views. It will write files with the result of the final executed query.

The script works fine, but we found out, that when there is no data, it still creates the 200 files (all empty). We wanted to validate that it actually has data before calling the write method or even create the temporary view, so we tried with if df.count() == 0:, if so raising an error, otherwise, just proceed.

I just added that validation to the final two dataframes, before doing the temporary view, so it interrupts the process as soon as possible, and before executing the next queries.

Then we read somewhere, count is a very expensive method to validate that there is data because it goes through all the executioners , so before even trying it, we changed to something recommended in several places: to use df.take(1), df.head(1),or df.first(1). We went finally with head(1).

However, this changed the execution elapsed time from 30 mins to actually more than 1h 40m.

I'd like to know which other way I can avoid spark to write empty files, without increasing that much the computation time.

Since I'm new with all this, I'm opened to suggestion.

Edit

I have already read this thread: How to check if spark dataframe is empty. From this very thread, I took that I should use len(df.head(1)) == 0, and that increased the computing time from 30 minutes to 1h 40m+.

Upvotes: 2

Views: 8808

Answers (2)

Henrique Florencio
Henrique Florencio

Reputation: 3751

Just get your dataframe's rdd and check if it is empty:

df.rdd.isEmpty()

There are two types of operations in spark: actions and transformations. All transformations in Spark are lazy, they do not compute their results right away. The transformations are only computed when an action is executed. Actions are costly because spark needs to run all the transformations to that point to run the action.

Upvotes: 3

user3049941
user3049941

Reputation: 137

@Jaco I finally did something like if df.select('my_no_computed_column').head() is None:, because apparently, head() with no parameter will assume 1 and according to Spark's code:

    @ignore_unicode_prefix
    @since(1.3)
    def head(self, n=None):
        """Returns the first ``n`` rows.

        .. note:: This method should only be used if the resulting array is expected
            to be small, as all the data is loaded into the driver's memory.

        :param n: int, default 1. Number of rows to return.
        :return: If n is greater than 1, return a list of :class:`Row`.
            If n is 1, return a single Row.

        >>> df.head()
        Row(age=2, name=u'Alice')
        >>> df.head(1)
        [Row(age=2, name=u'Alice')]
        """
        if n is None:
            rs = self.head(1)
            return rs[0] if rs else None
        return self.take(n)

it will return a None if there is no rows (I might be reading it all wrong though, I've been programming with Java for over 10 years now, and Python as well as Spark are too new for me, and Python is too odd for my eyes).

It did reduce the running time considerably.

Upvotes: 0

Related Questions