user2038119

Reputation: 61

How to join two dataframes?

I cannot get Spark's DataFrame join to work (the result comes back empty). Here is my code:

val e = Seq((1, 2), (1, 3), (2, 4))
val edges = e.map(p => Edge(p._1, p._2)).toDF()
val filtered = edges.filter("start = 1").distinct()
println("filtered")
filtered.show()
filtered.printSchema()
println("edges")
edges.show()
edges.printSchema()
val joined = filtered.join(edges, filtered("end") === edges("start"))//.select(filtered("start"), edges("end"))
println("joined")
joined.show()

It requires case class Edge(start: Int, end: Int) to be defined at the top level. Here is the output it produces:

filtered
+-----+---+
|start|end|
+-----+---+
|    1|  2|
|    1|  3|
+-----+---+

root
 |-- start: integer (nullable = false)
 |-- end: integer (nullable = false)

edges
+-----+---+
|start|end|
+-----+---+
|    1|  2|
|    1|  3|
|    2|  4|
+-----+---+

root
 |-- start: integer (nullable = false)
 |-- end: integer (nullable = false)

joined
+-----+---+-----+---+
|start|end|start|end|
+-----+---+-----+---+
+-----+---+-----+---+

I don't understand why the join result is empty. Why doesn't the first row of filtered get combined with the last row of edges?
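To make the expected result concrete, here is a minimal plain-Scala sketch of the join semantics the code above is going for, using the same tuples instead of Spark DataFrames (the names fStart/fEnd/eStart/eEnd are illustrative, not from the original code):

```scala
// The same data as the question, as plain tuples: (start, end)
val edges = Seq((1, 2), (1, 3), (2, 4))

// filter("start = 1").distinct() on the left side
val filtered = edges.filter(_._1 == 1).distinct

// join on filtered.end == edges.start
val joined = for {
  (fStart, fEnd) <- filtered
  (eStart, eEnd) <- edges
  if fEnd == eStart
} yield (fStart, fEnd, eStart, eEnd)

// joined == Seq((1, 2, 2, 4)): row (1, 2) of filtered matches row (2, 4) of edges
```

So the asker is right that the join should produce one row; the empty output is a Spark-specific issue, not a problem with the join condition itself.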

Upvotes: 0

Views: 1532

Answers (1)

Michael Lloyd Lee mlk

Reputation: 14661

val f2 = filtered.withColumnRenamed("start", "fStart").withColumnRenamed("end", "fEnd")
f2.join(edges, f2("fEnd") === edges("start")).show

I believe this happens because filtered("start").equals(edges("start")) is true: filtered is a filtered view on edges, so the two DataFrames share the same column definitions. Since the column references on both sides of the join are identical, Spark cannot tell which DataFrame you are referring to.

As a result, you can even do things like

edges.select(filtered("start")).show
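An alternative to renaming the columns is to alias each side of the self-join so the column references become qualified and unambiguous. This is a sketch, not tested against the asker's setup; it assumes Spark 2.x+ on the classpath and a local session:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone setup; in the asker's environment a session
// and the Edge case class already exist.
case class Edge(start: Int, end: Int)

val spark = SparkSession.builder().master("local[*]").appName("join").getOrCreate()
import spark.implicits._

val edges = Seq(Edge(1, 2), Edge(1, 3), Edge(2, 4)).toDF()
val filtered = edges.filter($"start" === 1).distinct()

// Alias each side; "f.end" and "e.start" now name distinct columns,
// so Spark no longer confuses the two sides of the self-join.
filtered.as("f")
  .join(edges.as("e"), $"f.end" === $"e.start")
  .select($"f.start", $"e.end")
  .show()
```

The aliasing approach avoids having to rename the columns back afterwards if you need the original names in later steps.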

Upvotes: 1
