Reputation: 19675
I am brand new to Spark (hours in) and rather inexperienced with Scala, but I have a long-standing desire to become more familiar with both.
I have a rather trivial task. I have two dataframes that I am importing from two JSON files: one with uuid, text, tag_ids,
and the other with the tags' id, term.
I would like to produce a new JSON file that I can import into Solr
containing uuid, text, tag_ids, tag_terms.
val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")
text.printSchema()
root
 |-- uuid: string (nullable = true)
 |-- tag_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
tags.printSchema()
root
 |-- id: string (nullable = true)
 |-- term: string (nullable = true)
#desired output
+--------------------+----+-------+------------+
|                uuid|text|tag_ids|   tag_terms|
+--------------------+----+-------+------------+
|cf5c1f4c-96e6-4ca...| foo| [1, 2]|[tag1, tag2]|
|c9834e2e-0f04-486...| bar| [2, 3]|[tag2, tag3]|
+--------------------+----+-------+------------+
It is difficult to show everything I have tried. Essentially, .join()
has issues with tag_ids being an array. I can explode()
tag_ids
and join on the tag id to pick up the terms,
but reassembling the result into a new dataframe to export is still beyond my level.
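For reference, the two input files are assumed to look roughly like this (one JSON object per line, with values taken from the desired output above):

// /tmp/text.js
{"uuid": "cf5c1f4c-96e6-4ca...", "text": "foo", "tag_ids": ["1", "2"]}
{"uuid": "c9834e2e-0f04-486...", "text": "bar", "tag_ids": ["2", "3"]}
// /tmp/tags.js
{"id": "1", "term": "tag1"}
{"id": "2", "term": "tag2"}
{"id": "3", "term": "tag3"}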
Upvotes: 3
Views: 1826
Reputation: 11489
Try this:
import org.apache.spark.sql.SparkSession

// jsonFile was removed in Spark 2.x; read the files with spark.read.json
val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")

// register the dataframes as temp views so they can be joined with SQL
text.createOrReplaceTempView("A")
tags.createOrReplaceTempView("B")

// explode the tag_ids array, join each tag id against B, then collect the terms back into an array
spark.sql("""
  SELECT a.uuid, first(a.text) AS text, a.tag_ids, collect_list(b.term) AS tag_terms
  FROM (SELECT uuid, text, tag_ids, explode(tag_ids) AS tag_id FROM A) a
  JOIN B b ON a.tag_id = b.id
  GROUP BY a.uuid, a.tag_ids
""").show()
Upvotes: 1
Reputation: 37852
Solution using explode:
import org.apache.spark.sql.functions.{collect_list, explode, first}
import spark.implicits._

val result = text
  .withColumn("tag_id", explode($"tag_ids"))  // one row per (uuid, tag_id)
  .join(tags, $"tag_id" === $"id")            // look up each tag's term
  .groupBy("uuid", "tag_ids")                 // collapse back to one row per document
  .agg(first("text") as "text", collect_list("term") as "tag_terms")
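Once result has the desired shape, exporting it as line-delimited JSON for the Solr import is a one-liner (the output path here is just an example):

result.write.json("/tmp/text_with_terms.js")

Spark writes a directory of part files; each line is one JSON document containing uuid, text, tag_ids, and tag_terms.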
Upvotes: 4