Reputation: 625
Java 8 and Spark 2.3.2 (Scala 2.11 build) here. Although I would greatly prefer Java API answers, I do speak a wee bit of Scala, so I will be able to understand any answers provided in it! But Java if at all possible (please)!
I have two datasets with different schemas, with the exception of a common model_number (string) column that exists in both.
For each row in my first Dataset (we'll call it d1), I need to scan/search the second Dataset ("d2") to see if there is a row with the same model_number, and if so, update another column in d2.
Here are my Dataset schemas:
d1
===========
model_number : string
desc : string
fizz : string
buzz : date
d2
===========
model_number : string
price : double
source : string
So again, if a d1 row has a model_number of, say, 12345, and a d2 row has that same model_number, I want to update d2.price by multiplying it by 10.0.
My best attempt thus far:
// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number").equalTo(d2.col("model_number")));
// now I just need to update d2.price based on the match
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price").multiply(10.0));
Can anyone help me cross the finish line here? Thanks in advance!
Upvotes: 0
Views: 2632
Reputation: 770
A few points here. As @VamsiPrabhala mentioned in the comment, the function you need is join on your specific field. Regarding the "update": keep in mind that DataFrames, Datasets and RDDs in Spark are immutable, so you cannot update them. The solution is, after joining your DataFrames, to perform your calculation (in this case the multiplication) in a select, or using withColumn and then select. In other words, you cannot update the column, but you can create a new DataFrame with the "new" column.
Example:
Input data:
+------------+------+------+----+
|model_number| desc| fizz|buzz|
+------------+------+------+----+
| model_a|desc_a|fizz_a|null|
| model_b|desc_b|fizz_b|null|
+------------+------+------+----+
+------------+-----+--------+
|model_number|price| source|
+------------+-----+--------+
| model_a| 10.0|source_a|
| model_b| 20.0|source_b|
+------------+-----+--------+
using join will output:
val joinedDF = d1.join(d2, "model_number")
joinedDF.show()
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
| model_a|desc_a|fizz_a|null| 10.0|source_a|
| model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+
applying your calculation:
joinedDF.withColumn("price", col("price") * 10).show()
output:
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
|     model_a|desc_a|fizz_a|null|100.0|source_a|
|     model_b|desc_b|fizz_b|null|200.0|source_b|
+------------+------+------+----+-----+--------+
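Since the question asks for Java, here is a minimal Java 8 sketch of the same two steps. The PriceAdjuster class and adjustPrices method are just placeholder names, and d1 and d2 are assumed to be the Dataset<Row> instances from the question:
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PriceAdjuster {

    // Joins d1 and d2 on model_number and returns a NEW Dataset whose
    // "price" column holds price * 10.0 -- neither input is modified.
    public static Dataset<Row> adjustPrices(Dataset<Row> d1, Dataset<Row> d2) {
        // Inner join on the shared key; passing the column name keeps a
        // single model_number column instead of two ambiguous ones.
        Dataset<Row> joined = d1.join(d2, "model_number");

        // Datasets are immutable, so withColumn returns a new Dataset in
        // which "price" is replaced by the multiplied value.
        return joined.withColumn("price", col("price").multiply(10.0));
    }
}
Calling adjustPrices(d1, d2).show() should print the same table as the Scala example above.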
Upvotes: 2