Lobsterrrr

Reputation: 335

How to convert RDD to DataFrame in Spark Streaming, not just Spark

How can I convert RDD to DataFrame in Spark Streaming, not just Spark?

I saw this example, but it requires SparkContext.

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? That seems too crazy... So how do I deal with this issue? My final goal (if it might be useful) is to save the DataFrame to Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");, which is not possible for an RDD without converting it to a DataFrame (as far as I know).

myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}

Upvotes: 6

Views: 10378

Answers (2)

Mor Shemesh

Reputation: 2899

Look at the following answer, which contains a Scala magic cell inside a Python notebook: How to convert Spark Streaming data into Spark DataFrame

Upvotes: 0

Shankar

Reputation: 8967

Create the sqlContext outside foreachRDD. Once you convert the rdd to a DF using that sqlContext, you can write it to S3.

For example:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    // DataFrameWriter has no saveAsTextFile; use save() with the json format
    df.write.format("json").save("s3://iiiii/ttttt.json")
}

Update:

You can even create the sqlContext inside foreachRDD, since foreachRDD executes on the driver.
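That said, rather than constructing a fresh context per batch, the Spark Streaming programming guide recommends a lazily instantiated singleton SQLContext. A minimal sketch of that pattern (myDstream and the S3 path are taken from the question; this assumes Spark 1.x, where SQLContext is the entry point):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily instantiated singleton: one SQLContext is created on first use
// and reused across all batches, instead of a new one per foreachRDD call.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

myDstream.foreachRDD { rdd =>
  // Obtain the shared SQLContext from the RDD's own SparkContext
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF()
  df.write.format("json").save("s3://iiiii/ttttt.json")
}
```

In Spark 1.6+ you can also simply call SQLContext.getOrCreate(rdd.sparkContext) inside foreachRDD, which implements the same singleton behavior for you.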

Upvotes: 2

Related Questions