Reputation: 53
I am using AWS Glue to perform ETL job using dev-endpoint. I tried to run the following code but got an error while execution. It was running successfully when I run it few days back. But, it is raising errors for new runs.
df1=
sqlContext.createDataFrame([("11/25/1991","11/24/1991","11/30/1991")
("11/25/1391","11/24/1992","11/30/1992")],
schema=['first', 'second', 'third'])
func = udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df = df1.withColumn('test', func(col('first')))
df.printSchema()
df.show()
Here is the error
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-2266243445000109294.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-2266243445000109294.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 27, in <module>
File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 318, in show
print(self._jdf.showString(n, 20))
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o208.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 36, ip-172-31-58-71.us-east-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 7.1 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Do you think there might some issues with AWS Glue itself. The code works with printSchema() method while it shows error with show() method.
Upvotes: 2
Views: 2634
Reputation: 3153
I dont see any errors with the above code, except that I had to use datetime.datetime.strptime().
%pyspark
import datetime
from datetime import datetime
import dateutil
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType
df1 = sqlContext.createDataFrame([("11/25/1991","11/24/1991","11/30/1991"),("11/25/1391","11/24/1992","11/30/1992")], schema=['first', 'second', 'third'])
func = udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df1.show()
df = df1.withColumn('test', func(col('first')))
df.printSchema()
df.show()
Result output:
+----------+----------+----------+
| first| second| third|
+----------+----------+----------+
|11/25/1991|11/24/1991|11/30/1991|
|11/25/1391|11/24/1992|11/30/1992|
+----------+----------+----------+
root
|-- first: string (nullable = true)
|-- second: string (nullable = true)
|-- third: string (nullable = true)
|-- test: date (nullable = true)
+----------+----------+----------+----------+
| first| second| third| test|
+----------+----------+----------+----------+
|11/25/1991|11/24/1991|11/30/1991|1991-11-25|
|11/25/1391|11/24/1992|11/30/1992|1391-11-17|
+----------+----------+----------+----------+
This, I ran it on zeppelin in local mode without connecting to glue, since I didnt see any glue context object in the above code provided by you, so hope it is works.
Other thing I noticed in the error log is, it has reported some memory issue, maybe you can take a look at it as well, maybe you can try increase the DPUs if required.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 36, ip-172-31-58-71.us-east-2.compute.internal, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 7.1 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
The result output in the new column TEST is not converted correctly, but think you can resolve it.
Thanks
Upvotes: 1