arctic_Oak

Reputation: 1024

Why could a streaming join of queries over Kafka topics take so long?

I'm using Spark Structured Streaming and joining two streams from Kafka topics.

[Screenshot: DAG for the job]

I noticed that the streaming query takes around 15 seconds per record. In the screenshot below, stage 2 alone takes 15 s. Why could that be?

[Screenshot: Time taken by each stage]

The code is as follows:

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  val spark = SparkSession
    .builder
    .master("local")
    .getOrCreate

  import spark.implicits._

  val df1 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic1)
    .option("failOnDataLoss", false)
    .load

  val df2 = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("subscribe", kafkaTopic2)
    .option("failOnDataLoss", false)
    .load

  val order_details = df1
    .withColumn(...)
    .select(...)

  val invoice_details = df2
    .withColumn(...)
    .where(...)

  order_details
    .join(invoice_details)
    .where(order_details.col("s_order_id") === invoice_details.col("order_id"))
    .select(...)
    .writeStream
    .format("console")
    .option("truncate", false)
    .start
    .awaitTermination()

Code-wise everything works fine. The only problem is the time to join the two streams. How could this query be optimised?

Upvotes: 1

Views: 94

Answers (1)

Jacek Laskowski

Reputation: 74619

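The slow execution is quite possibly down to the master URL, i.e. .master("local"), which runs Spark with a single worker thread, so the tasks of every micro-batch execute one after another. Change it to local[*] (one thread per available core) at the very least and you should find the join faster.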
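As a minimal sketch of that change, the session from the question could be built as follows. Only the master URL comes from the advice above. The `spark.sql.shuffle.partitions` setting is an extra assumption of mine, not something the answer states: a stream-stream join shuffles both sides, the default of 200 shuffle partitions means many small tasks per micro-batch, and the value 8 is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  // "local" = one worker thread; "local[*]" = one thread per logical core
  .master("local[*]")
  // Assumption (not from the answer): fewer shuffle partitions for a small
  // local test, so each micro-batch schedules fewer tiny tasks.
  // The value 8 is illustrative only.
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate
```

The rest of the query (the two Kafka sources, the join and the console sink) would stay as in the question. Whether the shuffle setting helps depends on the data volume, so it is worth comparing the stage timings in the Spark UI before and after.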

Upvotes: 1
