Reputation: 33
I want to do the following:
I have a DataFrame with two columns (ID, timestamp), both of type string, that I would like to convert to another format, such as a String, an Array[String], or an Array[Row]. I don't really mind which, as long as I can convert it back to a DF.
I've tried different things:
val aux= df.collect().map { row => row.toString() }
val distDataRDD = sc.parallelize(aux).toDF().show()
But I just get one column named "value", which is a concatenation of the two columns from the original DF.
I've also tried:
val aux= df.collect().map { row => row.toString() }
val distDataRDD = sc.parallelize(aux).toDF("Id","timestamp").show()
but I get the following error:
Exception in thread "main" java.lang.IllegalArgumentException:
requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (2): Id, timestamp
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:448)
at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44)
at example.tests$.main(tests.scala:60)
at example.tests.main(tests.scala)
Any idea? Thank you.
Upvotes: 0
Views: 721
Reputation: 51
You can use Spark's built-in functions to do that:
import spark.implicits._
import org.apache.spark.sql.functions.array

val df = Seq(
  ("a", "1"),
  ("b", "2")
).toDF()

df.select(array($"_1", $"_2")).show()
Here, the array function puts the value from column "_1" and the value from column "_2" into an array, for each row.
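Since the question also asks to get back to a DF with the original columns, here is a minimal sketch of the reverse step, reusing the df from above; the names "id" and "timestamp" are illustrative, taken from the question:

import spark.implicits._
import org.apache.spark.sql.functions.array

// Build the array column, then pull its elements back out as named columns
df.select(array($"_1", $"_2").as("arr"))
  .select($"arr".getItem(0).as("id"), $"arr".getItem(1).as("timestamp"))
  .show()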
You can have a look at the Spark SQL functions to find the one that matches your need. If you don't find one, you'll probably have to write a user-defined function (UDF), but that's not advised, as the built-in functions are better optimized.
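For completeness, a minimal sketch of what such a UDF could look like here; pairUdf is a hypothetical name, and it just reproduces what the built-in array already does:

import spark.implicits._
import org.apache.spark.sql.functions.udf

// Hypothetical UDF packing the two string columns into an array;
// equivalent to array() above, but opaque to Catalyst optimizations
val pairUdf = udf((id: String, ts: String) => Array(id, ts))

df.select(pairUdf($"_1", $"_2").as("arr")).show()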
Upvotes: 0
Reputation: 2451
You can achieve it by transforming your Array[String] into an RDD[Row] and creating a DF with .createDataFrame and a provided schema. Another option is to create a DF with .toDF, then split the values and select them as separate columns.
import spark.implicits._
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

val df = Seq(("1", "2")).toDF()
df.show()

// collect each row as a single comma-separated string
val aux = df.collect().map { row => row.mkString(",") }

// split each string back into its fields and wrap them in a Row
val aux2 = aux.map(s => RowFactory.create(s.split(","): _*))

val schema = new StructType()
  .add("id", "string")
  .add("timestamp", "string")

val df2 = spark.createDataFrame(sc.parallelize(aux2), schema)
df2.show()
// another option: split the single value column and select the parts as separate cols
aux.toSeq.toDF("val")
  .select(split('val, ",").as("arr"))
  .select('arr.getItem(0).as("id"), 'arr.getItem(1).as("timestamp"))
  .show()
+---+---+
| _1| _2|
+---+---+
| 1| 2|
+---+---+
+---+---------+
| id|timestamp|
+---+---------+
| 1| 2|
+---+---------+
+---+---------+
| id|timestamp|
+---+---------+
| 1| 2|
+---+---------+
Upvotes: 1