Bernhard

Reputation: 65

Joining Kafka and Cassandra DataFrames in Spark Streaming ignores C* predicate pushdown

Intent

I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DataFrame from Cassandra. Each streaming batch holds only a few messages while the C* table holds millions of rows, but the join always yields exactly one result per message (1:1). After the join, the resulting DataFrame is eventually stored in another C* table.

Problem

Even though I'm joining the two DataFrames on the full Cassandra primary key and pushing the corresponding filter to C*, Spark seems to load the whole C* data set into memory before actually joining (which I wanted to prevent with the filter/predicate pushdown). This causes a lot of shuffling and spawns many tasks, so the "simple" join takes forever...

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("test")      
      .set("spark.cassandra.connection.host", "xxx")
      .set("spark.cassandra.connection.keep_alive_ms", "30000")
      .setMaster("local[*]")
      
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")
    
    // Initialise Kafka
    val kafkaTopics = Set[String]("xxx")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
      "auto.offset.reset" -> "smallest")
    
    // Kafka stream
    val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)      
    
    // Executed on the driver
    messages.foreachRDD { rdd =>
      
      // Create an instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      
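      // Drop the Kafka key and keep only the decoded message.
      // (A capitalised name in a pattern is matched as an existing value, not bound as a variable.)
      val MyMsgRdd = rdd.map { case (_, msg) => msg }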
      
      // Convert RDD[MyMsg] to DataFrame
      val MyMsgDf = MyMsgRdd.toDF()        
        .select(
            $"prim1Id" as 'prim1_id,
            $"prim2Id" as 'prim2_id,
            $...
      )
      
      // Load DataFrame from C* data-source
      val base_data = base_data_df.getInstance(sqlContext)    
      
      // Left join on prim1Id and prim2Id
      val joinedDf = MyMsgDf.join(base_data,
            MyMsgDf("prim1_id") === base_data("prim1_id") &&
            MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
            .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
                && base_data("prim2_id").isin(MyMsgDf("prim2_id")))          
                
      joinedDf.show()
      joinedDf.printSchema()
      
      // Select relevant fields
            
      // Persist
    }
    
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
}

Environment

SOLUTION

From discussions on the DataStax Spark Connector for Apache Cassandra mailing list, I've learned the following:

Quoting Russell Spitzer

  1. This wouldn't be a case of predicate pushdown. This is a join on a partition key column. Currently only joinWithCassandraTable supports this direct kind of join although we are working on some methods to try to have this automatically done within Spark.

  2. Dataframes can be created from any RDD which can have a schema applied to it. The easiest thing to do is probably to map your joinedRDD[x,y] to Rdd[JoinedCaseClass] and then call toDF (which will require importing your sqlContext implicits.) See the DataFrames documentation here for more info.

So the actual implementation now resembles something like

// Join myMsg RDD with myCassandraTable
val joinedMsgRdd = myMsgRdd.joinWithCassandraTable(
  "keyspace",
  "myCassandraTable",
  AllColumns,
  SomeColumns(
      "prim1_id",
      "prim2_id"
  )
).map { case (myMsg, cassandraRow) =>
  JoinedMsg(
    foo = myMsg.foo,
    // CassandraRow has no field accessors; getString assumes "bar" is a text column
    bar = cassandraRow.getString("bar")
  )
}

// Convert RDD[JoinedMsg] to DataFrame
val myJoinedDf = joinedMsgRdd.toDF()
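
To make the first point of the quote more concrete, here is a toy model in plain Scala (no Spark or Cassandra involved) of why a keyed join touches far fewer rows than a generic join over a scanned table. All names in it (Msg, BaseRow, FakeTable, scanJoin, keyedJoin) are hypothetical and exist only for this illustration; it is a sketch of the access pattern, not of how the connector is implemented.

```scala
// Toy model only: plain Scala collections stand in for Kafka messages and a C* table.
object KeyedJoinSketch {
  // Hypothetical row types for illustration.
  final case class Msg(prim1Id: Int, prim2Id: Int, foo: String)
  final case class BaseRow(prim1Id: Int, prim2Id: Int, bar: String)

  // A fake table that counts how many rows each access path reads.
  final class FakeTable(rows: Seq[BaseRow]) {
    private val byKey: Map[(Int, Int), BaseRow] =
      rows.map(r => (r.prim1Id, r.prim2Id) -> r).toMap

    var rowsRead: Long = 0L

    // Full scan: every row is read before the join condition is applied.
    def scan(): Seq[BaseRow] = {
      rowsRead += rows.size
      rows
    }

    // Point lookup by full primary key: only matching rows are read.
    def get(key: (Int, Int)): Option[BaseRow] = {
      val hit = byKey.get(key)
      if (hit.isDefined) rowsRead += 1
      hit
    }
  }

  // Generic join: scan the whole table, then match rows against the messages.
  def scanJoin(msgs: Seq[Msg], table: FakeTable): Seq[(Msg, BaseRow)] = {
    val all = table.scan()
    for {
      m <- msgs
      r <- all
      if r.prim1Id == m.prim1Id && r.prim2Id == m.prim2Id
    } yield (m, r)
  }

  // Keyed join: one lookup per message, analogous in spirit to joinWithCassandraTable.
  def keyedJoin(msgs: Seq[Msg], table: FakeTable): Seq[(Msg, BaseRow)] =
    msgs.flatMap(m => table.get((m.prim1Id, m.prim2Id)).map(r => (m, r)))
}
```

Both functions return the same pairs; the difference is only in rowsRead, which grows with the table size for scanJoin and with the number of messages for keyedJoin.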

Upvotes: 1

Views: 1121

Answers (1)

vgkowski

Reputation: 519

Have you tried joinWithCassandraTable? It should push down to C* all the keys you are looking for...

Upvotes: 1
