vijay

Joining two streams from the same Spark streaming dataset

Is joining two streams that come from the same input streaming dataset allowed in Spark Structured Streaming (2.3)?

For example, the query below joins two such streams, and I get an IllegalStateException from the Azure Event Hubs Spark client.

Is this expected to work?

import org.apache.spark.sql.functions.expr

// Event Hubs reader configuration elided
val eventhubs = spark.readStream ...
eventhubs.createOrReplaceTempView("Input")

spark.sql("SELECT temperature, time, device, category FROM Input").createOrReplaceTempView("devices1")

spark.sql("SELECT temperature, time, device, category FROM Input").createOrReplaceTempView("devices2")

val d1 = spark.sql("SELECT * FROM devices1 WHERE device=0")
val d2 = spark.sql("SELECT * FROM devices2 WHERE device=1")

// Inner stream-stream join: same category, device-0 event at or up to 1 second after the device-1 event
val output = d1.join(
  d2,
  expr("""
    devices1.category = devices2.category AND
    devices1.time >= devices2.time AND
    devices1.time <= devices2.time + interval 1 seconds
  """),
  joinType = "inner"
)

// display() renders the streaming result in a Databricks notebook
display(output)

Answers (1)

Mikhail Dubkov

As far as I know, a self-join (joining a stream with itself) is allowed in Spark Structured Streaming, but only in Append output mode.

Here's an example:

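import org.apache.spark.sql.DataFrame

// SparkBaseSpec (not shown) is assumed to provide a SparkSession named `spark`
class ExampleTest extends SparkBaseSpec {

  import spark.implicits._

  private val data: DataFrame = spark.range(1, 5).toDF

  // Seed the input directory with ids 1..4 (overwrite so the test can be re-run)
  data.write.mode("overwrite").parquet("/tmp/streaming/")

  // File sources need an explicit schema when read as a stream
  val readStr = spark.readStream.schema(data.schema).parquet("/tmp/streaming/")

  // Self-join: both sides of the join are built from the same streaming DataFrame
  val df = readStr
    .select($"id".as("id1"))
    .where("id1 < 50")
    .join(readStr.select($"id".as("id2")).where("id2 < 50"), $"id1" === $"id2")

  df.printSchema()

  // Stream-stream joins only run in append output mode
  val stream = df.writeStream
    .option("checkpointLocation", "/tmp/spark-streaming-checkpoint")
    .format("console")
    .outputMode("append")
    .start()

  // Add ids 20..24; they show up in the next micro-batch
  spark.range(20, 25).toDF.write.mode("append").parquet("/tmp/streaming/")

  stream.awaitTermination(30000)

}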

root
 |-- id1: long (nullable = false)
 |-- id2: long (nullable = false)

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+
|id1|id2|
+---+---+
|  1|  1|
|  3|  3|
|  2|  2|
|  4|  4|
+---+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+---+---+
|id1|id2|
+---+---+
| 22| 22|
| 21| 21|
| 23| 23|
| 20| 20|
| 24| 24|
+---+---+
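The example above runs in Append mode. To see the Append-only restriction directly, here is a minimal sketch, assuming Spark 2.3 or later in local mode, that builds the same kind of self-join and tries to start it in a given output mode. It uses the built-in `rate` source instead of the Parquet directory so it needs no input files; `SelfJoinOutputModes` and `startsIn` are hypothetical names. Spark runs its unsupported-operation check when `start()` is called, so a rejected mode should surface as an `AnalysisException`.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.col

object SelfJoinOutputModes {
  // Hypothetical helper: builds a stream self-join and reports whether
  // Spark accepts starting it with the given output mode.
  def startsIn(spark: SparkSession, mode: String): Boolean = {
    // The built-in "rate" source emits (timestamp, value) rows; it stands in
    // for the Parquet directory used in the example above.
    val readStr = spark.readStream.format("rate").load()

    // Same shape as the example above: the stream joined with itself.
    val df = readStr
      .select(col("value").as("id1"))
      .join(readStr.select(col("value").as("id2")), col("id1") === col("id2"))

    try {
      val query = df.writeStream
        .format("console")
        .outputMode(mode)
        .start()
      query.stop()
      true
    } catch {
      // Thrown by Spark's unsupported-operation check when the query starts.
      case _: AnalysisException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("self-join-output-modes")
      .master("local[*]")
      .getOrCreate()

    println(s"append accepted: ${startsIn(spark, "append")}")
    println(s"update accepted: ${startsIn(spark, "update")}")
    spark.stop()
  }
}
```

With `"append"` the query should start and stop normally; with `"update"` the check should reject the stream-stream join before any batch runs.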

By the way, you don't need to create two temp views; one is enough.
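Applied to the query from the question, that could look like the sketch below: a single `Input` view, with each side of the join written as a filtered query over it and given its own alias (`devices1`, `devices2`) via `Dataset.as`, so the join condition can still tell the two sides apart. Because the Event Hubs reader setup is elided in the question, this sketch assumes the built-in `rate` source as a stand-in and synthesizes the `temperature`, `time`, `device` and `category` columns from it; `SingleViewSelfJoin` and `buildOutput` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr}

object SingleViewSelfJoin {
  // Hypothetical helper: registers ONE temp view and joins two filtered
  // queries over it, mirroring the question's time-range join.
  def buildOutput(spark: SparkSession): DataFrame = {
    // Assumption: the "rate" source stands in for the elided Event Hubs reader.
    // Its (timestamp, value) rows are mapped onto the question's column names.
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .select(
        col("value").cast("double").as("temperature"),
        col("timestamp").as("time"),
        (col("value") % 2).as("device"),
        (col("value") % 3).as("category"))
      .createOrReplaceTempView("Input")

    // Each side is a filtered query over the same view, aliased so that
    // devices1.* and devices2.* resolve in the join condition.
    val d1 = spark.sql("SELECT temperature, time, device, category FROM Input WHERE device = 0").as("devices1")
    val d2 = spark.sql("SELECT temperature, time, device, category FROM Input WHERE device = 1").as("devices2")

    d1.join(
      d2,
      expr("""
        devices1.category = devices2.category AND
        devices1.time >= devices2.time AND
        devices1.time <= devices2.time + interval 1 seconds
      """),
      "inner")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("single-view-self-join")
      .master("local[*]")
      .getOrCreate()

    // Stream-stream joins only run in append output mode.
    val query = buildOutput(spark).writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination(30000)
    spark.stop()
  }
}
```

The aliases do the job the second view was doing in the question: they give each side of the self-join a distinct name to reference in the join expression.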

Hope it helps!
