Tibo Geysen

Reputation: 185

How to resolve a SparkException when using a user-defined function?

I need to detect the language of a piece of text and then translate that text using PySpark. I could not find any built-in functions for this in PySpark, so I created my own UDFs.

Language detection

from textblob import TextBlob
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def detectlang(string):
    b = TextBlob(string)
    return b.detect_language()

detectlang_udf = udf(detectlang, StringType())

Translation

from googletrans import Translator  # import not shown in the original; googletrans assumed

def translate(string):
    trans = Translator()
    return trans.translate(string).text

translate_udf = udf(translate, StringType())

However, when I call these functions and ask for the result, I get the following error:

result = dict_comments[13].withColumn("lang", detectlang_udf(col('Text')))
result.show()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Edit (full error)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-256375544159477> in <module>
      1 result = dict_comments[13].withColumn("lang", detectlang_udf(col('Text')))
----> 2 result.show()

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    379         """
    380         if isinstance(truncate, bool) and truncate:
--> 381             print(self._jdf.showString(n, 20, vertical))
    382         else:
    383             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o872.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 456, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 149, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 445, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/databricks/spark/python/pyspark/worker.py", line 87, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-256375544159470>", line 3, in detectlang
  File "/databricks/python/lib/python3.7/site-packages/textblob/blob.py", line 568, in detect_language
    return self.translator.detect(self.raw)
  File "/databricks/python/lib/python3.7/site-packages/textblob/translate.py", line 69, in detect
    raise TranslatorError('Must provide a string with at least 3 characters.')
textblob.exceptions.TranslatorError: Must provide a string with at least 3 characters.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2360)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2348)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2347)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2347)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1101)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1101)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1101)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2579)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2527)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2515)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:896)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectResult(Dataset.scala:2889)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3501)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2618)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2618)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3485)
    at org.apache.spark.sql.Dataset$$anonfun$54.apply(Dataset.scala:3480)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3480)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2618)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2832)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:265)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:302)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 472, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 456, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 149, in dump_stream
    for obj in iterator:
  File "/databricks/spark/python/pyspark/serializers.py", line 445, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/databricks/spark/python/pyspark/worker.py", line 87, in <lambda>
    return lambda *a: f(*a)
  File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<command-256375544159470>", line 3, in detectlang
  File "/databricks/python/lib/python3.7/site-packages/textblob/blob.py", line 568, in detect_language
    return self.translator.detect(self.raw)
  File "/databricks/python/lib/python3.7/site-packages/textblob/translate.py", line 69, in detect
    raise TranslatorError('Must provide a string with at least 3 characters.')
textblob.exceptions.TranslatorError: Must provide a string with at least 3 characters.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:528)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:534)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Does anybody know how to resolve this, or know of any existing PySpark functions that would achieve my goal?

Upvotes: 0

Views: 4194

Answers (1)

Napoleon Borntoparty

Reputation: 1962

Based on the latest edit, here is the source of your task failure:

  File "/databricks/python/lib/python3.7/site-packages/textblob/translate.py", line 69, in detect
    raise TranslatorError('Must provide a string with at least 3 characters.')
textblob.exceptions.TranslatorError: Must provide a string with at least 3 characters.

Spark can sometimes be quite unhelpful in reporting errors, so you need to search all the way down the stack trace for the underlying Python exception; the driver itself will only say something generic like a lost or failed task.

A quick fix is to check the len() of the input string inside your function, or to wrap the call in f.when().otherwise() on the PySpark side, as sketched below. The latter may be preferable, since the UDF is then not executed at all when it isn't needed. Hope this helps!
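For illustration, here is a minimal sketch of both options, reusing the detectlang function and the dict_comments[13] / Text names from the question (the None check is an extra assumption, added to also guard against missing values):

from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from textblob import TextBlob

# Option 1: guard inside the UDF so short strings never reach TextBlob.
def detectlang(string):
    if string is None or len(string) < 3:
        return None  # below TextBlob's 3-character minimum
    return TextBlob(string).detect_language()

detectlang_udf = f.udf(detectlang, StringType())

# Option 2: only invoke the UDF on rows that are long enough, so the
# Python worker is skipped entirely for short or null strings.
result = dict_comments[13].withColumn(
    "lang",
    f.when(f.length(f.col("Text")) >= 3, detectlang_udf(f.col("Text")))
     .otherwise(f.lit(None))
)

Note that f.length counts characters, which matches the 3-character minimum mentioned in the TextBlob error.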

Upvotes: 1
