thedataguy
thedataguy

Reputation: 85

Spark Streaming - Dstream messages in json format to DataFrame

I'm trying to read Kafka topics through Apache Spark Streaming and am not able to figure out how to transform the data in DStream to DataFrame and then store in a temp table. The messages in Kafka are in Avro format, which were created by Kafka JDBC Connect from a database. I have the below code, which works fine until it executes the spark.read.json to read the json to dataframe.

package consumerTest


import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._


    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> classOf[KafkaAvroDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "group.id" -> "sakwq",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false",
      "schema.registry.url" -> "http://<schema-registry>:8181"
    )

    val topics = Array("cdcemployee")

    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc,
      PreferConsistent,
      Subscribe[String, Object](topics, kafkaParams)
    )


    val data = stream.map(record => {
      println(record.value.toString())
      record.value
      val df = spark.read.json(record.value.toString())

    })


    data.print();



    ssc.start()
    ssc.awaitTermination()
  }


}

I am getting a Null pointer exception when executing the line val df = spark.read.json(record.value.toString())

18/05/10 09:49:11 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
    at consumerTest.Consumer$.$anonfun$main$1(Consumer.scala:63)
    at consumerTest.Consumer$.$anonfun$main$1$adapted(Consumer.scala:60)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/05/10 09:49:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
    at consumerTest.Consumer$.$anonfun$main$1(Consumer.scala:63)
    at consumerTest.Consumer$.$anonfun$main$1$adapted(Consumer.scala:60)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Also, here is the sample data that gets printed when executing the statement println(record.value.toString()) if I remove the spark.read.json statement

    {"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": "300", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": null, "PostalCode": null, "DeptCode": "", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363785, "CDCTIMESTAMP": "2018-04-04 15:35:11:492 - 04:00", "CDCCHANGESEQ": 17, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 2, "EmpNum": 57, "LastName": "bobba2s", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "San Diego", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 364688, "CDCTIMESTAMP": "2018-04-04 16:39:05:602 - 04:00", "CDCCHANGESEQ": 18, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 1, "EmpNum": 59, "LastName": "Bobba", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "Raleigh", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": "CA", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": "NC", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 650254, "CDCTIMESTAMP": "2018-04-18 16:19:35:669 - 04:00", "CDCCHANGESEQ": 20, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}

Can anyone help me on how to convert this to dataframe and store it temporarily in a table?

edit:

enter image description here

Upvotes: 2

Views: 7601

Answers (2)

lokesh
lokesh

Reputation: 37

Pyspark

Json Data:

{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}

{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}

{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"}

{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}

Pyspark Code to read from above json data:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)

stream_data = ssc.textFileStream("/filepath/")

def readMyStream(rdd):
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('t1','t2','t3','timestamp').where(col("timestamp").isNotNull())
    df.show()

stream_data.foreachRDD( lambda rdd: readMyStream(rdd) )
ssc.start()
ssc.stop()

Upvotes: 0

koiralo
koiralo

Reputation: 23109

stream contains the RDD for each interval of time, so for each interval time you can convert the rdd to datafarme as

stream.foreachRDD(rddRaw => {
  val rdd = rddRaw.map(_.value.toString) // or rddRaw.map(_._2)
  val df = spark.read.json(rdd)
})

This should give you the dataframe as expected.

Hope this helps!

Upvotes: 6

Related Questions