Navin Ahmed

Reputation: 65

Convert JavaDStream<String> to JavaRDD<String>

I have a JavaDStream that receives data from an external source, and I'm trying to integrate Spark Streaming with Spark SQL. A JavaDStream is made up of JavaRDDs, and I can only call applySchema() when I have a JavaRDD. How can I convert the JavaDStream to a JavaRDD? I know there are functions for this in Scala, where it's much easier, but I need a solution in Java.

Upvotes: 4

Views: 8830

Answers (3)

AP-Big Data

Reputation: 302

I hope this helps to convert a JavaDStream to a JavaRDD!

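    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.api.java.JavaDStream;

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    // Convert each micro-batch RDD into a JavaRDD<Row>
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                @Override
                public Row call(String msg) {
                    return RowFactory.create(msg);
                }
            });
            // Create the schema: a single nullable string column
            StructType schema = DataTypes.createStructType(new StructField[] {
                    DataTypes.createStructField("value", DataTypes.StringType, true)});
            // Get the Spark 2.0 session; JavaSparkSessionSingleton is a helper class
            // (as in Spark's JavaSqlNetworkWordCount example), not part of the Spark API
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();
        }
    });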

Upvotes: 0

amey91

Reputation: 552

You first have to access the RDDs inside the DStream using foreachRDD (shown here in Scala syntax):

javaDStream.foreachRDD( rdd => {
    rdd.collect.foreach({
        ...
    })
})

Upvotes: 1

maasg

Reputation: 37435

You can't transform a DStream into an RDD. As you mention, a DStream contains RDDs. The way to get access to the RDDs is by applying a function to each RDD of the DStream using foreachRDD. See the docs: https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function2)
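
As a rough illustration of the foreachRDD approach in Java, here is a minimal sketch. It assumes Spark 2.x or later, where foreachRDD accepts a void lambda and createDataFrame takes the place of applySchema. The class name DStreamToDataFrame, the helper methods, the "value" column name, and the queue-based test source are all made up for this example; substitute your real input stream.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hypothetical example class; names are illustrative, not from the question.
public class DStreamToDataFrame {

    // Single-column schema: each incoming line becomes one nullable string field.
    static StructType valueSchema() {
        return DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("value", DataTypes.StringType, true)
        });
    }

    // Runs once per micro-batch: the JavaRDD<String> handed to foreachRDD
    // is mapped to rows and paired with the schema.
    static Dataset<Row> toDataFrame(SparkSession spark, JavaRDD<String> rdd) {
        JavaRDD<Row> rows = rdd.map(line -> RowFactory.create(line));
        return spark.createDataFrame(rows, valueSchema());
    }

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("DStreamToDataFrame");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Stand-in source (assumption): a queue of RDDs instead of an external feed.
        Queue<JavaRDD<String>> queue = new LinkedList<>();
        queue.add(jssc.sparkContext().parallelize(Arrays.asList("a", "b", "c")));
        JavaDStream<String> lines = jssc.queueStream(queue);

        // The DStream itself is never converted; each batch arrives here as a JavaRDD.
        lines.foreachRDD(rdd -> {
            SparkSession spark = SparkSession.builder()
                .config(rdd.context().getConf())
                .getOrCreate();
            toDataFrame(spark, rdd).show();
        });

        jssc.start();
        jssc.awaitTerminationOrTimeout(5000); // run briefly, then shut down
        jssc.stop();
    }
}
```

On Spark 1.x the function passed to foreachRDD has to return null (it is a Function<JavaRDD<T>, Void>), so the lambda body would need an explicit return.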

Upvotes: 5