Reputation: 30197
I'm trying to convert a val rec: RDD[Map[String, String]]
into a Spark DataFrame.
But when I execute:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.createDataFrame(rec, classOf[Map[String, String]])
df.write.json("/tmp/file.json")
The resulting JSON file is full of empty objects:
{}
{}
{}
{}
{}
I'm converting it to JSON just because I want to save the rec
val and reuse it later with a SQLContext object in Python.
So the question is: how do I save my RDD[HashMap[String, String]]
created in Scala and reuse it later in Python?
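For what it's worth, one workaround I'm experimenting with is to serialize each Map to a line of JSON in Scala and read the text back in Python with sqlContext.read.json. A minimal sketch (untested; the output path is only an example, and it assumes the json4s library that ships with Spark):
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Write one JSON object per line; json4s is bundled with Spark
val jsonLines = rec.mapPartitions { iter =>
  implicit val formats = DefaultFormats
  iter.map(m => Serialization.write(m))
}
jsonLines.saveAsTextFile("/tmp/rec_json")
On the Python side, sqlContext.read.json("/tmp/rec_json") should then yield a DataFrame with one column per map key.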
UPDATE
The rec val contains:
Map(Param_timestamp -> 2017-03-28T02:00:02.887, Param_querytype -> listing, Param_slug -> /salute-beauty-fitness/bellezza-cura-del-corpo/cosmesi/makeup, Param_br -> CAUDALIE)
df.show()
returns:
++
||
++
||
... all 20 rows look the same ("||")
||
++
only showing top 20 rows
Upvotes: 2
Views: 1231
Reputation: 4818
As long as you know your schema, you can recreate the DataFrame through the use of StructField and StructType; the doc explains it pretty well, I believe. (Note that the createDataFrame(rdd, beanClass) overload used in the question infers columns from JavaBean getters, and a Map has none, which is why every row comes out empty.) As for Scala, I'm not exactly familiar with it, but a small example in Java can perhaps help (I'll convert it to Scala later when I have more time):
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("test").setMaster("local[*]"));
jsc.setLogLevel("ERROR");
// Only needed on Windows, where Hadoop looks for winutils.exe
System.setProperty("hadoop.home.dir", "C:\\Temp\\tt\\Tools");

// A small sample of (key, value) pairs
List<Tuple2<String, String>> test = new ArrayList<>();
test.add(new Tuple2<>("key", "val1"));
test.add(new Tuple2<>("key", "val2"));
test.add(new Tuple2<>("key2", "val"));
JavaPairRDD<String, String> testRDD = jsc.parallelizePairs(test);
System.out.println(testRDD.first());

SparkContext sc = JavaSparkContext.toSparkContext(jsc);
SparkSession ss = new SparkSession(sc);

// Declare the schema explicitly: two non-nullable string columns
StructField[] fields = {
        DataTypes.createStructField("key", DataTypes.StringType, false),
        DataTypes.createStructField("val", DataTypes.StringType, false) };
StructType schema = DataTypes.createStructType(fields);

// Map each pair to a Row matching the schema, then build the DataFrame
JavaRDD<Row> testRowRDD = testRDD.map(line -> RowFactory.create(line._1(), line._2()));
Dataset<Row> myDF = ss.createDataFrame(testRowRDD, schema);
myDF.show();
myDF.write().json("test.json");
jsc.close();
The output is several JSON part files, with one JSON object per line, like this:
{"key":"key2","val":"val"}
Upvotes: 0