Reputation: 12627
I'm trying to implement Uber's Petastorm dataset creation, which uses Spark to write a Parquet file, following the tutorial on their GitHub page.
The code:
import logging

from pyspark.sql import SparkSession

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row

spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):
    logging.info('Building RDD...')
    # row_generator yields lists of examples
    rows_rdd = sc.parallelize(ids) \
        .map(row_generator) \
        .flatMap(lambda x: dict_to_spark_row(MySchema, x))

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')
Now the RDD code executes successfully, but the .createDataFrame call fails with the following error:
_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB
This is my first experience with Spark, so I can't really tell if this error originates in Spark or Petastorm.
Looking through other solutions to this error (with respect to Spark, not Petastorm), I saw that it might have to do with the pickling protocol, but I couldn't confirm that, nor did I find a way to change the pickling protocol.
How could I avoid this error?
Upvotes: 1
Views: 1264
Reputation: 1511
To build off bluesummers' answer:
The current master branch of Spark fixes this issue, so I used this code to patch the dump function in the same way, just a bit more safely. (Tested with Spark 2.3.2.)
import pickle
import sys

from pyspark import broadcast
from pyspark.cloudpickle import print_exec
from pyspark.util import _exception_message


def broadcast_dump(self, value, f):
    try:
        # HIGHEST_PROTOCOL instead of the hard-coded protocol 2 used by Spark 2.3
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
    except pickle.PickleError:
        raise
    except Exception as e:
        msg = "Could not serialize broadcast: %s: %s" \
              % (e.__class__.__name__, _exception_message(e))
        print_exec(sys.stderr)
        raise pickle.PicklingError(msg)
    f.close()
    return f.name  # Spark expects the dump file's path back from Broadcast.dump


broadcast.Broadcast.dump = broadcast_dump
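Compared to hard-coding protocol 4, pickle.HIGHEST_PROTOCOL is 4 on Python 3.4-3.7 and 5 on 3.8+, so it clears the 4 GiB limit either way, and the try/except mirrors Spark's own Broadcast.dump so serialization failures are still reported as a PicklingError.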
Upvotes: 2
Reputation: 12627
The problem lies in the pickling used to pass the data between the different processes: the default pickling protocol is 2, and we need to use protocol 4 in order to serialize objects larger than 4 GiB.
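The limit is easy to confirm with plain pickle, outside of Spark. A minimal sketch (it builds a string just over 4 GiB, so it needs several GiB of free memory):

import pickle

big = 'x' * (2**32 + 1)               # a string just over 4 GiB
# pickle.dumps(big, protocol=2)  raises OverflowError: cannot serialize a string larger than 4GiB
data = pickle.dumps(big, protocol=4)  # works: protocol 4 (Python 3.4+) lifts the 4 GiB limit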
To change the pickling protocol, apply the following patch before creating a Spark session:
from pyspark import broadcast
import pickle


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2; 4 is the first protocol supporting >4GB objects
    f.close()
    return f.name


broadcast.Broadcast.dump = broadcast_dump
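Putting it together, a self-contained sketch of the fix (the session config is copied from the question):

import pickle

from pyspark import broadcast
from pyspark.sql import SparkSession


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # protocol 4 supports objects larger than 4 GiB
    f.close()
    return f.name


# Patch first, then create the session
broadcast.Broadcast.dump = broadcast_dump

spark = SparkSession.builder \
    .config('spark.driver.memory', '10g') \
    .master('local[4]') \
    .getOrCreate()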
Upvotes: 2