Reputation: 2517
I would like to create a JSON from a Spark v.1.6 (using scala) dataframe. I know that there is the simple solution of doing df.toJSON
.
However, my problem looks a bit different. Consider for instance a dataframe with the following columns:
| A | B | C1 | C2 | C3 |
-------------------------------------------
| 1 | test | ab | 22 | TRUE |
| 2 | mytest | gh | 17 | FALSE |
I would like to have at the end a dataframe with
| A | B | C |
----------------------------------------------------------------
| 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
where C is a JSON containing C1
, C2
, C3
. Unfortunately, I at compile time I do not know what the dataframe looks like (except the columns A
and B
that are always "fixed").
As for the reason why I need this: I am using Protobuf for sending around the results. Unfortunately, my dataframe sometimes has more columns than expected and I would still send those via Protobuf, but I do not want to specify all columns in the definition.
How can I achieve this?
Upvotes: 21
Views: 47832
Reputation: 530
I use this command to solve the to_json problem:
output_df = (df.select(to_json(struct(col("*"))).alias("content")))
Upvotes: 14
Reputation: 1565
Spark 2.1 should have native support for this use case (see #15354).
import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
Upvotes: 28
Reputation: 13927
Here, no JSON parser, and it adapts to your schema:
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
df.select(
col(df.columns(0)),
col(df.columns(1)),
concat(
lit("{"),
concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
val c = dt._1;
val t = dt._2;
concat(
lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ),
col(c),
lit(if(t=="StringType") "\""; else "")
)
}):_*),
lit("}")
) as "C"
).collect()
Upvotes: 7
Reputation: 330413
First lets convert C's to a struct
:
val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
This is structure can be converted to JSONL using toJSON
as before:
dfStruct.toJSON.collect
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}},
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
I am not aware of any built-in method that can convert a single column but you can either convert it individually and join
or use your favorite JSON parser in an UDF.
case class C(C1: String, C2: Int, C3: Boolean)
object CJsonizer {
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
implicit val formats = Serialization.formats(org.json4s.NoTypeHints)
def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}
val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
CJsonizer.toJSON(c1, c2, c3))
df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
Upvotes: 5