Mark J Miller

Reputation: 4871

Why is partition key column missing from DataFrame

I have a job that loads a DataFrame object and then saves the data in parquet format using the DataFrame partitionBy method. I then publish the paths that were created so a subsequent job can use the output. The output paths look like this:

/ptest/_SUCCESS
/ptest/id=0
/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00001-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=0/part-00002-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1
/ptest/id=1/part-00003-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00004-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=1/part-00005-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3
/ptest/id=3/part-00006-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet
/ptest/id=3/part-00007-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet

When I receive new data it is appended to the dataset. The paths are published so jobs which depend on the data can just process the new data.

Here's a simplified example of the code:

>>> rdd = sc.parallelize([(0,1,"A"), (0,1,"B"), (0,2,"C"), (1,2,"D"), (1,10,"E"), (1,20,"F"), (3,18,"G"), (3,18,"H"), (3,18,"I")])
>>> df = sqlContext.createDataFrame(rdd, ["id", "score","letter"])
>>> df.show()
+---+-----+------+
| id|score|letter|
+---+-----+------+
|  0|    1|     A|
|  0|    1|     B|
|  0|    2|     C|
|  1|    2|     D|
|  1|   10|     E|
|  1|   20|     F|
|  3|   18|     G|
|  3|   18|     H|
|  3|   18|     I|
+---+-----+------+
>>> df.write.partitionBy("id").format("parquet").save("hdfs://localhost:9000/ptest")

The problem is when another job tries to read the file using the published paths:

>>> df2 = spark.read.format("parquet").load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+-----+------+
|score|letter|
+-----+------+
|    1|     A|
|    1|     B|
|    2|     C|
+-----+------+

As you can see, the partition key is missing from the loaded dataset. If I publish a schema that downstream jobs can use, they can load the data with that schema. The data loads and the partition key column exists, but its values are all null:

>>> df2 = spark.read.format("parquet").schema(df.schema).load("hdfs://localhost:9000/ptest/id=0/")
>>> df2.show()
+----+-----+------+
|  id|score|letter|
+----+-----+------+
|null|    1|     A|
|null|    1|     B|
|null|    2|     C|
+----+-----+------+

Is there a way to make sure the partition keys are stored within the parquet data? I don't want to require other processes to parse the paths to get the keys.
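For context, the path-parsing workaround I want to avoid would amount to something like the small helper below (`key_from_path` is hypothetical, not part of any real job; it only illustrates recovering the Hive-style `key=value` segment from a published path):

```python
# Hypothetical sketch of the workaround in question: recovering the
# partition key by parsing the Hive-style "key=value" directory
# segment out of a published path.
def key_from_path(path):
    keys = {}
    for segment in path.split("/"):
        if "=" in segment:
            name, value = segment.split("=", 1)
            keys[name] = value
    return keys

print(key_from_path(
    "/ptest/id=0/part-00000-942fb247-1fe4-4147-a41a-bc688f932862.snappy.parquet"
))
# {'id': '0'}
```

Every consumer would have to carry this logic, which is exactly the duplication I'd like to avoid.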

Upvotes: 12

Views: 12395

Answers (2)

zero323

Reputation: 330113

In a case like this you should provide the basePath option:

(spark.read
    .format("parquet")
    .option("basePath", "hdfs://localhost:9000/ptest/")
    .load("hdfs://localhost:9000/ptest/id=0/"))

where basePath points to the root directory of your data.

With basePath set, the DataFrameReader will be aware of the partitioning and adjust the schema accordingly.
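Conceptually, the discovery step can be approximated in plain Python (this is a rough sketch of the idea, not Spark's actual implementation, and the leaf file name below is made up for illustration): every directory level between basePath and the data files is parsed as a `column=value` pair and surfaced as a column in the schema.

```python
# Rough approximation of Hive-style partition discovery: directories
# between the base path and the data files are parsed as
# "column=value" pairs and become columns of the resulting schema.
import posixpath

def discovered_partitions(base_path, file_path):
    rel = posixpath.relpath(file_path, base_path)
    columns = {}
    for segment in rel.split("/")[:-1]:  # last segment is the file itself
        if "=" in segment:
            name, value = segment.split("=", 1)
            columns[name] = value
    return columns

base = "hdfs://localhost:9000/ptest"
leaf = "hdfs://localhost:9000/ptest/id=0/part-00000.snappy.parquet"
print(discovered_partitions(base, leaf))
# {'id': '0'}
```

This is why loading `/ptest/id=0/` directly loses the column: with the load path itself as the base, there is no `id=0` directory left between base and files to parse.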

Upvotes: 15

Pushkr

Reputation: 3619

If the other application loads a specific partition, as the load("hdfs://localhost:9000/ptest/id=0/") path suggests, that application can tweak its code to replace the nulls with the partition column's value:

part = 0  # partition to load
df2 = spark.read.format("parquet") \
                .schema(df.schema) \
                .load("ptest/id=" + str(part)) \
                .fillna(part, ["id"])

That way, the output will be:

+---+-----+------+
| id|score|letter|
+---+-----+------+
|  0|    1|     A|
|  0|    1|     B|
|  0|    2|     C|
+---+-----+------+

Upvotes: 0
