Reputation: 90
I'm trying to use a UDF, but I get the following error:
import time
import datetime
from pyspark.sql.functions import lit,unix_timestamp, udf, col, lit
from pyspark.sql.types import TimestampType, DecimalType
dict = [{'name': 'Alice', 'age': 1},{'name': 'Again', 'age': 2}]
df = spark.createDataFrame(dict)
timestamp1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp2 = datetime.datetime.fromtimestamp(time.time() + 90).strftime('%Y-%m-%d %H:%M:%S')
def calc_time(start, end):
    timefmt = "yyyy-MM-dd'T'HH:mm:ss"
    return unix_timestamp(end, format=timefmt) - unix_timestamp(start, format=timefmt)
calc_time_udf = udf(lambda start, end: calc_time(start, end), TimestampType())
new_df = (df.withColumn('time1', unix_timestamp(lit(timestamp1),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
          .withColumn('time2', unix_timestamp(lit(timestamp2),'yyyy-MM-dd HH:mm:ss').cast("timestamp")))
new_df.withColumn("DIFF", calc_time_udf(col("time1"), col("time2")).cast(DecimalType(20, 6))).show()
Error Stacktrace:
  File "/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib/spark/python/pyspark/sql/functions.py", line 1253, in unix_timestamp
    return Column(sc._jvm.functions.unix_timestamp(_to_java_column(timestamp), format))
AttributeError: 'NoneType' object has no attribute '_jvm'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Upvotes: 2
Views: 1382
Reputation: 210822
You may want to try this approach:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
@pandas_udf(DoubleType())
def ts_diff(start, end):
    # start and end arrive as pandas Series of timestamps
    return (end - start).dt.total_seconds()
Then, using the new_df from your question:
>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name| time1| time2|DIFF|
+---+-----+-------------------+-------------------+----+
| 1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
| 2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
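As for why the original code fails: functions in pyspark.sql.functions such as unix_timestamp build Column expressions through the driver's SparkContext, so they cannot be called inside a UDF body. The UDF body runs in a Python worker on the executors, where there is no SparkContext, which is where 'NoneType' object has no attribute '_jvm' comes from. A regular row-at-a-time UDF receives each TimestampType value as a plain Python datetime.datetime, so the body can use ordinary Python arithmetic. Here is a minimal sketch of such a body; seconds_between is a hypothetical name used only for illustration, and the sample timestamps are the ones from the output above.

```python
from datetime import datetime


def seconds_between(start, end):
    """Return the elapsed seconds between two datetime values.

    Returns None if either value is missing, since a SQL NULL
    reaches a Python UDF as None.
    """
    if start is None or end is None:
        return None
    return (end - start).total_seconds()


# Local check with plain datetime objects; no Spark session is needed here.
t1 = datetime(2021, 7, 25, 17, 21, 58)
t2 = datetime(2021, 7, 25, 17, 23, 36)
print(seconds_between(t1, t2))  # 98.0
```

Assuming the same new_df, this could be wrapped as udf(seconds_between, DoubleType()) and applied to col("time1") and col("time2"). For this particular calculation a UDF may not be needed at all: col("time2").cast("long") - col("time1").cast("long") should give the difference in whole seconds using built-in column expressions only.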
Upvotes: 2