Reputation: 401
I have been racking my brain over the following. Essentially, I have two dataframes, one from yesterday and one from today, where today's is just a delta (i.e. new rows only). I want to merge the two so that the new rows replace any matching old rows and the remaining old rows are carried forward.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # toDF() on an RDD needs an active SparkSession
sc = spark.sparkContext
old = sc.parallelize([
{"id": 1, "value": 10, "date": "yesterday"},
{"id": 2, "value": 20, "date": "yesterday", "foo": "bar"},
{"id": 3, "value": 30, "date": "yesterday"}
]).toDF()
new = sc.parallelize([
{"id": 2, "value": 25, "date": "today"},
{"id": 4, "value": 45, "date": "today"}
]).toDF()
expected = sc.parallelize([
{"id": 1, "value": 10, "date": "yesterday"},
{"id": 2, "value": 25, "date": "today"},
{"id": 3, "value": 30, "date": "yesterday"},
{"id": 4, "value": 45, "date": "today"},
]).toDF()
# something to merge old and new ...?
In pure python, I would just use:
old = {"a": 10, "b": 20, "c": 30 }
new = {"b": 25, "d": 45}
expected = {"a": 10, "b": 25, "c": 30, "d": 45 }
calculated = {**old, **new}
What is the 'correct' way to do this? Maybe by joining/coalescing at the same time?
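To make concrete what I mean by joining and coalescing at the same time, something like the sketch below is what I had in mind (the upsert helper is just illustrative, and it assumes both frames share the same schema with id as the key):
from pyspark.sql import functions as F

def upsert(old_df, new_df, key="id"):
    # Sketch only: full outer join on the key, then prefer the new value
    # for every other column, falling back to the old one.
    o, n = old_df.alias("o"), new_df.alias("n")
    joined = o.join(n, on=key, how="full_outer")
    merged = [F.coalesce(F.col("n." + c), F.col("o." + c)).alias(c)
              for c in old_df.columns if c != key]
    return joined.select(key, *merged)

calculated = upsert(old, new)
calculated.sort("id").show()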
Edit: As pointed out, this question is a dupe of the link below. However, that example shows a very manually coded query against very specific column names.
I need to reuse this code across approximately 5 dataframes, each of which has 20+ columns, and I don't want to hardcode the merge step against the column names if I don't have to; the schema is still shifting.
Is there really no join/coalesce function in pyspark/spark? I have a working solution with left_anti and union (roughly the version sketched below), but that smells wrong for some reason.
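For reference, the left_anti/union version I currently have looks roughly like this (sketch only, using the toy frames above):
# keep the old rows whose id does not appear in new, then append all of new
kept = old.join(new, on="id", how="left_anti")
calculated = kept.union(new)  # assumes identical column order; unionByName is safer if not
calculated.sort("id").show()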
Upvotes: 0
Views: 5450
Reputation: 1983
I think the simplest way is just to use union, groupby, and the first function.
old df:
+---+-----+---------+
| id|value| date|
+---+-----+---------+
| 1| 10|yesterday|
| 2| 20|yesterday|
| 3| 30|yesterday|
+---+-----+---------+
new df:
+---+-----+---------+
| id|value| date|
+---+-----+---------+
| 2| 25| today|
| 4| 45| today|
+---+-----+---------+
the code below unions the two dataframes:
import pyspark.sql.functions as f

# "today" sorts before "yesterday" alphabetically, so the new rows come first
unionDF = old.union(new).sort("date")
unionDF.show()
union df:
+---+-----+---------+
| id|value| date|
+---+-----+---------+
| 2| 25| today|
| 4| 45| today|
| 1| 10|yesterday|
| 2| 20|yesterday|
| 3| 30|yesterday|
+---+-----+---------+
and in the final step, groupby and the first function:
# for every column except the first (id), keep the first value seen per id
firstCols = [f.first(col).alias(str(col)) for col in unionDF.columns[1:]]
finalDF = unionDF.groupby("id").agg(*firstCols).sort("id")
finalDF.show()
final df:
+---+-----+---------+
| id|value| date|
+---+-----+---------+
| 1| 10|yesterday|
| 2| 25| today|
| 3| 30|yesterday|
| 4| 45| today|
+---+-----+---------+
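If id is not guaranteed to be the first column, a slight variant of the same aggregation (just a sketch) selects the key by name instead of by position:
# reuses f and unionDF from the code above
key = "id"
firstCols = [f.first(c).alias(c) for c in unionDF.columns if c != key]
finalDF = unionDF.groupby(key).agg(*firstCols).sort(key)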
Upvotes: 2