user257330

Reputation: 73

Pickle error while creating a new PySpark dataframe by processing an old dataframe using the foreach() method

Given a PySpark dataframe given_df, I need to generate a new dataframe new_df from it.

I am trying to process the dataframe row by row using the foreach() method. Let's say, for simplicity, that both dataframes given_df and new_df consist of a single column.

I have to process each row of this dataframe and, based on the value present in that cell, create some new Rows and add them to new_df by unioning it with those Rows. The number of rows generated by processing a single row of given_df is variable.

new_df = spark.createDataFrame([], schema='SampleField STRING')  # create an empty dataframe initially; an explicit type is needed since there is no data to infer it from

def func(row):
    rows_to_append = getNewRowsAfterProcessingCurrentRow(row)
    # Without the following line, the next statement raises an error, because Python
    # treats new_df as a local variable that is accessed before being defined.
    global new_df
    new_df = new_df.union(spark.createDataFrame(data=rows_to_append, schema=['SampleField']))

given_df.foreach(func)  # given_df already contains some data; now run the function for each row

However, this results in a pickle error.

If the union call is commented out, no error occurs.

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 476, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 356, in dump
    return Pickler.dump(self, obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 789, in save_tuple
    save(element)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 843, in _batch_appends
    save(x)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 843, in _batch_appends
    save(x)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 843, in _batch_appends
    save(x)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 500, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/databricks/python/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 495, in save_function
    self.save_function_tuple(obj)
  File "/databricks/spark/python/pyspark/cloudpickle.py", line 729, in save_function_tuple
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/databricks/python/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/databricks/python/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/databricks/python/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/databricks/python/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/databricks/python/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
  File "/databricks/spark/python/pyspark/context.py", line 356, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

To explain what I am trying to do a bit better, here is an example illustrating a possible use case:

Let's say given_df is a dataframe of sentences, where each sentence consists of some words separated by spaces.

given_df = spark.createDataFrame([("The old brown fox",), ("jumps over",), ("the lazy log",)], schema=["SampleField"])

new_df is a dataframe consisting of each word in a separate row. So we will process each row of given_df and, based on the words we get by splitting it, insert each word as a row into new_df.

new_df = spark.createDataFrame([("The",), ("old",), ("brown",), ("fox",), ("jumps",), ("over",), ("the",), ("lazy",), ("log",)], schema=["SampleField"])

Upvotes: 2

Views: 2681

Answers (1)

egordoe

Reputation: 958

You are trying to use the DataFrame API on an executor, which is not allowed, hence the PicklingError:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


You should rewrite your code. You can, for example, use RDD.flatMap or, if you prefer the DataFrame API, the explode() function.
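For the former approach, a minimal sketch could look like this (assuming the single string column SampleField from your example; words_rdd and flat_df are illustrative names):

words_rdd = given_df.rdd.flatMap(lambda row: row["SampleField"].split())  # one output element per word
flat_df = spark.createDataFrame(words_rdd.map(lambda w: (w,)), schema=["SampleField"])  # back to a single-column dataframe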

Here is how you do it with the latter approach:

given_df = spark.createDataFrame([("The old brown fox",), ("jumps over",), ("the lazy log",)], schema=["SampleField"])

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StringType

@udf(returnType=ArrayType(StringType()))
def getNewRowsAfterProcessingCurrentRow(sentence):
  # split the sentence into a list of words; explode() turns the list into rows
  return sentence.split()

new_df = given_df\
  .select(explode(getNewRowsAfterProcessingCurrentRow("SampleField")).alias("SampleField"))\
  .unionAll(given_df)

new_df.show()

  1. You wrap your getNewRowsAfterProcessingCurrentRow() in udf(). This just makes your function usable in the DataFrame API.
  2. Then you use your function wrapped in another function called explode(). This is required since you want to "explode" (or transpose) the split sentences into multiple rows, one word per row.
  3. Finally, you take the resulting DataFrame and union it with the original given_df.

The output:

+-----------------+
|      SampleField|
+-----------------+
|              The|
|              old|
|            brown|
|              fox|
|            jumps|
|             over|
|              the|
|             lazy|
|              log|
|The old brown fox|
|       jumps over|
|     the lazy log|
+-----------------+
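As a side note, a UDF is not strictly necessary for this particular word-splitting case: the built-in split() function from pyspark.sql.functions should produce the same result (a sketch, assuming the same single-column schema as above):

from pyspark.sql.functions import split, explode

# split() yields an array column of words; explode() turns the array into one row per word
new_df = given_df\
  .select(explode(split("SampleField", " ")).alias("SampleField"))\
  .unionAll(given_df)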

Upvotes: 3
