Mithril

Reputation: 13768

Spark union doesn't work as expected: it adds new rows

I can't reproduce this problem with a simple test set; it only happens on my dataset, so I can only describe the situation.

df has many distinct (store_id, product_id) groups, and each group has many rows.

df1 also has many distinct (store_id, product_id) groups, each of which has at most one row.

df is the order history table. I need to get historical prices from it and current prices from df1, then union them to build a complete price-change timeline.

But the strange thing is:


from pyspark.sql.functions import col, lit

sid = '00fbb2a6-f2de-42f1-a07b-163e3a050ddb'
pid = '66e06f08-dec2-498d-883f-24771da18358'

filtersp = lambda df: df.filter(col('store_id')==sid).filter(col('product_id')==pid)

filtersp(df).show()

+----------------+--------+----------+-----------+---+
|store_product_id|store_id|product_id|price_guide| ds|
+----------------+--------+----------+-----------+---+
+----------------+--------+----------+-----------+---+

filtersp(df1).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+


filtersp(df1).union(filtersp(df)).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+


filtersp(df1.union(df)).show()

+----------------+----------+--------+-----------+---+
|store_product_id|product_id|store_id|price_guide| ds|
+----------------+----------+--------+-----------+---+
+----------------+----------+--------+-----------+---+

filtersp(df.union(df1)).show()

+--------------------+--------------------+--------------------+-----------+-------------------+
|    store_product_id|            store_id|          product_id|price_guide|                 ds|
+--------------------+--------------------+--------------------+-----------+-------------------+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|
+--------------------+--------------------+--------------------+-----------+-------------------+


Then I added a new column to track where these rows come from:

df = df.withColumn('c', lit('df'))

df1 = df1.withColumn('c', lit('df1'))

filtersp(df.union(df1)).show()

+--------------------+--------------------+--------------------+-----------+-------------------+---+
|    store_product_id|            store_id|          product_id|price_guide|                 ds|  c|
+--------------------+--------------------+--------------------+-----------+-------------------+---+
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
|996864cf-8432-43d...|00fbb2a6-f2de-42f...|66e06f08-dec2-498...|        480|2019-08-06 09:00:00|df1|
+--------------------+--------------------+--------------------+-----------+-------------------+---+

It turns out the rows come from df1.

I don't understand in what situation filtersp(df.union(df1)).show() would return rows when both filtered DataFrames are empty; it should be impossible.

Upvotes: 3

Views: 2733

Answers (1)

Mithril

Reputation: 13768

Shame on me. I had already come across this answer, https://stackoverflow.com/a/55310670/1637673 :

def unionByName(other: Dataset[T]): Dataset[T]

The difference between this function and union is that this function resolves columns by name (not by position):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+

At first I didn't think this applied to me, but after some struggling I finally found that the column order of the two DataFrames is different.

df is

+----------------+--------+----------+-----------+---+---+
|store_product_id|store_id|product_id|price_guide| ds|  c|
+----------------+--------+----------+-----------+---+---+
+----------------+--------+----------+-----------+---+---+

df1 is

+----------------+----------+--------+-----------+---+---+
|store_product_id|product_id|store_id|price_guide| ds|  c|
+----------------+----------+--------+-----------+---+---+
+----------------+----------+--------+-----------+---+---+

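The positions of product_id and store_id are swapped. Since union matches columns by position, the df1 rows returned by the filter are those whose product_id equals sid and whose store_id equals pid.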
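To make the effect concrete, here is a minimal sketch in plain Python that models the two union semantics. It is not Spark itself: the helper functions union_by_position, union_by_name and filter_rows are hypothetical, and the values "A" and "B" are made up. It only shows why a filter can match after a positional union even though it matches nothing in either input.

```python
# Plain-Python model of positional vs. by-name union; this is NOT Spark.
# All helper names and the sample values "A"/"B" are illustrative assumptions.

def union_by_position(left_cols, left_rows, right_rows):
    """Model of union: keep the left column names, append right rows unchanged."""
    return left_cols, left_rows + right_rows


def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Model of unionByName: reorder each right row into the left column order."""
    index = [right_cols.index(name) for name in left_cols]
    reordered = [tuple(row[i] for i in index) for row in right_rows]
    return left_cols, left_rows + reordered


def filter_rows(cols, rows, **conditions):
    """Keep only rows whose named columns equal the given values."""
    return [
        row for row in rows
        if all(row[cols.index(name)] == value for name, value in conditions.items())
    ]


# Same columns, but product_id and store_id are swapped in df1.
df_cols = ("store_id", "product_id", "price_guide")
df_rows = [("X", "Y", 100)]
df1_cols = ("product_id", "store_id", "price_guide")
df1_rows = [("A", "B", 480)]  # product_id == "A", store_id == "B"

positional = union_by_position(df_cols, df_rows, df1_rows)
by_name = union_by_name(df_cols, df_rows, df1_cols, df1_rows)

# Neither input matches store_id == "A" and product_id == "B" ...
print(filter_rows(df_cols, df_rows, store_id="A", product_id="B"))
print(filter_rows(df1_cols, df1_rows, store_id="A", product_id="B"))
# ... but the positional union relabels df1's columns, so the filter matches.
print(filter_rows(*positional, store_id="A", product_id="B"))
# The by-name union keeps the labels correct, so nothing matches.
print(filter_rows(*by_name, store_id="A", product_id="B"))
```

In PySpark itself, the likely fix is to align the columns before the union, for example with df.unionByName(df1) (available since Spark 2.3) or df.union(df1.select(df.columns)).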

Upvotes: 3
