jph

Reputation: 2233

parquet file size, firehose vs. spark

I'm generating Parquet files via two methods: a Kinesis Firehose and a Spark job. They are both written into the same partition structure on S3. Both sets of data can be queried using the same Athena table definition. Both use gzip compression.

I'm noticing, however, that the Parquet files generated by Spark are about 3x as large as those from Firehose for a comparable amount of data. Is there any reason this should be the case? I do notice some schema and metadata differences when I load them using PyArrow:

>>> import pyarrow.parquet as pq
>>> spark = pq.ParquetFile('<spark object name>.gz.parquet')
>>> spark.metadata
<pyarrow._parquet.FileMetaData object at 0x101f2bf98>
  created_by: parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828)
  num_columns: 4
  num_rows: 11
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1558
>>> spark.schema
<pyarrow._parquet.ParquetSchema object at 0x101f2f438>
uri: BYTE_ARRAY UTF8
dfpts.list.element: BYTE_ARRAY UTF8
udids.list.element: BYTE_ARRAY UTF8
uuids.list.element: BYTE_ARRAY UTF8

>>> firehose = pq.ParquetFile('<firehose object name>.parquet')
>>> firehose.metadata
<pyarrow._parquet.FileMetaData object at 0x10fc63458>
  created_by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
  num_columns: 4
  num_rows: 156
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1017
>>> firehose.schema
<pyarrow._parquet.ParquetSchema object at 0x10fc5e7b8>
udids.bag.array_element: BYTE_ARRAY UTF8
dfpts.bag.array_element: BYTE_ARRAY UTF8
uuids.bag.array_element: BYTE_ARRAY UTF8
uri: BYTE_ARRAY UTF8

Is it likely that the schema difference is the culprit? Something else?

These two specific files don't contain the exact same data, but based on my Athena queries the total cardinality of all lists for all rows in the Firehose file is roughly 2.5x what's in the Spark file.

EDITED TO ADD:

I wrote the following to essentially dump the contents of each parquet file to stdout one row per line:

import sys
import pyarrow.parquet as pq

table = pq.read_table(sys.argv[1])
pydict = table.to_pydict()
for i in range(0, table.num_rows):
    print(f"{pydict['uri'][i]}, {pydict['dfpts'][i]}, {pydict['udids'][i]}, {pydict['uuids'][i]}")

I then ran that against each parquet file and piped the output to a file. Here are the sizes of the original two files, the output of pointing the above python code at each file, and the gzipped version of that output:

-rw-r--r--  1 myuser  staff  1306337 Jun 28 16:19 firehose.parquet
-rw-r--r--  1 myuser  staff  8328156 Jul  2 15:09 firehose.printed
-rw-r--r--  1 myuser  staff  5009543 Jul  2 15:09 firehose.printed.gz
-rw-r--r--  1 myuser  staff  1233761 Jun 28 16:23 spark.parquet
-rw-r--r--  1 myuser  staff  3213528 Jul  2 15:09 spark.printed
-rw-r--r--  1 myuser  staff  1951058 Jul  2 15:09 spark.printed.gz

Notice that the two parquet files are approximately the same size, but the "printed" content of the firehose file is approximately 2.5x the size of the "printed" content from the spark file. And they're about equally compressible.

So: what is taking up all the space in the Spark parquet file if it's not the raw data?

EDITED TO ADD:

Below is the output from "parquet-tools meta". The compression ratios for each column look similar, but the firehose file contains many more values per uncompressed byte. For the "dfpts" column:

firehose:

SZ:667849/904992/1.36 VC:161475

spark:

SZ:735561/1135861/1.54 VC:62643

parquet-tools meta output:

file:            file:/Users/jh01792/Downloads/firehose.parquet 
creator:         parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 

file schema:     hive_schema 
--------------------------------------------------------------------------------
udids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
dfpts:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uuids:           OPTIONAL F:1 
.bag:            REPEATED F:1 
..array_element: OPTIONAL BINARY L:STRING R:1 D:3
uri:             OPTIONAL BINARY L:STRING R:0 D:1

row group 1:     RC:156 TS:1905578 OFFSET:4 
--------------------------------------------------------------------------------
udids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:4 SZ:421990/662241/1.57 VC:60185 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 58, min/max not defined]
dfpts:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:421994 SZ:667849/904992/1.36 VC:161475 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 53, min/max not defined]
uuids:           
.bag:            
..array_element:  BINARY GZIP DO:0 FPO:1089843 SZ:210072/308759/1.47 VC:39255 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 32, min/max not defined]
uri:              BINARY GZIP DO:0 FPO:1299915 SZ:5397/29586/5.48 VC:156 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

file:        file:/Users/jh01792/Downloads/spark.parquet 
creator:     parquet-mr version 1.8.3 (build aef7230e114214b7cc962a8f3fc5aeed6ce80828) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"uri","type":"string","nullable":false,"metadata":{}},{"name":"dfpts","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"udids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"uuids","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
uri:         REQUIRED BINARY L:STRING R:0 D:0
dfpts:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
udids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3
uuids:       OPTIONAL F:1 
.list:       REPEATED F:1 
..element:   OPTIONAL BINARY L:STRING R:1 D:3

row group 1: RC:11 TS:1943008 OFFSET:4 
--------------------------------------------------------------------------------
uri:          BINARY GZIP DO:0 FPO:4 SZ:847/2530/2.99 VC:11 ENC:PLAIN,BIT_PACKED ST:[num_nulls: 0, min/max not defined]
dfpts:       
.list:       
..element:    BINARY GZIP DO:0 FPO:851 SZ:735561/1135861/1.54 VC:62643 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
udids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:736412 SZ:335289/555989/1.66 VC:23323 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
uuids:       
.list:       
..element:    BINARY GZIP DO:0 FPO:1071701 SZ:160494/248628/1.55 VC:13305 ENC:RLE,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]

Upvotes: 5

Views: 1304

Answers (2)

rluta

Reputation: 6897

You should probably frame your question differently:

Why is the compression of the Firehose data more efficient than that of the Spark data?

You have several possible explanations for this in Parquet:

  • Different cardinality of column values

    In addition to a compression scheme, Parquet tries to use the most efficient encoding for your values. For BYTE_ARRAY in particular, it tries by default to use dictionary encoding, i.e. it maps each distinct BYTE_ARRAY value to an int and then stores only the ints in the column data. If the dictionary grows too big, it falls back to storing the plain BYTE_ARRAY values.

    If your Firehose dataset contains far less diversity in values than your Spark dataset, one may be using an efficient dictionary encoding and the other not.

  • Sorted data

    Sorted data typically compresses much better than unsorted data, so if your Firehose column values are naturally sorted (or at least repeat more often), the Parquet encoding and gzip compression will achieve a much better compression ratio.

  • Different row group size

    Parquet splits values into row groups of adjustable size (the parquet.block.size config in Spark). Compression and encoding are applied at the row group level, so a bigger row group generally gives better compression but possibly worse encoding (for example, you may switch from dictionary encoding to plain BYTE_ARRAY values) and higher memory requirements when reading or writing.
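
To get a feel for the "sorted data" point, here is a minimal sketch using only the Python standard library. It does not write Parquet at all: it only compares DEFLATE (the algorithm behind gzip) on the same synthetic values in random order and in sorted order. The value count, the cardinality and the helper name deflate_size are assumptions made up for the illustration.

```python
import random
import zlib

rng = random.Random(0)  # fixed seed so the run is repeatable

# 500 distinct 16-character strings standing in for a low-cardinality column
# (synthetic data, not taken from the files in the question)
distinct = [f"{rng.getrandbits(64):016x}" for _ in range(500)]
values = [rng.choice(distinct) for _ in range(20000)]


def deflate_size(items):
    """Size in bytes of the newline-joined items after DEFLATE compression."""
    return len(zlib.compress("\n".join(items).encode("utf-8"), 6))


unsorted_size = deflate_size(values)
sorted_size = deflate_size(sorted(values))
print(f"unsorted: {unsorted_size} bytes, sorted: {sorted_size} bytes")
```

Sorting puts identical values next to each other, so the compressor sees long runs instead of scattered repeats. Parquet's dictionary and RLE encodings benefit from the same property, although the exact gain on real data will differ.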

How can you find out what happens in your case?

Use parquet-tools to inspect the detailed encoding data for your columns:

For example on one of my datasets:

$ parquet-tools meta part-00015-6a77dcbe-3edd-4199-bff0-efda0f512d61.c000.snappy.parquet

...

row group 1:              RC:63076 TS:41391030 OFFSET:4
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:6042924 SZ:189370/341005/1,80 VC:269833 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

...

row group 2:              RC:28499 TS:14806649 OFFSET:11648146
--------------------------------------------------------------------------------
options:
.list:
..element:                 BINARY SNAPPY DO:0 FPO:13565454 SZ:78631/169832/2,16 VC:144697 ENC:RLE,PLAIN_DICTIONARY ST:[no stats for this column]

The ENC attribute on the column data gives you the encodings used for the column (DICTIONARY in this case), the SZ attribute gives you compressed size/uncompressed size/compression ratio, and VC is the number of values encoded.
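
If you want to compare many columns or files, the fields described above can be pulled out of the text output with a small script. This is only a sketch that assumes the line layout shown in the output above; the regular expression and the function name parse_chunk_line are my own, not part of parquet-tools.

```python
import re

# Matches the SZ, VC and ENC fields of a column chunk line as printed by
# `parquet-tools meta`. The ratio may use "." or "," as the decimal separator
# depending on the locale, so both are accepted.
CHUNK_RE = re.compile(
    r"SZ:(?P<compressed>\d+)/(?P<uncompressed>\d+)/(?P<ratio>[\d.,]+)\s+"
    r"VC:(?P<values>\d+)\s+"
    r"ENC:(?P<encodings>[A-Z_,]+)"
)


def parse_chunk_line(line):
    """Return the size, value count and encodings of one column chunk line,
    or None if the line does not describe a column chunk."""
    m = CHUNK_RE.search(line)
    if m is None:
        return None
    return {
        "compressed_bytes": int(m["compressed"]),
        "uncompressed_bytes": int(m["uncompressed"]),
        "value_count": int(m["values"]),
        "encodings": m["encodings"].split(","),
    }


line = (
    "..element:                 BINARY SNAPPY DO:0 FPO:13565454 "
    "SZ:78631/169832/2,16 VC:144697 ENC:RLE,PLAIN_DICTIONARY "
    "ST:[no stats for this column]"
)
info = parse_chunk_line(line)
print(info)
```

Dividing uncompressed_bytes by value_count then gives an average encoded size per value, which is easier to compare across files than the raw sizes.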

You can see in my example that compression ratio is slightly better in row group 2 than in row group 1 simply because of data distribution.

Update:

Looking at the stats you provide, you can see that the dfpts column in your Firehose dataset has an average encoded value size of 904992/161475 = 5.6 bytes, whereas the Spark version has 1135861/62643 = 18.13 bytes, even though both use the same dictionary encoding. This likely means the RLE is much more efficient on your Firehose dataset because you have a lot of repeating values or far fewer distinct values. If you sort your dfpts column in Spark before saving to Parquet, you're likely to achieve encoding ratios similar to those of your Firehose data.
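
The same arithmetic can be repeated for every list column. The sketch below uses the uncompressed sizes and value counts (the SZ and VC fields) from the parquet-tools output posted in the question; the dictionary layout and the function name bytes_per_value are just for illustration.

```python
def bytes_per_value(uncompressed_bytes, value_count):
    """Average encoded (uncompressed) size of one value in a column chunk."""
    return uncompressed_bytes / value_count


# (uncompressed bytes, value count) per column, copied from the question
firehose = {
    "udids": (662241, 60185),
    "dfpts": (904992, 161475),
    "uuids": (308759, 39255),
}
spark = {
    "udids": (555989, 23323),
    "dfpts": (1135861, 62643),
    "uuids": (248628, 13305),
}

firehose_dfpts = bytes_per_value(*firehose["dfpts"])
spark_dfpts = bytes_per_value(*spark["dfpts"])

for column in ("udids", "dfpts", "uuids"):
    f = bytes_per_value(*firehose[column])
    s = bytes_per_value(*spark[column])
    print(f"{column}: firehose {f:.2f} bytes/value, spark {s:.2f} bytes/value")
```

If the gap shows up in all three columns and not only in dfpts, that points to a general difference in how the data is distributed or ordered rather than something specific to one column.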

Upvotes: 2

DaRkMaN

Reputation: 1054

Two things I can think of that could contribute to the difference:
1. Parquet properties.
In Spark, you can list all Parquet-related properties using the following snippets.
If the properties were set through Hadoop configs:

import scala.collection.JavaConverters._

// `spark` is the active SparkSession
spark.sparkContext.hadoopConfiguration.asScala.filter {
  x =>
    x.getKey.contains("parquet")
}.foreach(println)

If the properties were set through Spark (spark-defaults.conf, --conf, etc.):

spark.sparkContext.getConf.getAll.filter {
  case(key, value) => key.contains("parquet")
}.foreach(println)

If we can get the Firehose configs as well (I am not familiar with Firehose), we can compare the two. Even without them, the Spark configs should give a general idea of what could be wrong.
2. Different Parquet versions used by Spark and Firehose.
The Parquet community could have changed the default values of the Parquet configs between versions.
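
Once both sets of properties are available as key/value pairs, the comparison itself is simple. Here is a minimal Python sketch; the helper diff_configs is hypothetical and the values in both dictionaries are made up for illustration (only the property names are real Parquet writer settings).

```python
def diff_configs(left, right):
    """Return {key: (left_value, right_value)} for every key whose value
    differs between the two configs. A missing key is reported as None."""
    keys = sorted(set(left) | set(right))
    return {
        key: (left.get(key), right.get(key))
        for key in keys
        if left.get(key) != right.get(key)
    }


# Hypothetical values, NOT the real settings of either writer
spark_conf = {
    "parquet.block.size": "134217728",
    "parquet.page.size": "1048576",
    "parquet.enable.dictionary": "true",
}
other_conf = {
    "parquet.block.size": "268435456",
    "parquet.page.size": "1048576",
}

differences = diff_configs(spark_conf, other_conf)
for key, (left_value, right_value) in differences.items():
    print(f"{key}: {left_value} vs {right_value}")
```

A key that appears on only one side usually means the other writer is relying on the default of its Parquet version, which ties back to point 2.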

Upvotes: 0
