bluesummers

Reputation: 12627

Creating a Parquet Petastorm dataset through Spark fails with OverflowError (larger than 4GB)

I'm trying to implement Uber's Petastorm dataset creation, which uses Spark to create a Parquet file, following the tutorial on their GitHub page.

The code:

import logging

from pyspark.sql import SparkSession
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row

spark = SparkSession.builder.config('spark.driver.memory', '10g').master('local[4]').getOrCreate()
sc = spark.sparkContext

with materialize_dataset(spark=spark, dataset_url='file:///opt/data/hello_world_dataset',
                         schema=MySchema, row_group_size_mb=256):

    logging.info('Building RDD...')
    # row_generator yields a list of example dicts per id; flatten them into Rows
    rows_rdd = sc.parallelize(ids) \
        .map(row_generator) \
        .flatMap(lambda rows: [dict_to_spark_row(MySchema, row) for row in rows])

    logging.info('Creating DataFrame...')
    spark.createDataFrame(rows_rdd, MySchema.as_spark_schema()) \
        .coalesce(10) \
        .write \
        .mode('overwrite') \
        .parquet('file:///opt/data/hello_world_dataset')

Now the RDD code executes successfully, but the .createDataFrame call fails with the following error:

_pickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

This is my first experience with Spark, so I can't really tell if this error originates in Spark or Petastorm.

Looking through other solutions to this error (with respect to Spark, not Petastorm), I saw that it might have to do with the pickling protocol, but I can't confirm that, nor have I found a way to alter the pickling protocol.
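The protocol limit itself, at least, is easy to reproduce in plain Python, outside Spark and Petastorm (a sketch that allocates roughly 5 GiB, so it needs enough free RAM):

import pickle

# Protocols <= 3 store bytes/str lengths in 32-bit fields, so objects over
# 4 GiB fail; protocol 4 (Python 3.4+) added 64-bit framing and handles them.
big = b'x' * (5 * 1024 ** 3)  # ~5 GiB payload

try:
    pickle.dumps(big, protocol=2)
except OverflowError as e:
    print(e)  # cannot serialize a bytes object larger than 4 GiB

blob = pickle.dumps(big, protocol=4)  # succeeds

This mirrors the error message above, which suggests Spark is pickling a broadcast value with an older protocol.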

How could I avoid this error?

Upvotes: 1

Views: 1264

Answers (2)

lockwobr

Reputation: 1511

To build off bluesummers' answer:

The master branch of Spark currently fixes this issue, so I used this code to patch the dump function in the same way, but a bit more safely. [Tested with 2.3.2]

import pickle
import sys

from pyspark import broadcast
from pyspark.cloudpickle import print_exec
from pyspark.util import _exception_message


def broadcast_dump(self, value, f):
    try:
        # HIGHEST_PROTOCOL is >= 4 on Python 3.4+, lifting the 4 GiB limit
        pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
    except pickle.PickleError:
        raise
    except Exception as e:
        msg = "Could not serialize broadcast: %s: %s" \
              % (e.__class__.__name__, _exception_message(e))
        print_exec(sys.stderr)
        raise pickle.PicklingError(msg)
    f.close()


broadcast.Broadcast.dump = broadcast_dump
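One design note: using pickle.HIGHEST_PROTOCOL rather than a hard-coded 4 matches what Spark's master branch does, but it only lifts the 4 GiB limit on Python 3.4+, where the highest protocol is at least 4. A quick sanity check after patching:

import pickle

# On Python 2 HIGHEST_PROTOCOL is still 2, so this patch would not help there
assert pickle.HIGHEST_PROTOCOL >= 4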

Upvotes: 2

bluesummers

Reputation: 12627

The problem lies in the pickling used to pass the data between the different processes. The default pickling protocol is 2, and we need to use protocol 4 in order to pass objects larger than 4 GB.

To change the pickling protocol, use the following code before creating a Spark session:

from pyspark import broadcast
import pickle


def broadcast_dump(self, value, f):
    pickle.dump(value, f, 4)  # was 2, 4 is first protocol supporting >4GB
    f.close()

    return f.name


broadcast.Broadcast.dump = broadcast_dump
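For example, in the question's setup the patch goes right at the top, before the session is built, so the patched dump is already in place when broadcasts are serialized (a sketch reusing broadcast_dump from above and the question's config):

from pyspark.sql import SparkSession

broadcast.Broadcast.dump = broadcast_dump  # patch must be in place first

spark = SparkSession.builder \
    .config('spark.driver.memory', '10g') \
    .master('local[4]') \
    .getOrCreate()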

Upvotes: 2
