Reputation: 737
I have two DataFrames that I want to compare using a key column. If a row differs between them, I want to update it so that both DataFrames end up equal.
import org.apache.spark.sql.types.LongType
import spark.implicits._ // enables the 'SITE / 'id column syntax
val TMP_SITE = spark.read.format("jdbc")
  .options(Map("url" -> "jdbc:oracle:thin:System/maher@//localhost:1521/XE", "dbtable" -> "IPTECH.TMP_SITE")).load()
  .withColumn("SITE", 'SITE.cast(LongType))
val local_pos = spark.read.format("jdbc")
  .options(Map("url" -> url, "dbtable" -> "pos")).load()
  .select("id", "name")
TMP_SITE.printSchema()
local_pos.printSchema()
val join = TMP_SITE.join(local_pos, 'SITE === 'id, "inner")
The schemas printed by printSchema() are:
root
 |-- SITE: long (nullable = true)
 |-- LIBELLE: string (nullable = false)
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
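To reproduce the comparison without the Oracle and pos databases, the two JDBC tables can be replaced by small in-memory DataFrames with the same column names and types as the schemas above. This is a minimal sketch: the object LocalRepro and its methods are hypothetical, the three rows are copied from the join output below, and toDF marks the long columns as non-nullable, which differs slightly from the JDBC schemas.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical in-memory stand-ins for IPTECH.TMP_SITE and pos, using the column
// names and types reported by printSchema(); rows are taken from the join output.
object LocalRepro {
  def tables(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    // Stand-in for TMP_SITE after the cast: SITE (long), LIBELLE (string)
    val tmpSite = Seq((51L, "Ezzahra"), (22L, "Moknine"), (73L, "Zaouit Kontech"))
      .toDF("SITE", "LIBELLE")
    // Stand-in for local_pos: id (long), name (string)
    val localPos = Seq((51L, "Ezzahra"), (22L, "Moknine"), (73L, "Zaouit Kontech"))
      .toDF("id", "name")
    (tmpSite, localPos)
  }

  // Same inner join on SITE = id as in the question
  def joined(spark: SparkSession): DataFrame = {
    val (tmpSite, localPos) = tables(spark)
    tmpSite.join(localPos, tmpSite("SITE") === localPos("id"), "inner")
  }
}
```

With a local session (for example SparkSession.builder().master("local[*]").getOrCreate()), LocalRepro.joined(spark).show(false) prints a three-row version of the join.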
The result of the join is:
+---+----------------------+----+----------------------+
|id |name                  |SITE|LIBELLE               |
+---+----------------------+----+----------------------+
|51 |Ezzahra               |51  |Ezzahra               |
|7  |BENIKHALLED           |7   |BENIKHALLED           |
|15 |Kram                  |15  |Kram                  |
|54 |El Mourouj            |54  |El Mourouj            |
|11 |LE BARDO              |11  |LE BARDO              |
|29 |Mini M Ksar said      |29  |Mini M Ksar said      |
|69 |ZAGHOUAN              |69  |ZAGHOUAN              |
|42 |BEB EL KHADHRA        |42  |BEB EL KHADHRA        |
|73 |Zaouit Kontech        |73  |Zaouit Kontech        |
|87 |Aouina                |87  |Aouina                |
|64 |Sousse I I            |64  |Sousse I I            |
|3  |SAHRA CONFORT : KORBA |3   |SAHRA CONFORT : KORBA |
|34 |SOUKRA SQUARE         |34  |SOUKRA SQUARE         |
|59 |SAHRA CONFORT : ZARZIS|59  |SAHRA CONFORT : ZARZIS|
|8  |Jerba                 |8   |Jerba                 |
|22 |Moknine               |22  |Moknine               |
|28 |RDAYEF                |28  |RDAYEF                |
|85 |MONASTIR ABSORBA      |85  |MONASTIR ABSORBA      |
|16 |BARDO HANAYA          |16  |BARDO HANAYA          |
|35 |Mini M Agba           |35  |Mini M Agba           |
+---+----------------------+----+----------------------+
I then added a column that flags rows whose names differ:
val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update"))
This is what I got:
+---+----------------------+----+----------------------+--------------+
|id |name                  |SITE|LIBELLE               |changes       |
+---+----------------------+----+----------------------+--------------+
|51 |Ezzahra               |51  |Ezzahra               |nothing       |
|7  |BENIKHALLED           |7   |BENIKHALLED           |nothing       |
|15 |Kram                  |15  |Kram                  |nothing       |
|54 |El Mourouj            |54  |El Mourouj            |nothing       |
|11 |LE BARDO              |11  |LE BARDO              |nothing       |
|29 |Mini M Ksar said      |29  |Mini M Ksar said      |nothing       |
|69 |ZAGHOUAN              |69  |ZAGHOUAN              |nothing       |
|42 |BEB EL KHADHRA        |42  |BEB EL KHADHRA        |nothing       |
|73 |Zaouit Kontech        |73  |Zaouit Kontech        |need an update|
|87 |Aouina                |87  |Aouina                |nothing       |
|64 |Sousse I I            |64  |Sousse I I            |nothing       |
|3  |SAHRA CONFORT : KORBA |3   |SAHRA CONFORT : KORBA |nothing       |
|34 |SOUKRA SQUARE         |34  |SOUKRA SQUARE         |nothing       |
|59 |SAHRA CONFORT : ZARZIS|59  |SAHRA CONFORT : ZARZIS|nothing       |
|8  |Jerba                 |8   |Jerba                 |nothing       |
|22 |Moknine               |22  |Moknine               |need an update|
|28 |RDAYEF                |28  |RDAYEF                |nothing       |
|85 |MONASTIR ABSORBA      |85  |MONASTIR ABSORBA      |nothing       |
|16 |BARDO HANAYA          |16  |BARDO HANAYA          |nothing       |
|35 |Mini M Agba           |35  |Mini M Agba           |nothing       |
+---+----------------------+----+----------------------+--------------+
I do not understand why rows 73 (Zaouit Kontech) and 22 (Moknine) are flagged as needing an update: the values look identical, so I expected "nothing" for every row.
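One way to see why two values that look the same compare as different is to print them with visible delimiters and their lengths. A plausible but unconfirmed cause is invisible characters such as trailing spaces, which show() output hides because it pads every cell with spaces anyway. This sketch assumes a joined DataFrame with the id, name and LIBELLE columns shown above; the object FindHiddenDiffs and the output column names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object FindHiddenDiffs {
  // Keeps only rows whose names differ (null-safe comparison), then shows each value
  // wrapped in [brackets] next to its length, so leading/trailing whitespace is visible.
  def diagnose(joined: DataFrame): DataFrame =
    joined
      .filter(not(col("LIBELLE") <=> col("name")))
      .select(
        col("id"),
        concat(lit("["), col("name"), lit("]")).as("name_bracketed"),
        length(col("name")).as("name_len"),
        concat(lit("["), col("LIBELLE"), lit("]")).as("libelle_bracketed"),
        length(col("LIBELLE")).as("libelle_len")
      )
}
```

Running FindHiddenDiffs.diagnose(join).show(false) on the real data would show whether name_len and libelle_len differ for the flagged rows; if they do, the strings contain extra characters even though they print the same.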
Upvotes: 0
Views: 925
Reputation: 41957
Once you have a DataFrame, it's quite easy to work with its columns and rows.
So, after your join, you have the following DataFrame:
+----+---------------------+----+---------------------+
|SITE|LIBELLE              |id  |name                 |
+----+---------------------+----+---------------------+
|48  |Mini M Boumhel       |48  |Mini M Boumhel       |
|67  |Lac                  |67  |Lac                  |
|992 |test2                |992 |test                 |
|44  |KAIROUAN             |44  |KAIROUAN             |
|61  |Tunis                |61  |Tunis                |
|9001|MONOPRIX             |9001|MONOPRIX             |
|3   |SAHRA CONFORT : KORBA|3   |SAHRA CONFORT : KORBA|
|37  |Mini M Borj Lozir    |37  |Mini M Borj Lozir    |
|83  |Jendouba             |83  |Jendouba             |
|12  |Bigro                |12  |Bigro                |
+----+---------------------+----+---------------------+
You can add another column with the logic you have written, using the when function:
import org.apache.spark.sql.functions._
val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update"))
The temp DataFrame would then be:
+----+---------------------+----+---------------------+--------------+
|SITE|LIBELLE              |id  |name                 |changes       |
+----+---------------------+----+---------------------+--------------+
|48  |Mini M Boumhel       |48  |Mini M Boumhel       |nothing       |
|67  |Lac                  |67  |Lac                  |nothing       |
|992 |test2                |992 |test                 |need an update|
|44  |KAIROUAN             |44  |KAIROUAN             |nothing       |
|61  |Tunis                |61  |Tunis                |nothing       |
|9001|MONOPRIX             |9001|MONOPRIX             |nothing       |
|3   |SAHRA CONFORT : KORBA|3   |SAHRA CONFORT : KORBA|nothing       |
|37  |Mini M Borj Lozir    |37  |Mini M Borj Lozir    |nothing       |
|83  |Jendouba             |83  |Jendouba             |nothing       |
|12  |Bigro                |12  |Bigro                |nothing       |
+----+---------------------+----+---------------------+--------------+
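If the differences turn out to be only surrounding whitespace or letter case, the same when expression can compare normalized values instead. This is a sketch under that assumption, not a fix confirmed for this data: it compares upper(trim(...)) of both columns and uses the null-safe <=> operator, because with plain === a NULL name makes the condition NULL and the row silently falls through to otherwise. The object CompareNormalized is hypothetical. Note that trim strips ordinary spaces only; a non-breaking space, for example, is not removed and would need regexp_replace.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object CompareNormalized {
  // Variant of the "changes" column that ignores leading/trailing spaces and case,
  // and treats two NULLs as equal via the null-safe <=> operator.
  def flagChanges(joined: DataFrame): DataFrame = {
    val normLibelle = upper(trim(col("LIBELLE")))
    val normName = upper(trim(col("name")))
    joined.withColumn(
      "changes",
      when(normLibelle <=> normName, lit("nothing")).otherwise(lit("need an update"))
    )
  }
}
```

Whether ignoring case is acceptable depends on whether "Kram" and "KRAM" should count as the same site name; drop the upper calls if they should not.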
Now you can simply use the filter method on the DataFrame:
temp.filter($"changes" === "need an update").show(false)
which should give you:
+----+-------+---+----+--------------+
|SITE|LIBELLE|id |name|changes       |
+----+-------+---+----+--------------+
|992 |test2  |992|test|need an update|
+----+-------+---+----+--------------+
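The question also asks how to bring the two sides back in line. Assuming TMP_SITE is the source of truth (the question does not say which side should win), the flagged rows can be turned into a small change set of (id, new name) pairs. The object BuildUpdates and its method are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object BuildUpdates {
  // Assumption: LIBELLE from TMP_SITE is correct, so every row flagged
  // "need an update" should get LIBELLE as its new name in pos.
  def updatedPos(temp: DataFrame): DataFrame =
    temp
      .filter(col("changes") === "need an update")
      .select(col("id"), col("LIBELLE").as("name"))
}
```

Spark's JDBC data source only inserts rows (in append or overwrite mode) and does not issue row-level UPDATE statements, so applying this change set to pos would typically happen on the database side, for example by writing it to a staging table and running an UPDATE or MERGE there.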
From there, you just need to work with the columns using select, groupBy, aggregations, filter and other built-in functions, or user-defined functions (udf) where needed. You can even convert the DataFrame into an RDD of tuples, as you did in your example.
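As a small illustration of the groupBy/aggregation and RDD-of-tuples routes, the sketch below counts rows per changes value, then drops to an RDD to list the mismatched (id, name, LIBELLE) triples using plain Scala string comparison. It assumes the temp DataFrame built earlier; the object ExploreChanges and its methods are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExploreChanges {
  // groupBy + count: how many rows fall into each "changes" category
  def countByChange(temp: DataFrame): Map[String, Long] =
    temp.groupBy("changes").count()
      .collect()
      .map(r => r.getString(0) -> r.getLong(1))
      .toMap

  // RDD of (id, name, LIBELLE) tuples, keeping only rows whose names differ
  def mismatchedTuples(temp: DataFrame): Array[(Long, String, String)] =
    temp.rdd
      .map(r => (r.getAs[Long]("id"), r.getAs[String]("name"), r.getAs[String]("LIBELLE")))
      .filter { case (_, name, libelle) => name != libelle }
      .collect()
}
```

On the temp DataFrame above, countByChange would report 9 rows as "nothing" and 1 as "need an update", and mismatchedTuples would return the single (992, test, test2) triple.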
Upvotes: 1