Reputation: 79
I am working on a Spark job in Java that downloads data from an API and compares it with data stored in MongoDB. The downloaded JSON has 15-20 fields, but the MongoDB documents have 300 fields.
My task is to compare the downloaded JSON records with the MongoDB data and find which fields have changed relative to the past (stored) data.
Downloaded data from the API:
StudentId,Name,Phone,Email
1,tony,123,[email protected]
2,stark,456,[email protected]
3,spidy,789,[email protected]
MongoDB data:
StudentId,Name,Phone,Email,State,City
1,tony,1234,[email protected],NY,Nowhere
2,stark,456,[email protected],NY,Nowhere
3,spidy,789,[email protected],OH,Nowhere
I can't use except() because the two datasets have a different number of columns.
Expected output:
StudentId,Name,Phone,Email,Past_Phone,Past_Email
1,tony,123,[email protected],1234, //phone number only changed
2,stark,456,[email protected],,[email protected] //Email only changed
3,spidy,789,[email protected],,
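Since only the 15-20 API fields need comparing (not all 300 MongoDB fields), the comparison can be driven by an explicit list of columns. Below is a minimal plain-Java sketch of the per-row logic, outside Spark, with each row modeled as a column-to-value map. The class and method names (FieldDiff, diff, row) are hypothetical, and the email addresses are made-up placeholders. In Spark, the join plus when/otherwise or CASE approaches in the answers do the same thing at scale.

```java
import java.util.*;

// Plain-Java sketch of the per-row comparison (no Spark); each row is a column->value map.
public class FieldDiff {

    // For each API row, find the stored (MongoDB) row with the same key. For every column in
    // compareColumns, add Past_<column>: the stored value if it differs from the API value, else "".
    // A missing stored value is rendered as "". API rows without a stored match are skipped,
    // like an inner join.
    static List<Map<String, String>> diff(List<Map<String, String>> apiRows,
                                          Map<String, Map<String, String>> storedByKey,
                                          String keyColumn,
                                          List<String> compareColumns) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> api : apiRows) {
            Map<String, String> stored = storedByKey.get(api.get(keyColumn));
            if (stored == null) {
                continue;
            }
            Map<String, String> out = new LinkedHashMap<>(api); // keep the current API values
            for (String column : compareColumns) {
                String current = api.get(column);
                String past = stored.get(column);
                out.put("Past_" + column, Objects.equals(current, past) ? "" : Objects.toString(past, ""));
            }
            result.add(out);
        }
        return result;
    }

    // Small helper to build a row from alternating column/value arguments.
    static Map<String, String> row(String... columnsAndValues) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < columnsAndValues.length; i += 2) {
            m.put(columnsAndValues[i], columnsAndValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        // Sample rows mirror the question; the email addresses are made-up placeholders.
        List<Map<String, String>> api = List.of(
            row("StudentId", "1", "Name", "tony", "Phone", "123", "Email", "tony@example.com"),
            row("StudentId", "2", "Name", "stark", "Phone", "456", "Email", "stark@new.example.com"),
            row("StudentId", "3", "Name", "spidy", "Phone", "789", "Email", "spidy@example.com"));
        Map<String, Map<String, String>> stored = new HashMap<>();
        stored.put("1", row("StudentId", "1", "Name", "tony", "Phone", "1234", "Email", "tony@example.com", "State", "NY"));
        stored.put("2", row("StudentId", "2", "Name", "stark", "Phone", "456", "Email", "stark@old.example.com", "State", "NY"));
        stored.put("3", row("StudentId", "3", "Name", "spidy", "Phone", "789", "Email", "spidy@example.com", "State", "OH"));

        for (Map<String, String> r : diff(api, stored, "StudentId", List.of("Phone", "Email"))) {
            System.out.println(r);
        }
    }
}
```

Extra MongoDB fields such as State are simply never looked at, because only the listed columns are compared.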
Upvotes: 0
Views: 132
Reputation: 25
We have:
df1.show
+-----------+------+-------+-------+
|StudentId_1|Name_1|Phone_1|Email_1|
+-----------+------+-------+-------+
| 1| tony| 123|[email protected]|
| 2| stark| 456|[email protected]|
| 3| spidy| 789|[email protected]|
+-----------+------+-------+-------+
df2.show
+-----------+------+-------+--------+-------+-------+
|StudentId_2|Name_2|Phone_2| Email_2|State_2| City_2|
+-----------+------+-------+--------+-------+-------+
| 1| tony| 1234| [email protected]| NY|Nowhere|
| 2| stark| 456|[email protected]| NY|Nowhere|
| 3| spidy| 789| [email protected]| OH|Nowhere|
+-----------+------+-------+--------+-------+-------+
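After the join:
val jn = df2.join(df1, df1("StudentId_1") === df2("StudentId_2"))
Then add a Past_ column for each compared field, filled only when the values differ:
import org.apache.spark.sql.functions.when

// df1 holds the API data, df2 the MongoDB data
val ans = jn
  .withColumn("Past_Phone", when(jn("Phone_2").notEqual(jn("Phone_1")), jn("Phone_1")).otherwise(""))
  .withColumn("Past_Email", when(jn("Email_2").notEqual(jn("Email_1")), jn("Email_1")).otherwise(""))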
Reference: Spark: Add column to dataframe conditionally
Next, select and rename the columns:
ans.select(ans("StudentId_2") as "StudentId",ans("Name_2") as "Name",ans("Phone_2") as "Phone",ans("Email_2") as "Email",ans("Past_Email"),ans("Past_Phone")).show
+---------+-----+-----+--------+----------+----------+
|StudentId| Name|Phone| Email|Past_Email|Past_Phone|
+---------+-----+-----+--------+----------+----------+
| 1| tony| 1234| [email protected]| | 123|
| 2|stark| 456|[email protected]| [email protected]| |
| 3|spidy| 789| [email protected]| | |
+---------+-----+-----+--------+----------+----------+
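With around 300 MongoDB fields, writing one withColumn call per field by hand does not scale. One option is to derive the columns to compare from the two schemas (in Spark's Java API, Dataset.columns() returns the column names as a String[]) and then loop over them, adding one when/otherwise column per name. Here is a minimal plain-Java sketch of that first step. The class and method names are hypothetical, and it assumes both DataFrames use the same column names before any _1/_2 suffixing.

```java
import java.util.*;

public class SharedColumns {
    // Columns present in both schemas, excluding the join key, in the API's column order.
    static List<String> toCompare(String[] apiColumns, String[] mongoColumns, String key) {
        Set<String> mongo = new HashSet<>(Arrays.asList(mongoColumns));
        List<String> shared = new ArrayList<>();
        for (String c : apiColumns) {
            if (!c.equals(key) && mongo.contains(c)) {
                shared.add(c);
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        String[] api = {"StudentId", "Name", "Phone", "Email"};
        String[] mongo = {"StudentId", "Name", "Phone", "Email", "State", "City"};
        // Each returned name would drive one withColumn("Past_" + c, when(...).otherwise("")) call.
        System.out.println(toCompare(api, mongo, "StudentId")); // prints [Name, Phone, Email]
    }
}
```

Driving the loop from the API schema keeps the work proportional to the 15-20 downloaded fields rather than the full MongoDB document.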
Upvotes: 0
Reputation: 663
Assume your data is in two DataFrames. We can create temporary views for them as shown below:
api_df.createOrReplaceTempView("api_data")
mongo_df.createOrReplaceTempView("mongo_data")
Next, we can use Spark SQL: join the two views on the StudentId column, then use CASE expressions to compute the past phone number and email.
spark.sql("""
select a.*
, case when a.Phone = b.Phone then '' else b.Phone end as Past_phone
, case when a.Email = b.Email then '' else b.Email end as Past_Email
from api_data a
join mongo_data b
on a.StudentId = b.StudentId
order by a.StudentId""").show()
Output:
+---------+-----+-----+-------+----------+----------+
|StudentId| Name|Phone| Email|Past_phone|Past_Email|
+---------+-----+-----+-------+----------+----------+
| 1| tony| 123|[email protected]| 1234| |
| 2|stark| 456|[email protected]| | [email protected]|
| 3|spidy| 789|[email protected]| | |
+---------+-----+-----+-------+----------+----------+
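The CASE expressions above are written by hand for two columns; for many columns, the query text can be generated. Here is a sketch in Java that builds the same query for an arbitrary column list, reusing the api_data and mongo_data view names from this answer. The class and method names are hypothetical, and it assumes plain column identifiers (no spaces or special characters). It also swaps = for Spark SQL's null-safe equality operator <=>, so two NULLs count as unchanged; with plain =, a NULL on either side falls through to the ELSE branch.

```java
import java.util.*;
import java.util.stream.Collectors;

public class PastValueQuery {
    // Builds one CASE expression per compared column, named Past_<column>.
    static String build(List<String> columns, String key) {
        String cases = columns.stream()
            .map(c -> String.format(", case when a.%1$s <=> b.%1$s then '' else b.%1$s end as Past_%1$s", c))
            .collect(Collectors.joining("\n"));
        return "select a.*\n" + cases
            + "\nfrom api_data a\njoin mongo_data b\non a." + key + " = b." + key
            + "\norder by a." + key;
    }

    public static void main(String[] args) {
        String sql = build(List.of("Phone", "Email"), "StudentId");
        System.out.println(sql);
        // In a Spark Java job, the string would then be run with spark.sql(sql).show();
    }
}
```

The column list could come from the API DataFrame's schema, so the 300 MongoDB-only fields never enter the query.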
Upvotes: 1
Reputation: 470
Please find the sample source code below. Here I use only the phone-number condition as an example.
import spark.implicits._  // needed for toDF and the 'column symbol syntax

val list = List((1,"tony",123,"[email protected]"), (2,"stark",456,"[email protected]"),
  (3,"spidy",789,"[email protected]"))
val df1 = list.toDF("StudentId","Name","Phone","Email")
  .select('StudentId as "StudentId_1", 'Name as "Name_1",'Phone as "Phone_1",
  'Email as "Email_1")
df1.show()
val list1 = List((1,"tony",1234,"[email protected]","NY","Nowhere"),
(2,"stark",456,"[email protected]", "NY", "Nowhere"),
(3,"spidy",789,"[email protected]","OH","Nowhere"))
val df2 = list1.toDF("StudentId","Name","Phone","Email","State","City")
.select('StudentId as "StudentId_2", 'Name as "Name_2", 'Phone as "Phone_2",
'Email as "Email_2", 'State as "State_2", 'City as "City_2")
df2.show()
// Keep only the rows whose phone number changed, then rename the API phone column
val df3 = df1.join(df2, df1("StudentId_1") ===
  df2("StudentId_2")).where(df1("Phone_1") =!= df2("Phone_2"))
df3.withColumnRenamed("Phone_1", "Past_Phone").show()
+-----------+------+-------+-------+
|StudentId_1|Name_1|Phone_1|Email_1|
+-----------+------+-------+-------+
| 1| tony| 123|[email protected]|
| 2| stark| 456|[email protected]|
| 3| spidy| 789|[email protected]|
+-----------+------+-------+-------+
+-----------+------+-------+--------+-------+-------+
|StudentId_2|Name_2|Phone_2| Email_2|State_2| City_2|
+-----------+------+-------+--------+-------+-------+
| 1| tony| 1234| [email protected]| NY|Nowhere|
| 2| stark| 456|[email protected]| NY|Nowhere|
| 3| spidy| 789| [email protected]| OH|Nowhere|
+-----------+------+-------+--------+-------+-------+
+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+
|StudentId_1|Name_1|Past_Phone|Email_1|StudentId_2|Name_2|Phone_2|Email_2|State_2| City_2|
+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+
| 1| tony| 123|[email protected]| 1| tony| 1234|[email protected]| NY|Nowhere|
+-----------+------+----------+-------+-----------+------+-------+-------+-------+-------+
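The filter above checks a single column. To keep every row where any compared field changed, the per-column conditions can be OR-ed together. Here is a sketch in Java that builds such a condition as a SQL expression string, which Spark's Java API accepts through Dataset.where(String). The class and method names are hypothetical, and it assumes the _1/_2 column suffixes used in this answer. It uses NOT (x <=> y) rather than =!=, so a NULL on only one side counts as a change instead of silently dropping the row.

```java
import java.util.*;
import java.util.stream.Collectors;

public class ChangedRowFilter {
    // A row is kept if any compared column differs between the API (_1) and MongoDB (_2) sides.
    // NOT (x <=> y) is a null-safe "not equal".
    static String anyChanged(List<String> columns) {
        return columns.stream()
            .map(c -> "NOT (" + c + "_1 <=> " + c + "_2)")
            .collect(Collectors.joining(" OR "));
    }

    public static void main(String[] args) {
        String condition = anyChanged(List.of("Phone", "Email"));
        System.out.println(condition); // prints NOT (Phone_1 <=> Phone_2) OR NOT (Email_1 <=> Email_2)
        // In a Spark Java job, the string could be passed to Dataset.where(String), e.g.
        // df1.join(df2, df1.col("StudentId_1").equalTo(df2.col("StudentId_2"))).where(condition)
    }
}
```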
Upvotes: 0