matchew

Reputation: 19675

Spark join array

I am brand new to Spark (a few hours in) and also rather inexperienced with Scala. However, I have a long-standing desire to become more familiar with both.

I have a rather trivial task. I have two dataframes that I am importing from two JSON files: one with uuid, text, tag_ids, and the other with the tags' id, term. I would like to produce a new JSON file that I can import into Solr containing uuid, text, tag_ids, tag_terms.

val text = spark.sqlContext.jsonFile("/tmp/text.js")
val tags = spark.sqlContext.jsonFile("/tmp/tags.js")
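For a self-contained repro without the files, equivalent toy DataFrames can be built directly in the shell (the rows below are made up; only the column names and types follow the schemas printed next):

import spark.implicits._

// toy rows only; column names and types follow the schemas printed below
val text = Seq(
  ("uuid-1", Seq("1", "2"), "foo"),
  ("uuid-2", Seq("2", "3"), "bar")
).toDF("uuid", "tag_ids", "text")

val tags = Seq(
  ("1", "tag1"), ("2", "tag2"), ("3", "tag3")
).toDF("id", "term")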



text.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- tag_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)

tags.printSchema()

root
 |-- id: string (nullable = true)
 |-- term: string (nullable = true)


# desired output
+--------------------+------+---------+------------+
|                uuid| text | tag_ids |   tag_terms|
+--------------------+------+---------+------------+
|cf5c1f4c-96e6-4ca...| foo  |    [1,2]| [tag1,tag2]|
|c9834e2e-0f04-486...| bar  |    [2,3]| [tag2,tag3]|
+--------------------+------+---------+------------+

It is difficult to show everything I have tried. Essentially .join() is having issues with tag_ids being an array. I can explode() tag_ids and join against the tags to get the terms, but reassembling the result into a new dataframe to export is still beyond my level.
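Concretely, the exploded-and-joined intermediate I can get to looks roughly like this (one row per uuid/tag pair, using the toy DataFrames above); getting back to one row per uuid is the part I am missing:

import org.apache.spark.sql.functions.explode
import spark.implicits._

// one row per (uuid, tag id) pair; the original array structure is lost here
val exploded = text
  .withColumn("tag_id", explode($"tag_ids"))
  .join(tags, $"tag_id" === $"id")

exploded.show()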

Upvotes: 3

Views: 1826

Answers (2)

vaquar khan

Reputation: 11489

Try this:

// assuming a spark-shell where the SparkSession is available as `spark`
// jsonFile is deprecated; read the two files with the DataFrameReader instead
val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")

// text and tags are already DataFrames, so register them as views directly
// (there is no need to parallelize them first)
text.createOrReplaceTempView("A")
tags.createOrReplaceTempView("B")

// explode the tag_ids array in a subquery, join each tag id against the tags,
// then group back up per uuid and collect the matching terms
spark.sql("""
  SELECT e.uuid, first(e.text) AS text, e.tag_ids, collect_list(b.term) AS tag_terms
  FROM (SELECT uuid, text, tag_ids, tag_id
        FROM A LATERAL VIEW explode(tag_ids) t AS tag_id) e
  JOIN B b ON e.tag_id = b.id
  GROUP BY e.uuid, e.tag_ids
""").show()

Upvotes: 1

Tzach Zohar

Reputation: 37852

Solution using explode:

import org.apache.spark.sql.functions.{collect_list, explode, first}
import spark.implicits._

val result = text
  .withColumn("tag_id", explode($"tag_ids"))   // one row per (uuid, tag id)
  .join(tags, $"tag_id" === $"id")             // attach the matching term
  .groupBy("uuid", "tag_ids")                  // collapse back to one row per uuid
  .agg(first("text") as "text", collect_list("term") as "tag_terms")
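Since the goal is a new JSON file for Solr, the result can then be written back out with the standard DataFrame writer (the output path below is illustrative):

// writes one JSON object per line under the given directory
result.write.json("/tmp/text_with_terms")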

Upvotes: 4
