gogocatmario

Reputation: 70

How to extract each JSONObject from a JSONArray and save it to Cassandra in Spark Streaming

I'm consuming Kafka data in Spark Streaming; each message is a JSONArray containing several JSONObjects.

I want to load each JSONObject into a DataFrame and save it to a Cassandra table after joining it with another table.

I've tried creating a DataFrame to hold each JSONObject, but when I call createDataFrame inside stream.foreachRDD it throws a NullPointerException. Is this because Spark doesn't support nested RDD operations? If so, how can I save the JSONObjects to Cassandra?

The data format is as below:

[  
   {  
      "temperature":"21.8",
      "humidity":"65.6",
      "creatime":"2016-11-14 13:50:24",
      "id":"3303136",
      "msgtype":"th",
      "sensorID":"001"
   },
   {  
      "temperature":"23.1",
      "humidity":"60.6",
      "creatime":"2016-11-14 13:50:24",
      "id":"3303137",
      "msgtype":"th",
      "sensorID":"002"
   }
]

My Code:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector._

import org.apache.spark.SparkConf
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import net.sf.json.JSONObject
import net.sf.json.JSONArray

object getkafkadata {

  def main(args: Array[String]) {

    val cassandraHostIP = "10.2.1.67"
    val keyspaceToGet = "iot_test"

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("PageViewStream")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.cassandra.connection.host", cassandraHostIP)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val sqc = new SQLContext(sc)

    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace(keyspaceToGet)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.2.1.67:6667",
      "group.id" -> "a13",
      "auto.offset.reset" -> "smallest")

    val topics = Set("test1208")
    println("kafkaParams=", kafkaParams, "topics=", topics)

    val offsetsList = 0
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    println("Line3 good!")

    println("Start to parse json...")

    val datas = stream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(line => {
          val event = JSONArray.fromObject(line._2)
          for (n <- 0 to event.size() - 1) {
            val eventobj = event.getJSONObject(n)

            println("======= Message =======")
            println(eventobj.toString())

            // defaults in case fields are missing from the message
            var sensorID = "no_data"
            var humidity = "0"
            var temperature = "0"
            var msgtype = "no_data"
            var creatime = "0"
            var id = "no_data"

            if (eventobj.has("sensorID"))
              sensorID = eventobj.getString("sensorID")
            if (eventobj.has("humidity"))
              humidity = eventobj.getString("humidity")
            if (eventobj.has("temperature"))
              temperature = eventobj.getString("temperature")
            if (eventobj.has("msgtype"))
              msgtype = eventobj.getString("msgtype")
            if (eventobj.has("creatime"))
              creatime = eventobj.getString("creatime")
            if (eventobj.has("id"))
              id = eventobj.getString("id")

            var df = cc.createDataFrame(Seq(
              (sensorID, humidity, temperature, msgtype, creatime, id)))
              .toDF("sensorID", "humidity", "temperature", "msgtype", "creatime", "id")

            println("==========df create done=========")
            df.show()

          }
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Exception Message:

16/12/12 09:28:35 ERROR JobScheduler: Error running job streaming job 1481506110000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:75)
    at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:74)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
    at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    ... 3 more
16/12/12 09:28:35 INFO DAGScheduler: ResultStage 1 (foreachPartition at getkafkadata.scala:75) finished in 0.063 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
16/12/12 09:28:35 INFO DAGScheduler: Job 1 finished: foreachPartition at getkafkadata.scala:75, took 0.098511 s

Upvotes: 2

Views: 1336

Answers (1)

maasg

Reputation: 37435

It's not possible to create a DataFrame within an RDD closure. DataFrame operations don't make sense at the executor level: the SQLContext and its configuration only exist on the driver, which is why the call inside the closure fails with a NullPointerException.

Instead, transform the data into the desired format with RDD operations and do the DataFrame operations at the driver level.

E.g., partial code to illustrate the structural changes. Note how the RDD data is transformed first and only then converted into a DataFrame on the driver.

stream.foreachRDD { rdd =>
  val parsedData = rdd.flatMap { record =>
    val events = JSONArray.fromObject(record._2)
    // parse + transform each entry into a Record
    (0 until events.size()).map(i => events.getJSONObject(i) /* ... build Record */)
  }
  val df = cc.createDataFrame(parsedData)
  // write to Cassandra
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "sensordata", "keyspace" -> "iot"))
    .save()
}
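
To make that shape concrete, here is a fuller sketch of the same structure. The SensorReading case class, the field helper, the target table sensordata, and the reuse of the question's iot_test keyspace and sqlContext are assumptions for illustration only; the field guards mirror the has/getString checks from the question's code.

import org.apache.spark.sql.SaveMode
import net.sf.json.{ JSONArray, JSONObject }

// Hypothetical record type matching the sample message. Define it at the top
// level (outside main) so Spark can derive a schema from it.
case class SensorReading(sensorid: String, humidity: String, temperature: String,
  msgtype: String, creatime: String, id: String)

// Fall back to a default when a field is missing from the message.
def field(obj: JSONObject, key: String, default: String): String =
  if (obj.has(key)) obj.getString(key) else default

stream.foreachRDD { rdd =>
  // Runs on the executors: only plain Scala objects here, no SQLContext.
  val readings = rdd.flatMap { case (_, value) =>
    val events = JSONArray.fromObject(value)
    (0 until events.size()).map { i =>
      val obj = events.getJSONObject(i)
      SensorReading(
        field(obj, "sensorID", "no_data"),
        field(obj, "humidity", "0"),
        field(obj, "temperature", "0"),
        field(obj, "msgtype", "no_data"),
        field(obj, "creatime", "0"),
        field(obj, "id", "no_data"))
    }
  }

  // Back on the driver: build the DataFrame and write it to Cassandra.
  val df = sqlContext.createDataFrame(readings)
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "sensordata", "keyspace" -> "iot_test"))
    .mode(SaveMode.Append)
    .save()
}

If the mapping with the other table isn't needed at the SQL level, the connector's RDD API can skip the DataFrame step entirely: with import com.datastax.spark.connector._ in scope, readings.saveToCassandra("iot_test", "sensordata") writes the case-class fields straight to the matching columns.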

Upvotes: 1
