SimbaPK

Reputation: 596

spark streaming join kafka topics

We have two InputDStreams from two Kafka topics, and we need to join the data of these two streams together. The problem is that each InputDStream is processed independently inside its own foreachRDD, from which nothing can be returned to join on afterwards.

var Message1ListBuffer = new ListBuffer[Message1]
var Message2ListBuffer = new ListBuffer[Message2]

inputDStream1.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.map({ msg =>
      val r = msg.value()
      val avro = AvroUtils.objectToAvro(r.getSchema, r)
      val messageValue = AvroInputStream.json[FMessage1](avro.getBytes("UTF-8")).singleEntity.get
      Message1ListBuffer = Message1FlatMapper.flatmap(messageValue)
      Message1ListBuffer
    })
    inputDStream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
})

inputDStream2.foreachRDD(rdd => {
  if (!rdd.partitions.isEmpty) {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.map({ msg =>
      val r = msg.value()
      val avro = AvroUtils.objectToAvro(r.getSchema, r)
      val messageValue = AvroInputStream.json[FMessage2](avro.getBytes("UTF-8")).singleEntity.get
      Message2ListBuffer = Message2FlatMapper.flatmap(messageValue)
      Message2ListBuffer
    })
    inputDStream2.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
})

I thought I could return Message1ListBuffer and Message2ListBuffer, turn them into DataFrames, and join them. But that does not work, and I do not think it is the best choice anyway.

From there, what is the way to return the RDD of each foreachRDD in order to make a join?

inputDStream1.foreachRDD(rdd => {

})


inputDStream2.foreachRDD(rdd => {

})

Upvotes: 0

Views: 621

Answers (1)

mrsrinivas

Reputation: 35424

I am not sure which Spark version you are using, but with Spark 2.3+ this can be achieved directly with Structured Streaming's stream-stream joins.

With Spark >= 2.3

Subscribe to the two topics you want to join

// Note: endingOffsets applies only to batch queries, so it is omitted here;
// a streaming query reads until it is stopped.
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .load

val ds2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic2")
  .option("startingOffsets", "earliest")
  .load

Format the subscribed messages in both streams

import spark.implicits._ // needed for the .as[(String, String)] encoder

val stream1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val stream2 = ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Join the two streams

// A stream-stream join needs a join condition (a cross join of two
// streams is not supported), so join on the message key here.
val resultStream = stream1.join(stream2, "key")

More join operations here.
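For completeness, a minimal end-to-end sketch of running the joined query; the renamed value columns and the console sink are assumptions for illustration:

// Minimal sketch: join the two formatted streams on the Kafka message key
// and print matches to the console (for inspection only).
val joined = stream1.toDF("key", "value1")
  .join(stream2.toDF("key", "value2"), "key")

val query = joined.writeStream
  .format("console")
  .outputMode("append") // stream-stream inner joins support append mode
  .start()

query.awaitTermination()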

Warning:

Late-arriving records will not get a join match unless the join state is buffered long enough; the buffering needs to be tweaked a bit with watermarks. More information can be found here.
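A sketch of that tweaking, assuming the timestamp column the Kafka source exposes is usable as event time; the 10-minute watermark and the one-hour join window are illustrative assumptions:

// Illustrative sketch: bound the buffered join state with watermarks and a
// time-range condition, so late records are dropped predictably.
import org.apache.spark.sql.functions.expr

val left = ds1
  .selectExpr("CAST(key AS STRING) AS leftKey", "CAST(value AS STRING) AS leftValue", "timestamp AS leftTime")
  .withWatermark("leftTime", "10 minutes") // tolerate up to 10 minutes of lateness

val right = ds2
  .selectExpr("CAST(key AS STRING) AS rightKey", "CAST(value AS STRING) AS rightValue", "timestamp AS rightTime")
  .withWatermark("rightTime", "10 minutes")

// Match only records whose event times fall within one hour of each other.
val joined = left.join(
  right,
  expr("""
    leftKey = rightKey AND
    rightTime >= leftTime AND
    rightTime <= leftTime + interval 1 hour
  """))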

Upvotes: 2
