shankar

Reputation: 225

Given the following code, how can I convert a JavaRDD<Integer> to a DataFrame or Dataset?

public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions());
        JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

}

With the code above, I am unable to convert the JavaRDD (mappartRdd) to a DataFrame in Java Spark. I am using the following call to convert the JavaRDD to a DataFrame/Dataset:

sessn.createDataFrame(mappartRdd, beanClass);

I tried several of the overloaded createDataFrame methods, but none of them converts it to a DataFrame. What bean class do I need to provide for the code to work?

Unlike Scala, Java has no function like toDF() to convert an RDD to a DataFrame. Can someone help me convert it as required?

Note: I am able to create a Dataset directly by modifying the code above as follows:

Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());

But I want to know why my JavaRDD is not converted to a DataFrame/Dataset when I use createDataFrame. Any help will be greatly appreciated.

Upvotes: 0

Views: 457

Answers (1)

Som

Reputation: 6338

This seems to be a follow-up to this SO question.

I think you are still learning Spark. I would suggest getting familiar with the Java API docs - https://spark.apache.org/docs/latest/api/java/index.html

Regarding your question, if you check the createDataFrame API, one of its overloads is as follows:

 def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
...
}

As you can see, it takes a JavaRDD[Row] and a matching StructType schema as arguments. Hence, to create a DataFrame, which is the same as Dataset<Row>, use the snippet below:

JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

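// Single nullable integer column named "value"
StructType schema = new StructType()
        .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
// Wrap each Integer in a Row so the RDD matches the JavaRDD<Row> overload;
// sessn is the SparkSession created in the question
Dataset<Row> df = sessn.createDataFrame(mappartRdd.map(RowFactory::create), schema);
df.show(false);
df.printSchema();

/**
 * +-----+
 * |value|
 * +-----+
 * |6    |
 * |8    |
 * |6    |
 * +-----+
 *
 * root
 *  |-- value: integer (nullable = true)
 */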
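
On the beanClass part of the question: the createDataFrame(JavaRDD<?>, Class<?> beanClass) overload is documented as applying a schema to an RDD of Java Beans, and as far as I understand it derives the columns from the bean's getters. The sketch below uses only the JDK's java.beans.Introspector (not Spark itself) to show that Integer exposes no such properties, while a small wrapper bean does. PartitionCount and readableProperties are hypothetical names made up for this illustration; they are not part of the original code or of Spark.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical wrapper bean (an assumption for illustration):
    // exposes a single JavaBean property called "value".
    public static class PartitionCount implements Serializable {
        private int value;

        public PartitionCount() {}

        public PartitionCount(int value) { this.value = value; }

        public int getValue() { return value; }

        public void setValue(int value) { this.value = value; }
    }

    // Lists the JavaBean properties of a type that have a getter,
    // skipping the "class" property every object inherits via getClass().
    public static List<String> readableProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Integer has no bean-style getters, so there is nothing to turn into columns
        System.out.println(readableProperties(Integer.class));        // prints []
        // The wrapper bean exposes one property that could serve as a column
        System.out.println(readableProperties(PartitionCount.class)); // prints [value]
    }
}
```

So there is probably no bean class that makes a plain JavaRDD<Integer> work with that overload. Mapping each count into a wrapper bean like the one sketched here should in principle satisfy it, but the Row plus StructType approach shown above avoids the extra class.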

Upvotes: 1
