agoebel

Reputation: 401

How to merge dataframes in pyspark

I have been racking my brain over the following. I have two dataframes, one from yesterday and one from today, where today's is just a delta (i.e. new and updated rows only). I want to merge the two so that today's rows replace yesterday's where the ids overlap, and the remaining rows from yesterday are carried forward.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

old = sc.parallelize([
    {"id": 1, "value": 10, "date": "yesterday"},
    {"id": 2, "value": 20, "date": "yesterday", "foo": "bar"},
    {"id": 3, "value": 30, "date": "yesterday"}
]).toDF()

new = sc.parallelize([
    {"id": 2, "value": 25, "date": "today"},
    {"id": 4, "value": 45, "date": "today"}
]).toDF()

expected = sc.parallelize([
    {"id": 1, "value": 10, "date": "yesterday"},
    {"id": 2, "value": 25, "date": "today"},
    {"id": 3, "value": 30, "date": "yesterday"},
    {"id": 4, "value": 45, "date": "today"},
]).toDF()

# something to merge old and new ...?

In pure Python, I would just use:

old = {"a": 10, "b": 20, "c": 30 }
new = {"b": 25, "d": 45}
expected = {"a": 10, "b": 25, "c": 30, "d": 45 }
calculated = {**old, **new}

What is the 'correct' way to do this? Maybe by joining/coalescing at the same time?

Edit: As pointed out, this question is a dupe of the question linked below. However, that example shows a very manually coded query against very specific column names.

I need to reuse this code across roughly 5 dataframes, each of which has 20+ columns, and I don't want to hardcode the merge step against specific column names if I don't have to; the schema is still shifting.

Is there really no join/coalesce function in pyspark/spark? I have a working solution with left_anti and union (sketched below), but that smells wrong for some reason.
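
For reference, a minimal sketch of the left_anti/union approach I mean, assuming id is the only key column and both dataframes share the same columns in the same order:

# rows from yesterday that have no replacement in today's delta
carried_forward = old.join(new, on="id", how="left_anti")

# add today's new and updated rows back in
calculated = carried_forward.union(new)

calculated.sort("id").show()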

Upvotes: 0

Views: 5450

Answers (1)

Ali AzG

Reputation: 1983

I think the simplest way is to union the two dataframes, then groupBy the id and take the first value of each remaining column.

old df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  1|   10|yesterday|
|  2|   20|yesterday|
|  3|   30|yesterday|
+---+-----+---------+

new df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  2|   25|    today|
|  4|   45|    today|
+---+-----+---------+

The code below unions the two dataframes:

import pyspark.sql.functions as f

# "today" sorts before "yesterday" alphabetically, so today's rows come first
unionDF = old.union(new).sort("date")
unionDF.show()

union df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  2|   25|    today|
|  4|   45|    today|
|  1|   10|yesterday|
|  2|   20|yesterday|
|  3|   30|yesterday|
+---+-----+---------+

And in the final step, group by id and keep the first value of every other column:

# aggregate every column except the grouping key
firstCols = [f.first(c).alias(c) for c in unionDF.columns if c != "id"]
finalDF = unionDF.groupBy("id").agg(*firstCols).sort("id")

finalDF.show()

final df:

+---+-----+---------+
| id|value|     date|
+---+-----+---------+
|  1|   10|yesterday|
|  2|   25|    today|
|  3|   30|yesterday|
|  4|   45|    today|
+---+-----+---------+
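
Since you mention reusing this across several wide dataframes, the same idea can be wrapped in a small helper so the column names are never hardcoded. This is just a sketch; merge_latest, key and order_col are names I made up here, and it assumes a single key column, identical columns in both dataframes, and an order column whose ascending sort puts the newest rows first (as "today"/"yesterday" happens to do):

import pyspark.sql.functions as f

def merge_latest(old_df, new_df, key="id", order_col="date"):
    # union the two frames and sort so the newest rows come first
    union_df = old_df.union(new_df).sort(order_col)
    # take the first (i.e. newest) value of every column except the key
    agg_cols = [f.first(c).alias(c) for c in union_df.columns if c != key]
    return union_df.groupBy(key).agg(*agg_cols)

finalDF = merge_latest(old, new).sort("id")
finalDF.show()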

Upvotes: 2
