Qdesmedt

Reputation: 63

Weird behavior in PySpark

I observed some weird behavior in PySpark. Maybe one of you knows what is happening. If I do this:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def create_my_date(mydate):
    try:
        return mydate.strftime('%Y%m')
    except:
        return None

df = df.withColumn(
    "date_string",
    F.udf(create_my_date, StringType())(df.mydate)
)

df.filter(~df.mydate.isNotNull()).count()
df.filter(df.mydate.isNotNull()).count()

This outputs:

0
10

So there are no null values in the column df.mydate.

But if I change the create_my_date function and remove the try/except:

def create_my_date(mydate):
    return mydate.strftime('%Y%m')


df = df.withColumn(
    "date_string", 
    F.udf(create_my_date, StringType())(df.mydate)
)

df.filter(~df.mydate.isNotNull()).count()
df.filter(df.mydate.isNotNull()).count()

The job fails with:

Py4JJavaError: An error occurred while calling o7058.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 997.0 failed 4 times, most recent failure: Lost task 22.3 in stage 997.0 (TID 335940, 126.102.230.110, executor 29): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/home/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/home/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-109-422e4b5e07cf>", line 2, in create_my_date
AttributeError: 'NoneType' object has no attribute 'strftime'

Does anyone have an explanation for me?

Thanks!

Upvotes: 0

Views: 501

Answers (1)

kshell

Reputation: 236

The reason you're getting the AttributeError is that you're calling strftime on a None value. The traceback shows the error is raised inside create_my_date; since it runs as a UDF, it receives each row's value as a plain Python object, and for null rows that object is None. So essentially it's doing this:

>>> None.strftime("%Y%m")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'NoneType' object has no attribute 'strftime'
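
In your first version, the bare except silently converts that same failure into None, which is why nothing appeared to go wrong. A quick illustration using your own function:

def create_my_date(mydate):
    try:
        return mydate.strftime('%Y%m')
    except:  # swallows the AttributeError raised for None
        return None

create_my_date(None)  # returns None instead of raising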

Instead, you can accomplish what you want with built-in DataFrame functions, which are faster than a UDF and don't need a try/except block:

from pyspark.sql.functions import date_format
from datetime import datetime

df = spark.createDataFrame([[datetime(2018, 3, 2).date()], [None]], ["mydate"])

# 'yyyyMM' matches the original '%Y%m'; 'Y' would be the week-based year
df = df.withColumn("date_string", date_format("mydate", "yyyyMM"))
df.show()

The resulting dataframe:

+----------+-----------+
|    mydate|date_string|
+----------+-----------+
|2018-03-02|     201803|
|      null|       null|
+----------+-----------+

Then your counts:

df.filter(df["mydate"].isNotNull()).count()
df.filter(df["mydate"].isNull()).count()

Returns as expected:

1
1
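
If you ever do need a UDF for logic the built-in functions can't express, a safer version of the original function (a sketch, not from the question) guards against None explicitly instead of relying on a bare except:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def create_my_date(mydate):
    # Handle null input explicitly; a bare except would also hide
    # real bugs, such as typos inside the try block.
    if mydate is None:
        return None
    return mydate.strftime('%Y%m')

df = df.withColumn("date_string", F.udf(create_my_date, StringType())(df.mydate))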

Upvotes: 2
