Reputation: 596
We have two InputDStream
from two Kafka topics, but we have to join the data of these two input together.
The problem is that each InputDStream
is processed independently, because of the foreachRDD
, nothing can be returned, to join
after.
var Message1ListBuffer = new ListBuffer[Message1]
var Message2ListBuffer = new ListBuffer[Message2]
inputDStream1.foreachRDD(rdd => {
if (!rdd.partitions.isEmpty) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map({ msg =>
val r = msg.value()
val avro = AvroUtils.objectToAvro(r.getSchema, r)
val messageValue = AvroInputStream.json[FMessage1](avro.getBytes("UTF-8")).singleEntity.get
Message1ListBuffer = Message1FlatMapper.flatmap(messageValue)
Message1ListBuffer
})
inputDStream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
inputDStream2.foreachRDD(rdd => {
if (!rdd.partitions.isEmpty) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map({ msg =>
val r = msg.value()
val avro = AvroUtils.objectToAvro(r.getSchema, r)
val messageValue = AvroInputStream.json[FMessage2](avro.getBytes("UTF-8")).singleEntity.get
Message2ListBuffer = Message1FlatMapper.flatmap(messageValue)
Message2ListBuffer
})
inputDStream2.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
I thought I could return Message1ListBuffer and Message2ListBuffer, turn them into dataframes and join them. But that does not work, and I do not think it's the best choice
From there, what is the way to return the rdd of each foreachRDD in order to make a join?
inputDStream1.foreachRDD(rdd => {
})
inputDStream2.foreachRDD(rdd => {
})
Upvotes: 0
Views: 621
Reputation: 35424
Not sure about the Spark version you are using, with Spark 2.3+, it can be achieved directly.
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("subscribe", "source-topic1")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load
val ds2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("subscribe", "source-topic2")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load
val stream1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val stream2 = ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
resultStream = stream1.join(stream2)
more join operations here
Warning:
Delay records will not get a join match. Need to tweak buffer a bit. more information found here
Upvotes: 2