Reputation: 107
I have two dataframes that have different columns, and I need to join them. Please refer to the example below.
val df1 has
Customer_name
Customer_phone
Customer_age
val df2 has
Order_name
Order_ID
These two dataframes don't have any common column, and the number of rows and the number of columns also differ between them. I tried to insert a dummy row-index column as below:
val dfr = df1.withColumn("row_index", monotonically_increasing_id())
But as I am using Spark 2, the monotonically_increasing_id method is not supported. Is there any way to join the two dataframes, so that I can get the values of both dataframes into a single sheet of an Excel file?
For example
val df1:
Customer_name  Customer_phone  Customer_age
karti          9685684551      24
raja           8595456552      22

val df2:
Order_name  Order_ID
watch       1
cattoy      2

My final excel sheet should look like this:
Customer_name  Customer_phone  Customer_age  Order_name  Order_ID
karti          9685684551      24            watch       1
raja           8595456552      22            cattoy      2
Upvotes: 6
Views: 18859
Reputation: 91
I had a similar issue but I'm on Databricks, so I used Python/PySpark. In case anyone has the same question in a Python environment, this did the trick for me:
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, row_number

# Window over the whole dataframe; orderBy(lit(None)) needs no sort key,
# but it pulls all rows into a single partition
w = Window().orderBy(lit(None))

# Number the rows of each dataframe consecutively, then join on the number
df1 = df1.withColumn('row_num', row_number().over(w))
df2 = df2.withColumn('row_num', row_number().over(w))
df_merged = df1.join(df2, 'row_num').drop('row_num')
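One caveat: with no partition defined, Spark moves all rows to a single partition to evaluate the window (and logs a warning to that effect), and the ordering imposed by lit(None) is arbitrary, so rows are paired by whatever order they happen to arrive in rather than by any meaningful key.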
Upvotes: 3
Reputation: 672
Add an index column to both dataframes using the code below:
df1.withColumn("id1",monotonicallyIncreasingId)
df2.withColumn("id2",monotonicallyIncreasingId)
Then join the two dataframes using the code below and drop the index columns:
df1.join(df2,col("id1")===col("id2"),"inner")
.drop("id1","id2")
Upvotes: 10
Reputation: 23119
monotonically_increasing_id() is increasing and unique, but not consecutive. You can instead use zipWithIndex by converting each dataframe to an RDD and reconstructing the DataFrame with the same schema plus an index column.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._

val df1 = Seq(
  ("karti", "9685684551", 24),
  ("raja", "8595456552", 22)
).toDF("Customer_name", "Customer_phone", "Customer_age")

val df2 = Seq(
  ("watch", 1),
  ("cattoy", 2)
).toDF("Order_name", "Order_ID")

// Append a consecutive 0-based index column to df1 using zipWithIndex
val df11 = spark.sqlContext.createDataFrame(
  df1.rdd.zipWithIndex.map {
    case (row, index) => Row.fromSeq(row.toSeq :+ index)
  },
  // Original schema plus the index column
  StructType(df1.schema.fields :+ StructField("index", LongType, false))
)

// Do the same for df2
val df22 = spark.sqlContext.createDataFrame(
  df2.rdd.zipWithIndex.map {
    case (row, index) => Row.fromSeq(row.toSeq :+ index)
  },
  StructType(df2.schema.fields :+ StructField("index", LongType, false))
)
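Unlike monotonically_increasing_id, zipWithIndex assigns consecutive indices starting at 0 (at the cost of an extra Spark job to count the elements in each partition), so the index columns of the two dataframes line up exactly.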
Now join the final dataframes on the index and drop it:

val result = df11.join(df22, Seq("index")).drop("index")
result.show(false)
Output:
+-------------+--------------+------------+----------+--------+
|Customer_name|Customer_phone|Customer_age|Order_name|Order_ID|
+-------------+--------------+------------+----------+--------+
|karti |9685684551 |24 |watch |1 |
|raja |8595456552 |22 |cattoy |2 |
+-------------+--------------+------------+----------+--------+
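To land this result in a single sheet of an Excel file, as the question asks, one option is the spark-excel package (com.crealytics:spark-excel). A minimal sketch, assuming that library is on the classpath; the exact option names vary between spark-excel versions (older releases use "useHeader" instead of "header"), and the output path is just an example:

// Hypothetical output path; requires the com.crealytics:spark-excel package
result.write
  .format("com.crealytics.spark.excel")
  .option("header", "true")
  .mode("overwrite")
  .save("customers_with_orders.xlsx")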
Upvotes: 7