freedev

Reputation: 30197

Convert RDD[Map[String, String]] to Spark dataframe

I'm trying to convert a val rec: RDD[Map[String, String]] into a Spark DataFrame.

But when I execute:

val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(rec, classOf[Map[String, String]])

df.write.json("/tmp/file.json") 

The JSON file is full of empty objects:

{}
{}
{}
{}
{}

I'm converting it to JSON just because I want to save the rec val and reuse it later with a SQLContext object in Python.

So the question is how to save my RDD[HashMap[String, String]] created in Scala and reuse later in Python?

UPDATE

rec val contains

Map(Param_timestamp -> 2017-03-28T02:00:02.887, Param_querytype -> listing, Param_slug -> /salute-beauty-fitness/bellezza-cura-del-corpo/cosmesi/makeup, Param_br -> CAUDALIE)

df.show() returns:

++
||
++
||
... all 20 rows are alike: "||"
||
++
only showing top 20 rows

Upvotes: 2

Views: 1231

Answers (1)

Adonis

Reputation: 4818

As long as you know your schema, you can recreate the DataFrame using StructField and StructType; the doc explains it pretty well, I believe. As for Scala, I'm not exactly familiar with it, but a small example in Java can perhaps help (I'll convert it to Scala later when I have more time):

    // local Spark context for the example
    JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("test").setMaster("local[*]"));
    jsc.setLogLevel("ERROR");
    // only needed on Windows, where Hadoop looks for winutils.exe
    System.setProperty("hadoop.home.dir", "C:\\Temp\\tt\\Tools");

    // sample (key, value) pairs standing in for the map entries
    List<Tuple2<String, String>> test = new ArrayList<>();
    test.add(new Tuple2<>("key", "val1"));
    test.add(new Tuple2<>("key", "val2"));
    test.add(new Tuple2<>("key2", "val"));

    JavaPairRDD<String, String> testRDD = jsc.parallelizePairs(test);

    System.out.println(testRDD.first());

    SparkContext sc = JavaSparkContext.toSparkContext(jsc);
    SparkSession ss = new SparkSession(sc);

    // explicit schema: one StringType column per field
    StructField[] fields = {
            DataTypes.createStructField("key", DataTypes.StringType, false),
            DataTypes.createStructField("val", DataTypes.StringType, false) };
    StructType schema = DataTypes.createStructType(fields);

    // turn each pair into a Row matching the schema, then build the DataFrame
    JavaRDD<Row> testRowRDD = testRDD.map(line -> RowFactory.create(line._1, line._2));
    Dataset<Row> myDF = ss.createDataFrame(testRowRDD, schema);
    myDF.show();

    myDF.write().json("test.json");

    jsc.close();

The output is several JSON part files, each containing one line like this:

{"key":"key2","val":"val"}
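For reference, here is a sketch of the same idea in Scala, applied directly to an RDD[Map[String, String]]. The keys list and the sample data are assumptions based on the rec contents shown in the question; adjust them to the keys you actually have:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MapRddToDf extends App {
  val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

  // sample data shaped like the rec val from the question (assumed)
  val rec = spark.sparkContext.parallelize(Seq(
    Map("Param_timestamp" -> "2017-03-28T02:00:02.887",
        "Param_querytype" -> "listing")))

  // the known keys become the columns; their order fixes the schema
  val keys = Seq("Param_timestamp", "Param_querytype")
  val schema = StructType(keys.map(k => StructField(k, StringType, nullable = true)))

  // one Row per map, looking each key up (null when a key is missing)
  val rows = rec.map(m => Row.fromSeq(keys.map(k => m.getOrElse(k, null))))
  val df = spark.createDataFrame(rows, schema)
  df.write.json("/tmp/file.json") // readable later from PySpark via spark.read.json

  spark.stop()
}
```

Because the schema is explicit, the resulting JSON has real columns instead of empty objects, and Python can read it back with spark.read.json.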

Upvotes: 0
