Sagar Moghe

Reputation: 67

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object when passing a row to a UDF

transactions_df is the DF I am running my UDF on, and inside the UDF I am referencing another DF to look up values based on some conditions.

def convertRate(row):
    completed = row["completedAt"]
    currency = row["currency"]
    amount = row["amount"]
    if currency == "MXN":
        rate = currency_exchange_df.select("rate").where(
            (transactions_df.to == "MXN") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
        amount = amount / rate
    final_rate = currency_exchange_df.select("rate").where(
        (transactions_df.to == "CAD") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
    converted = amount * final_rate
    return converted

convertUDF = f.udf(lambda row: convertRate(row), DoubleType())

To call the UDF, I am passing the row as a struct. I got this solution from here.

temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
temp.show()

I am getting the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:437, in CloudPickleSerializer.dumps(self, obj)
    436 try:
--> 437     return cloudpickle.dumps(obj, pickle_protocol)
    438 except pickle.PickleError:

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:72, in dumps(obj, protocol, buffer_callback)
     69 cp = CloudPickler(
     70     file, protocol=protocol, buffer_callback=buffer_callback
     71 )
---> 72 cp.dump(obj)
     73 return file.getvalue()

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:540, in CloudPickler.dump(self, obj)
    539 try:
--> 540     return Pickler.dump(self, obj)
    541 except RuntimeError as e:

TypeError: cannot pickle '_thread.RLock' object

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Input In [40], in <cell line: 1>()
----> 1 temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
  2 temp.show()

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:199, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
    197 @functools.wraps(self.func, assigned=assignments)
    198 def wrapper(*args):
--> 199     return self(*args)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:177, in UserDefinedFunction.__call__(self, *cols)
    176 def __call__(self, *cols):
--> 177     judf = self._judf
    178     sc = SparkContext._active_spark_context
    179     return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:161, in UserDefinedFunction._judf(self)
    154 @property
    155 def _judf(self):
    156     # It is possible that concurrent access, to newly created UDF,
    157     # will initialize multiple UserDefinedPythonFunctions.
    158     # This is unlikely, doesn't affect correctness,
    159     # and should have a minimal performance impact.
    160     if self._judf_placeholder is None:
--> 161         self._judf_placeholder = self._create_judf()
    162     return self._judf_placeholder

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:170, in UserDefinedFunction._create_judf(self)
    167 spark = SparkSession.builder.getOrCreate()
    168 sc = spark.sparkContext
--> 170 wrapped_func = _wrap_function(sc, self.func, self.returnType)
    171 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    172 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    173     self._name, wrapped_func, jdt, self.evalType, self.deterministic)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:34, in _wrap_function(sc, func, returnType)
     32 def _wrap_function(sc, func, returnType):
     33     command = (func, returnType)
---> 34     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     35     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
     36                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\rdd.py:2814, in _prepare_for_python_RDD(sc, command)
   2811 def _prepare_for_python_RDD(sc, command):
   2812     # the serialized command will be compressed by broadcast
   2813     ser = CloudPickleSerializer()
-> 2814     pickled_command = ser.dumps(command)
   2815     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2816         # The broadcast will have same life cycle as created PythonRDD
   2817         broadcast = sc.broadcast(pickled_command)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:447, in CloudPickleSerializer.dumps(self, obj)
    445     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    446 print_exec(sys.stderr)
--> 447 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

Sample DFs are as follows:

The first DF is my transactions_df. The second DF contains the exchange rates.

The transactions provided are in either US dollars or Mexican pesos, and the currency exchange data contains only an "effectiveAt" date. It is assumed that an exchange rate remains the same until a new record for that rate is provided.

I have to convert all the transactions into CAD. Note that MXN must first be converted to USD, and then USD to CAD.

The third DF is the expected solution.

Upvotes: 2

Views: 3392

Answers (1)

ZygD

Reputation: 24438

Your initial script is only supposed to do the conversion, but the screenshot shows that other transformations were performed too: grouping, aggregation, pivoting. All of this may seem like too much for one question; I did it just to show that it's truly possible natively, using just dataframes. The code may look bigger, but this way it's more manageable and efficient.

Inputs:

from pyspark.sql import functions as F, Window as W
transactions_df = spark.createDataFrame(
    [(7, '2021-10-01', 'USD', 30.0, 'DEBIT', '2021-10-01'),
     (9, '2021-10-02', 'USD', 10.0, 'DEBIT', '2021-10-02'),
     (6, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
     (2, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
     (1, '2021-10-04', 'USD', 29.99, 'CREDIT', '2021-10-04'),
     (4, '2021-10-04', 'USD', 49.99, 'CREDIT', '2021-10-04'),
     (8, '2021-10-05', 'USD', 9.99, 'DEBIT', '2021-10-05'),
     (3, '2021-10-06', 'MXN', 621.42, 'CREDIT', '2021-10-06'),
     (5, '2021-10-07', 'USD', 35.99, 'CREDIT', '2021-10-07')],
    ['id', 'completedAt', 'currency', 'amount', 'type', 'completedDate'])

currency_exchange_df = spark.createDataFrame(
    [(3, 'USD', 'MXN', 20.44, '2021-09-28', '2021-10-05'),
     (4, 'USD', 'CAD', 1.35, '2021-09-28', '2021-10-04'),
     (2, 'USD', 'CAD', 1.33, '2021-10-04', '9999-12-31'),
     (1, 'USD', 'MXN', 20.79, '2021-10-05', '9999-12-31')],
    ['id', 'from', 'to', 'rate', 'effectiveAt', 'effectiveTill'])

First, for currency_exchange_df, creating a row for every possible date in your ranges. This is an inexpensive operation, because it's at most 365 rows per year per currency pair.

eff_at = F.to_date('effectiveAt')
max_date = F.lit(transactions_df.agg(F.max(F.to_date('completedAt'))).head()[0])
w = W.partitionBy('from', 'to').orderBy(eff_at)
currency_exchange_df = (currency_exchange_df
    .withColumn('effective', F.sequence(eff_at, F.coalesce(F.date_sub(F.lead(eff_at).over(w), 1), max_date)))
    .withColumn('effective', F.explode('effective'))
)
# +---+----+---+-----+-----------+-------------+----------+
# | id|from| to| rate|effectiveAt|effectiveTill| effective|
# +---+----+---+-----+-----------+-------------+----------+
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-28|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-29|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-30|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-01|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-02|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-03|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-04|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-05|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-06|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-07|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-28|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-29|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-30|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-01|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-02|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-03|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-04|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-05|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-06|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-07|
# +---+----+---+-----+-----------+-------------+----------+

Then, creating df_rates, which contains exchange rates that can be used to convert directly to CAD. For this, a self-join is used.

join_on = (F.col('a.effective') == F.col('b.effective')) & (F.col('a.to') != 'CAD')
df_rates = (currency_exchange_df.alias('a')
    .join(currency_exchange_df.filter("from = 'USD' and to = 'CAD'").alias('b'), join_on, 'left')
    .select(
        F.col('a.effective').alias('completedAt'),
        F.when(F.col('a.to') == 'CAD', 'USD').otherwise(F.col('a.to')).alias('currency'),
        F.coalesce(F.col('b.rate') / F.col('a.rate'), 'a.rate').alias('rate')
    )
)
# +-----------+--------+-------------------+
# |completedAt|currency|               rate|
# +-----------+--------+-------------------+
# | 2021-10-02|     USD|               1.35|
# | 2021-10-02|     MXN|0.06604696673189824|
# | 2021-09-30|     USD|               1.35|
# | 2021-09-30|     MXN|0.06604696673189824|
# | 2021-10-05|     USD|               1.33|
# | 2021-10-05|     MXN|0.06397306397306397|
# | 2021-09-28|     USD|               1.35|
# | 2021-09-28|     MXN|0.06604696673189824|
# | 2021-09-29|     USD|               1.35|
# | 2021-09-29|     MXN|0.06604696673189824|
# | 2021-10-03|     USD|               1.35|
# | 2021-10-03|     MXN|0.06604696673189824|
# | 2021-10-06|     USD|               1.33|
# | 2021-10-06|     MXN|0.06397306397306397|
# | 2021-10-04|     USD|               1.33|
# | 2021-10-04|     MXN|0.06506849315068493|
# | 2021-10-01|     USD|               1.35|
# | 2021-10-01|     MXN|0.06604696673189824|
# | 2021-10-07|     USD|               1.33|
# | 2021-10-07|     MXN|0.06397306397306397|
# +-----------+--------+-------------------+

Finally, joining with the transactions_df, grouping, pivoting, aggregating.

df_converted = (transactions_df
    .join(df_rates, ['completedAt', 'currency'], 'left')
    .withColumn('types', F.concat(F.initcap('type'), F.lit('s')))
    .groupBy(F.col('completedAt').alias('date'))
    .pivot('types', ['Credits', 'Debits'])
    .agg(F.round(F.sum(F.col('amount') * F.col('rate')), 2))
    .fillna(0)
)
df_converted.sort('date').show()
# +----------+-------+------+
# |      date|Credits|Debits|
# +----------+-------+------+
# |2021-10-01|    0.0|  40.5|
# |2021-10-02|    0.0|  13.5|
# |2021-10-03|  80.97|   0.0|
# |2021-10-04| 106.37|   0.0|
# |2021-10-05|    0.0| 13.29|
# |2021-10-06|  39.75|   0.0|
# |2021-10-07|  47.87|   0.0|
# +----------+-------+------+
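
As a quick sanity check (worked out by hand, not part of the script), the single MXN transaction of 621.42 MXN on 2021-10-06 is first divided by the USD-to-MXN rate in effect on that date (20.79) and then multiplied by the USD-to-CAD rate in effect (1.33):

mxn_amount = 621.42
usd_mxn_rate = 20.79  # effective from 2021-10-05
usd_cad_rate = 1.33   # effective from 2021-10-04
print(round(mxn_amount / usd_mxn_rate * usd_cad_rate, 2))
# 39.75, which matches the Credits value for 2021-10-06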

The DataFrame API in Spark is becoming more and more powerful. This script didn't even require higher-order functions with predicates. If you cannot do the task using native Spark functionality, you can turn to pandas_udf. Going for a regular udf is a relic of the past: they are inefficient, and in 99% of cases they are completely avoidable.
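
If a UDF really were required, here is a minimal sketch (not part of the original answer; the names rates_bc and convert_to_cad are just illustrative) of how a pandas_udf could do the conversion. It reuses the df_rates dataframe built above, and it avoids the original PicklingError by collecting the small rates table into a plain Python dict and broadcasting it, rather than referencing a DataFrame inside the UDF (a DataFrame holds a reference to the SparkContext, which cannot be pickled):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Collect the per-date, per-currency rates to the driver; this table is tiny.
# Keys use ISO date strings so they match the string dates in transactions_df.
rates = {(str(r['completedAt']), r['currency']): r['rate'] for r in df_rates.collect()}
rates_bc = spark.sparkContext.broadcast(rates)

@F.pandas_udf(DoubleType())  # requires pyarrow
def convert_to_cad(completed: pd.Series, currency: pd.Series, amount: pd.Series) -> pd.Series:
    lookup = rates_bc.value
    # Look up the CAD conversion factor for each (date, currency) pair; fall back to 1.0 if missing.
    factors = pd.Series(
        [lookup.get(key, 1.0) for key in zip(completed, currency)],
        index=amount.index,
    )
    return amount * factors

transactions_df.withColumn(
    'amountCAD',
    F.round(convert_to_cad('completedAt', 'currency', 'amount'), 2)
).show()

This keeps the per-row work vectorized in pandas, but the pure dataframe solution above is still the preferable approach.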

Upvotes: 1
