Reputation: 67
The transactions_df is the DF I am running my UDF on, and inside the UDF I am referencing another DF (currency_exchange_df) to get values from based on some conditions.
def convertRate(row):
    completed = row["completedAt"]
    currency = row["currency"]
    amount = row["amount"]
    if currency == "MXN":
        rate = currency_exchange_df.select("rate").where((transactions_df.to == "MXN") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
        amount = amount / rate
    final_rate = currency_exchange_df.select("rate").where((transactions_df.to == "CAD") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
    converted = amount * final_rate
    return converted

convertUDF = f.udf(lambda row: convertRate(row), DoubleType())
To call the UDF, I am passing the row as a struct. I got this solution from here.
temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
temp.show()
I am getting the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:437, in CloudPickleSerializer.dumps(self, obj)
436 try:
--> 437 return cloudpickle.dumps(obj, pickle_protocol)
438 except pickle.PickleError:
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:72, in dumps(obj, protocol, buffer_callback)
69 cp = CloudPickler(
70 file, protocol=protocol, buffer_callback=buffer_callback
71 )
---> 72 cp.dump(obj)
73 return file.getvalue()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:540, in CloudPickler.dump(self, obj)
539 try:
--> 540 return Pickler.dump(self, obj)
541 except RuntimeError as e:
TypeError: cannot pickle '_thread.RLock' object
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
Input In [40], in <cell line: 1>()
----> 1 temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
2 temp.show()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:199, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
197 @functools.wraps(self.func, assigned=assignments)
198 def wrapper(*args):
--> 199 return self(*args)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:177, in UserDefinedFunction.__call__(self, *cols)
176 def __call__(self, *cols):
--> 177 judf = self._judf
178 sc = SparkContext._active_spark_context
179 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:161, in UserDefinedFunction._judf(self)
154 @property
155 def _judf(self):
156 # It is possible that concurrent access, to newly created UDF,
157 # will initialize multiple UserDefinedPythonFunctions.
158 # This is unlikely, doesn't affect correctness,
159 # and should have a minimal performance impact.
160 if self._judf_placeholder is None:
--> 161 self._judf_placeholder = self._create_judf()
162 return self._judf_placeholder
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:170, in UserDefinedFunction._create_judf(self)
167 spark = SparkSession.builder.getOrCreate()
168 sc = spark.sparkContext
--> 170 wrapped_func = _wrap_function(sc, self.func, self.returnType)
171 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
172 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
173 self._name, wrapped_func, jdt, self.evalType, self.deterministic)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:34, in _wrap_function(sc, func, returnType)
32 def _wrap_function(sc, func, returnType):
33 command = (func, returnType)
---> 34 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
35 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
36 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\rdd.py:2814, in _prepare_for_python_RDD(sc, command)
2811 def _prepare_for_python_RDD(sc, command):
2812 # the serialized command will be compressed by broadcast
2813 ser = CloudPickleSerializer()
-> 2814 pickled_command = ser.dumps(command)
2815 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
2816 # The broadcast will have same life cycle as created PythonRDD
2817 broadcast = sc.broadcast(pickled_command)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:447, in CloudPickleSerializer.dumps(self, obj)
445 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
446 print_exec(sys.stderr)
--> 447 raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Sample DFs are as follows. The first DF is my transactions_df; the second DF contains the exchange rates.
The transactions provided are in either US dollars or Mexican pesos, and the currency exchange data contains only an "effectiveAt" date; it is assumed that an exchange rate remains the same until a new record for that rate is provided. I have to convert all the transactions into CAD. Note that we must first convert MXN to USD, then USD to CAD.
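For example, taking the rates from the sample exchange data (USD to MXN 20.79 and USD to CAD 1.33 in effect on that date), the 621.42 MXN transaction completed on 2021-10-06 should be converted like this (a plain-Python sketch of the intended logic, for illustration only):
# Two-step conversion for the 2021-10-06 MXN transaction
amount_mxn = 621.42
usd_to_mxn = 20.79   # USD -> MXN rate in effect on 2021-10-06
usd_to_cad = 1.33    # USD -> CAD rate in effect on 2021-10-06

amount_usd = amount_mxn / usd_to_mxn            # step 1: MXN -> USD  (~29.89)
amount_cad = round(amount_usd * usd_to_cad, 2)  # step 2: USD -> CAD  (39.75)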
The third DF is the expected solution.
Upvotes: 2
Views: 3392
Reputation: 24438
Your initial script is only supposed to do the conversion, but the screenshot shows that other transformations were performed too: grouping, aggregation, and pivoting. All of this may seem like a lot for one question, but I did it to show that it is genuinely possible using just dataframes, natively. The code may look bigger, but this way it is more manageable and efficient.
Inputs:
from pyspark.sql import functions as F, Window as W
transactions_df = spark.createDataFrame(
[(7, '2021-10-01', 'USD', 30.0, 'DEBIT', '2021-10-01'),
(9, '2021-10-02', 'USD', 10.0, 'DEBIT', '2021-10-02'),
(6, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
(2, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
(1, '2021-10-04', 'USD', 29.99, 'CREDIT', '2021-10-04'),
(4, '2021-10-04', 'USD', 49.99, 'CREDIT', '2021-10-04'),
(8, '2021-10-05', 'USD', 9.99, 'DEBIT', '2021-10-05'),
(3, '2021-10-06', 'MXN', 621.42, 'CREDIT', '2021-10-06'),
(5, '2021-10-07', 'USD', 35.99, 'CREDIT', '2021-10-07')],
['id', 'completedAt', 'currency', 'amount', 'type', 'completedDate'])
currency_exchange_df = spark.createDataFrame(
[(3, 'USD', 'MXN', 20.44, '2021-09-28', '2021-10-05'),
(4, 'USD', 'CAD', 1.35, '2021-09-28', '2021-10-04'),
(2, 'USD', 'CAD', 1.33, '2021-10-04', '9999-12-31'),
(1, 'USD', 'MXN', 20.79, '2021-10-05', '9999-12-31')],
['id', 'from', 'to', 'rate', 'effectiveAt', 'effectiveTill'])
First, for currency_exchange_df, create a row for every possible date in your ranges. This is an inexpensive operation, because it produces at most 365 rows per year per currency pair.
eff_at = F.to_date('effectiveAt')
# Latest transaction date, used as the upper bound for the last (open-ended) rate
max_date = F.lit(transactions_df.agg(F.max(F.to_date('completedAt'))).head()[0])

# For each currency pair, a rate stays effective until the day before the next rate starts
w = W.partitionBy('from', 'to').orderBy(eff_at)
currency_exchange_df = (currency_exchange_df
    .withColumn('effective', F.sequence(eff_at, F.coalesce(F.date_sub(F.lead(eff_at).over(w), 1), max_date)))
    .withColumn('effective', F.explode('effective'))
)
# +---+----+---+-----+-----------+-------------+----------+
# | id|from| to| rate|effectiveAt|effectiveTill| effective|
# +---+----+---+-----+-----------+-------------+----------+
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-09-28|
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-09-29|
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-09-30|
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-10-01|
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-10-02|
# | 4| USD|CAD| 1.35| 2021-09-28| 2021-10-04|2021-10-03|
# | 2| USD|CAD| 1.33| 2021-10-04| 9999-12-31|2021-10-04|
# | 2| USD|CAD| 1.33| 2021-10-04| 9999-12-31|2021-10-05|
# | 2| USD|CAD| 1.33| 2021-10-04| 9999-12-31|2021-10-06|
# | 2| USD|CAD| 1.33| 2021-10-04| 9999-12-31|2021-10-07|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-09-28|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-09-29|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-09-30|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-10-01|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-10-02|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-10-03|
# | 3| USD|MXN|20.44| 2021-09-28| 2021-10-05|2021-10-04|
# | 1| USD|MXN|20.79| 2021-10-05| 9999-12-31|2021-10-05|
# | 1| USD|MXN|20.79| 2021-10-05| 9999-12-31|2021-10-06|
# | 1| USD|MXN|20.79| 2021-10-05| 9999-12-31|2021-10-07|
# +---+----+---+-----+-----------+-------------+----------+
Then, create df_rates, containing exchange rates that can be used to convert directly to CAD. For this, a self-join is used.
# Join every rate row to the USD->CAD rate for the same date;
# USD->CAD rows themselves are excluded so the left join leaves them unmatched
join_on = (F.col('a.effective') == F.col('b.effective')) & (F.col('a.to') != 'CAD')
df_rates = (currency_exchange_df.alias('a')
    .join(currency_exchange_df.filter("from = 'USD' and to = 'CAD'").alias('b'), join_on, 'left')
    .select(
        F.col('a.effective').alias('completedAt'),
        # USD->CAD rows describe the rate for USD amounts; other rows keep their 'to' currency
        F.when(F.col('a.to') == 'CAD', 'USD').otherwise(F.col('a.to')).alias('currency'),
        # X->CAD rate = (USD->CAD) / (USD->X); unmatched USD->CAD rows fall back to their own rate
        F.coalesce(F.col('b.rate') / F.col('a.rate'), 'a.rate').alias('rate')
    )
)
# +-----------+--------+-------------------+
# |completedAt|currency| rate|
# +-----------+--------+-------------------+
# | 2021-10-02| USD| 1.35|
# | 2021-10-02| MXN|0.06604696673189824|
# | 2021-09-30| USD| 1.35|
# | 2021-09-30| MXN|0.06604696673189824|
# | 2021-10-05| USD| 1.33|
# | 2021-10-05| MXN|0.06397306397306397|
# | 2021-09-28| USD| 1.35|
# | 2021-09-28| MXN|0.06604696673189824|
# | 2021-09-29| USD| 1.35|
# | 2021-09-29| MXN|0.06604696673189824|
# | 2021-10-03| USD| 1.35|
# | 2021-10-03| MXN|0.06604696673189824|
# | 2021-10-06| USD| 1.33|
# | 2021-10-06| MXN|0.06397306397306397|
# | 2021-10-04| USD| 1.33|
# | 2021-10-04| MXN|0.06506849315068493|
# | 2021-10-01| USD| 1.35|
# | 2021-10-01| MXN|0.06604696673189824|
# | 2021-10-07| USD| 1.33|
# | 2021-10-07| MXN|0.06397306397306397|
# +-----------+--------+-------------------+
Finally, join with transactions_df, then group, pivot, and aggregate.
df_converted = (transactions_df
.join(df_rates, ['completedAt', 'currency'], 'left')
.withColumn('types', F.concat(F.initcap('type'), F.lit('s')))
.groupBy(F.col('completedAt').alias('date'))
.pivot('types', ['Credits', 'Debits'])
.agg(F.round(F.sum(F.col('amount') * F.col('rate')), 2))
.fillna(0)
)
df_converted.sort('date').show()
# +----------+-------+------+
# | date|Credits|Debits|
# +----------+-------+------+
# |2021-10-01| 0.0| 40.5|
# |2021-10-02| 0.0| 13.5|
# |2021-10-03| 80.97| 0.0|
# |2021-10-04| 106.37| 0.0|
# |2021-10-05| 0.0| 13.29|
# |2021-10-06| 39.75| 0.0|
# |2021-10-07| 47.87| 0.0|
# +----------+-------+------+
The DataFrame API in Spark is becoming more and more powerful; this script didn't even require higher-order functions with predicates. If you cannot do the task using native Spark functionality, you can turn to pandas_udf. Reaching for a regular udf is a relic of the past: they are inefficient, and in 99% of cases they are entirely avoidable.
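For illustration only, a minimal pandas_udf sketch (assuming the rate has already been joined on, e.g. reusing the df_rates table built above; the to_cad name and the amountCad column are made up for this sketch) could look like this:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Series-to-series pandas UDF: receives whole Arrow batches, not one row at a time
@F.pandas_udf(DoubleType())
def to_cad(amount: pd.Series, rate: pd.Series) -> pd.Series:
    return amount * rate

# Hypothetical usage, after joining the per-date rates onto the transactions
converted = (transactions_df
    .join(df_rates, ['completedAt', 'currency'], 'left')
    .withColumn('amountCad', F.round(to_cad('amount', 'rate'), 2))
)
Even here the pandas_udf is not strictly needed, since the same thing can be expressed natively as F.round(F.col('amount') * F.col('rate'), 2), which is exactly what the solution above does.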
Upvotes: 1