Reputation: 63022
For a given DataFrame just before being saved to parquet, here is the schema: notice that centroid0 is the first column and is StringType:
However, when saving the file using:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)
and with the partitionCols as centroid0:
then there is a (to me) surprising result: the centroid0 partition column has been moved to the end of the Row, and its type has been inferred as Integer.
I confirmed the output path via println:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters
And here is the schema upon reading back from the saved parquet:
Why are those two modifications to the input schema occurring - and how can they be avoided - while still maintaining centroid0 as a partitioning column?
Update: A preferred answer should mention why/when the partition columns are added to the end (vs the beginning) of the columns list. We need an understanding of the deterministic ordering.
In addition - is there any way to cause Spark to "change its mind" on the inferred column types? I have had to change the partition values from 0, 1, etc. to c0, c1, etc. in order to get the inference to map to StringType. Maybe that was required, but if there were some Spark setting to change the behavior, that would make for an excellent answer.
Upvotes: 6
Views: 3323
Reputation: 93
You can actually pretty easily make use of the ordering of the columns in a case class that holds the schema of your partitioned data. You will need to read the data from the path under which the partitioning columns are stored as directories, so that Spark infers the values of those columns. Then simply re-order the columns using the case class schema with a statement like:
import org.apache.spark.sql.{Encoder, Encoders}

val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
  .schema(encoder.schema)
  .format("parquet")
  .option("mergeSchema", "true")
  .load(myPath)
  // reorder the columns: when reading partitioned data, the partitioning columns are appended at the end
  .select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
  .as[RecordType]
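For reference, a minimal sketch of what RecordType might look like in this setup (the case class and its fields are assumptions, not taken from the answer); the order of fields declared in the case class is exactly the column order that the .select(...) above restores:
// hypothetical record type: field order defines the desired column order,
// with the partition column declared as a String
case class RecordType(centroid0: String, field1: Long, field2: Double)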
Upvotes: 1
Reputation: 25909
When you write.partitionBy(...), Spark saves the partition field(s) as folder(s).
This can be beneficial for reading the data later, as (with some file types, parquet included) it can optimize to read data just from the partitions that you use (i.e. if you'd read and filter for centroid0==1, Spark wouldn't read the other partitions).
The effect of this is that the partition fields (centroid0 in your case) are not written into the parquet files themselves, only as folder names (centroid0=1, centroid0=2, etc.).
One side effect is that the type of the partition column is inferred at run time (since its schema is not saved in the parquet), and in your case it happened that you only had integer values, so it was inferred to be integer.
The other side effect is that the partition field is added at the end of the schema: Spark reads the schema from the parquet files as one chunk and then appends the partition field(s) to it as another (again, they are no longer part of the schema stored in the parquet).
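As a sketch of how to sidestep the run-time type inference described above (the path and the extra column name are assumptions, not from the question): you can supply an explicit schema when reading the partitioned data, so the partition column keeps the type you declare rather than the inferred one:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// declare the schema explicitly, including the partition column as StringType
val explicitSchema = StructType(Seq(
  StructField("someField", StringType),   // hypothetical data column stored in the parquet files
  StructField("centroid0", StringType)    // the partition column, kept as a string
))

val df = spark.read
  .schema(explicitSchema)
  .parquet("/data/output/blocking/out/level1/clusters")   // assumed path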
Upvotes: 5
Reputation: 10406
The reason is in fact pretty simple. When you partition by a column, each partition can only contain one value of that column. Therefore it is useless to actually write the same value everywhere in the file, and this is why Spark does not. When the file is read, Spark uses the information contained in the names of the folders to reconstruct the partitioning column, and it is put at the end of the schema. The type of the column is not stored; it is inferred when reading, hence the integer type in your case.
NB: There is no particular reason as to why the column is added at the end. It could have been at the beginning. I guess it is just an arbitrary choice of implementation.
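On the type question specifically, Spark SQL has a setting that controls whether partition column values are type-inferred at all; with it disabled, partition columns come back as strings. A small sketch (the read path is assumed):
// disable partition column type inference so centroid0 is read back as StringType
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val df = spark.read.parquet("/data/output/blocking/out/level1/clusters")   // assumed path
df.printSchema()   // centroid0 should now appear as string (still at the end of the schema)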
To avoid losing the type and the order of the columns, you could duplicate the partitioning column like this: df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...").
You will waste some space though. Also, Spark uses the partitioning to optimize filters, for instance. Don't forget to use the X column for filters after reading the data, and not your original column, or Spark won't be able to perform any optimizations.
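Putting that suggestion together, a rough end-to-end sketch of the duplicated-column approach (column and path names beyond centroid0 are assumptions):
import org.apache.spark.sql.functions.col

// duplicate the partitioning column so centroid0 itself stays inside the parquet schema
df.withColumn("X", col("centroid0"))
  .write
  .partitionBy("X")
  .mode("overwrite")
  .parquet("/data/output/blocking/out/level1/clusters")   // assumed path

// filter on X (the folder column), not on centroid0, so Spark can still prune partitions
val oneCluster = spark.read
  .parquet("/data/output/blocking/out/level1/clusters")
  .filter(col("X") === "c1")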
Upvotes: 0