will

Reputation: 589

Spark Streaming appends to S3 as Parquet format, too many small partitions

I am building an app that uses Spark Streaming to receive data from Kinesis streams on AWS EMR. One of the goals is to persist the data into S3 (EMRFS), and for this I am using a 2-minute non-overlapping window.

My approach:

Kinesis Stream -> Spark Streaming with a batch duration of about 60 seconds, using a non-overlapping window of 120s, saving the streamed data into S3 as:

val rdd1 = kinesisStream.map(record => /* decode the data */)
// 120s window sliding every 120s, i.e. non-overlapping
rdd1.window(Seconds(120), Seconds(120)).foreachRDD { rdd =>
        val spark = SparkSession...
        import spark.implicits._
        // convert rdd to df
        val df = rdd.toDF(columnNames: _*)
        // append each window's output to the same Parquet directory
        df.write.mode("append").parquet("s3://bucket/20161211.parquet")
}

Here is what s3://bucket/20161211.parquet looks like after a while: [screenshot: Spark Streaming S3 Parquet]

As you can see, there are lots of fragmented small partitions, which is horrendous for read performance. Is there any way to control the number of small partitions as I stream data into this S3 Parquet file?

Thanks

What I am thinking of doing is to run something like this each day:

val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")

where I repartition the DataFrame into 4 partitions and save it back.

It works, but I feel that doing this every day is not an elegant solution.

Upvotes: 2

Views: 2885

Answers (1)

Holden

Reputation: 7452

That's actually pretty close to what you want to do: Spark writes out each partition as an individual file. However, coalesce is a bit confusing, since it can (effectively) apply upstream of where the coalesce is called. The warning from the Scala doc is:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).

With Datasets, it's a bit easier to persist and count to force wide evaluation, since the default coalesce function doesn't take a repartition (shuffle) flag as input (although you could construct an instance of Repartition manually).
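As a rough sketch of the difference, assuming you want a fixed number of output files per window: on an RDD you can pass shuffle = true to coalesce, while on a DataFrame/Dataset coalesce has no such flag and repartition(n), which always shuffles, plays that role. The object name FewerFilesPerWindow and its helper names are made up for illustration; they are not part of Spark.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helpers (names are illustrative, not a Spark API).
object FewerFilesPerWindow {

  // RDD API: shuffle = true adds a shuffle step, so the upstream
  // partitions are still computed in parallel before being merged.
  def shrinkRdd[T](rdd: RDD[T], numFiles: Int): RDD[T] =
    rdd.coalesce(numFiles, shuffle = true)

  // Dataset API: coalesce(n) takes no shuffle flag;
  // repartition(n) always shuffles into n partitions.
  def shrinkDf(df: DataFrame, numFiles: Int): DataFrame =
    df.repartition(numFiles)

  // Append one window's data as at most numFiles part files.
  def appendWindow(df: DataFrame, path: String, numFiles: Int): Unit =
    shrinkDf(df, numFiles).write.mode(SaveMode.Append).parquet(path)
}
```

Inside the foreachRDD from the question, the write could then become something like FewerFilesPerWindow.appendWindow(df, "s3://bucket/20161211.parquet", 4). Note that this only bounds the files per window: with a 120s window there are 720 windows a day, so 4 files per window is still up to 2880 files in that day's directory.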

Another option is to have a second periodic batch job (or even a second streaming job) that cleans up/merges the results, but this can be a bit complicated as it introduces a second moving part to keep track of.
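A minimal sketch of such a merge job, assuming it runs once the streaming job has stopped appending to that day's directory. CompactDay, compact and the numFiles parameter are hypothetical names; the read and write calls are the standard DataFrame reader/writer API.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical compaction job (names are illustrative, not a Spark API).
object CompactDay {

  // Reads every small file under inputPath and rewrites the data
  // as numFiles larger Parquet files under outputPath.
  // outputPath must differ from inputPath: Spark refuses to
  // overwrite a path that the same query is reading from.
  def compact(spark: SparkSession,
              inputPath: String,
              outputPath: String,
              numFiles: Int): Unit = {
    spark.read.parquet(inputPath)
      .repartition(numFiles)      // full shuffle, so the read stays parallel
      .write
      .mode(SaveMode.Overwrite)   // makes the job safe to re-run
      .parquet(outputPath)
  }
}
```

For the paths in the question this would be called as CompactDay.compact(spark, "s3://bucket/20161211.parquet", "s3://bucket/20161211_4parition.parquet", 4). Readers would then need to be pointed at the compacted path, which is part of the extra bookkeeping mentioned above.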

Upvotes: 4
