Reputation: 7179
I am trying to save a Spark DataFrame to a Hive table (Parquet) with .saveAsTable() in pySpark, but I keep running into memory issues like the one below:
org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1:
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes.
The first number (1034931) generally keeps changing in different runs. I recognize the second number (1048576) is 1024^2, but I have little idea what that means here.
I have been using the exact same technique in a few other projects (with much larger DataFrames), and it has worked without issue. Here I have essentially copy-pasted the structure of the process and the configuration, but it runs into this memory problem! It must be something trivial I am missing.
The Spark DataFrame (let's call it sdf) has the following structure (~10 columns and ~300k rows, but could be more if this runs correctly):
+----------+----------+----------+---------------+---------------+
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str|
+----------+----------+----------+---------------+---------------+
|val_a1_str|val_b1_num|val_c1_num| val_d1_str| val_e1_str|
|val_a2_str|val_b2_num|val_c2_num| val_d2_str| val_e2_str|
| ...| ...| ...| ...| ...|
+----------+----------+----------+---------------+---------------+
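For concreteness, a toy stand-in for sdf with the same column layout (placeholder values only, not the real data) could be built like this:
# Toy stand-in for sdf: same column layout as above, placeholder values only
toy_rows = [('val_a1_str', 1.0, 2.0, 'val_d1_str', 'val_e1_str'),
            ('val_a2_str', 3.0, 4.0, 'val_d2_str', 'val_e2_str')]
toy_sdf = sqlContext.createDataFrame(
    toy_rows,
    ['col_a_str', 'col_b_num', 'col_c_num',
     'partition_d_str', 'partition_e_str'])
toy_sdf.show()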
The Hive table was created like this:
sqlContext.sql("""
    CREATE TABLE IF NOT EXISTS my_hive_table (
        col_a_str string,
        col_b_num double,
        col_c_num double
    )
    PARTITIONED BY (partition_d_str string,
                    partition_e_str string)
    STORED AS PARQUETFILE
""")
The attempt to insert data into this table uses the following command:
sdf.write \
.mode('append') \
.partitionBy('partition_d_str', 'partition_e_str') \
.saveAsTable('my_hive_table')
The Spark/Hive configuration is like this:
spark_conf = pyspark.SparkConf()
spark_conf.setAppName('my_project')
spark_conf.set('spark.executor.memory', '16g')
spark_conf.set('spark.python.worker.memory', '8g')
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000')
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64')
spark_conf.set('spark.executor.cores', '4')
sc = pyspark.SparkContext(conf=spark_conf)
sqlContext = pyspark.sql.HiveContext(sc)
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
sqlContext.setConf('hive.exec.compress.output', 'true')
I have tried changing .partitionBy('partition_d_str', 'partition_e_str') to .partitionBy(['partition_d_str', 'partition_e_str']), increasing memory, splitting the DataFrame into smaller chunks (roughly as in the sketch below), and re-creating the table and the DataFrame, but nothing seems to work. I can't find any solutions online either. What would be causing the memory error (I don't fully understand where it's coming from), and how can I change my code to write to the Hive table? Thanks.
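For reference, the "smaller chunks" attempt looked roughly like this (a sketch of the idea, not the exact code): writing one partition_d_str value at a time instead of the whole DataFrame.
# Sketch of the chunked-write attempt: write one partition_d_str value at a time
d_values = [row['partition_d_str']
            for row in sdf.select('partition_d_str').distinct().collect()]

for d_val in d_values:
    sdf.filter(sdf['partition_d_str'] == d_val) \
       .write \
       .mode('append') \
       .partitionBy('partition_d_str', 'partition_e_str') \
       .saveAsTable('my_hive_table')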
Upvotes: 4
Views: 3277
Reputation: 7179
It turns out I was partitioning with a nullable field that was throwing .saveAsTable() off. When I was converting the RDD to a Spark DataFrame, the schema I was providing was generated like this:
from pyspark.sql.types import *

# Define schema
my_schema = StructType(
    [StructField('col_a_str', StringType(), False),
     StructField('col_b_num', DoubleType(), True),
     StructField('col_c_num', DoubleType(), True),
     StructField('partition_d_str', StringType(), False),
     StructField('partition_e_str', StringType(), True)])

# Convert RDD to Spark DataFrame
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema)
Since partition_e_str was declared as nullable=True (the third argument for that StructField), it had issues when writing to the Hive table because it was being used as one of the partitioning fields. I changed it to:
# Define schema
my_schema = StructType(
    [StructField('col_a_str', StringType(), False),
     StructField('col_b_num', DoubleType(), True),
     StructField('col_c_num', DoubleType(), True),
     StructField('partition_d_str', StringType(), False),
     StructField('partition_e_str', StringType(), False)])
and all was well again!
Lesson: Make sure your partitioning fields are not nullable!
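As a quick sanity check (a sketch using only standard DataFrame calls; the column names are from the example above), you can confirm the nullability flags and verify that the partition columns contain no nulls before writing:
# Sanity check (sketch): verify the partition columns are non-nullable
# and contain no null values before calling .saveAsTable().
for field in sdf.schema.fields:
    if field.name in ('partition_d_str', 'partition_e_str'):
        print('%s nullable = %s' % (field.name, field.nullable))

null_count = sdf.filter(
    sdf['partition_d_str'].isNull() | sdf['partition_e_str'].isNull()).count()
print('rows with null partition values: %d' % null_count)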
Upvotes: 3