messenjah00

Reputation: 73

What is the effect of 'coalesce' before 'partitionBy' in this streaming query?

I have a streaming query (Spark Structured Streaming) that receives data from a Kafka topic (two partitions), like this:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("record")).select("record.*")

I want to perform a simple aggregation, partition by date/hour, and then save the result to Parquet files in HDFS, like this:

val aggregationQuery = df.withColumn("ROP", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
  .withColumn("date", to_date(col("ROP")))
  .withColumn("hour", hour(col("ROP")))
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "0 minutes")
  .groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
  .agg(count("attributes.RECORDID").as("NumRecords"))
  .coalesce(2)

Output to Parquet:

aggregationQuery.writeStream
  .format("parquet")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .partitionBy("date", "hour")
  .option("path", "hdfs://cloudera-cluster:8020/user/spark/proyecto1")
  .option("checkpointLocation", "hdfs://cloudera-cluster:8020/user/spark/checkpointfolder")
  .outputMode("append")
  .start()

As a result, I am getting a folder structure similar to this example:

         user/spark/proyecto1/date=2015-08-18/hour=20

Inside each folder I am getting 2 Parquet files per trigger during the streaming process.

I would like to understand what the 'coalesce' and 'partitionBy' operations are doing with my data, and also any risks associated with this particular combination.

By the way, I have only 2 nodes in my cluster.

Upvotes: 2

Views: 7012

Answers (2)

Kiran Balakrishnan

Reputation: 257

Coalesce: it reduces the number of partitions. In this case, if n was the default number of partitions, it reduces it to 2. It blindly combines the partitions on each of your nodes into a single partition, resulting in 2. This might be the reason you are getting 2 files in each folder.

When you use partitionBy, it creates n partitions based on the values in the column(s), so each unique key goes into its own partition. If it is not used properly, you might end up with a large number of partitions, which would create overhead on a two-node cluster.
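To make the two effects concrete, here is a minimal plain-Scala sketch. It is not Spark itself, only a model of the output layout. The names `Row`, `leafDir` and `filesPerDir` are hypothetical, the `task` field stands in for the two partitions left after coalesce(2), and the model assumes each write task emits one file per date/hour directory it holds rows for. It ignores details such as escaping of special characters in partition values.

```scala
// Plain-Scala model of the output layout; this is NOT Spark's writer.
// Row, leafDir and filesPerDir are hypothetical names used for illustration.
final case class Row(date: String, hour: Int, task: Int)

object PartitionLayoutModel {
  // partitionBy("date", "hour") encodes the column values in the directory path.
  def leafDir(row: Row): String = s"date=${row.date}/hour=${row.hour}"

  // Assumption: each write task emits one file per leaf directory it has rows for,
  // so files per directory = number of distinct tasks holding rows for that directory.
  def filesPerDir(rows: Seq[Row]): Map[String, Int] =
    rows.groupBy(leafDir).map { case (dir, rs) => dir -> rs.map(_.task).distinct.size }

  def main(args: Array[String]): Unit = {
    // Two tasks (as after coalesce(2)) and two date/hour combinations in one trigger.
    val rows = Seq(
      Row("2015-08-18", 20, task = 0),
      Row("2015-08-18", 20, task = 1),
      Row("2015-08-18", 21, task = 0)
    )
    filesPerDir(rows).toSeq.sortBy(_._1).foreach { case (dir, n) =>
      println(s"$dir -> $n file(s)")
    }
  }
}
```

Under this model the number of files per folder per trigger can never exceed the number of tasks, while the number of folders grows with the number of distinct date/hour combinations.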

Upvotes: 3

Alper t. Turker

Reputation: 35229

  • coalesce reduces parallelism for the complete pipeline to 2. Since it doesn't introduce an analysis barrier, it propagates back, so in practice it might be better to replace it with repartition.
  • partitionBy creates the directory structure you see, with values encoded in the path. It removes the corresponding columns from the leaf files. Because dates and hours have low cardinality, there is no particular risk in this case.

Combined, these two create the observed directory structure and limit the number of files in each leaf directory to at most two per trigger.
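As a rough illustration of the repartition suggestion, the batch sketch below compares the two calls on a small made-up dataset. It assumes Spark SQL is on the classpath and runs in local mode. The object name, `partitionCounts` and the `events` data are hypothetical and are not taken from the question. Adaptive query execution is switched off only so that the partition counts are predictable.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count}

// Hypothetical batch sketch; the streaming query in the question is not reproduced here.
object CoalesceVsRepartition {
  // Returns the number of output partitions after coalesce(2) and after repartition(2).
  def partitionCounts(spark: SparkSession): (Int, Int) = {
    // Made-up input: 1000 rows spread over 8 input partitions.
    val events = spark.range(0L, 1000L, 1L, 8).withColumn("hour", col("id") % 24)
    val aggregated = events.groupBy("hour").agg(count("id").as("NumRecords"))

    // coalesce adds no shuffle: it narrows the stage it sits in.
    val coalesced = aggregated.coalesce(2)
    // repartition adds a shuffle: upstream work keeps its own parallelism.
    val repartitioned = aggregated.repartition(2)

    // Print both physical plans to compare where the two calls sit.
    coalesced.explain()
    repartitioned.explain()

    (coalesced.rdd.getNumPartitions, repartitioned.rdd.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("coalesce-vs-repartition")
      .config("spark.sql.adaptive.enabled", "false") // assumption: keeps counts predictable
      .getOrCreate()
    try println(partitionCounts(spark))
    finally spark.stop()
  }
}
```

Both variants end with two output partitions, so the file count per leaf directory is bounded the same way. The difference is where the work happens: with coalesce the final aggregation itself runs in two tasks, whereas repartition pays for an extra shuffle but leaves the aggregation at its configured parallelism.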

Upvotes: 3
