Reputation: 41
I have the two dataframes below.
df1:
txn_id | amount | x | orderStatus | ingested_at |
---|---|---|---|---|
1 | 100 | - | PENDING | 1652265268576 |
2 | 200 | - | FAILURE | 1652265283215 |
df2:
txn_id | amount | y | orderStatus | ingested_at |
---|---|---|---|---|
1 | 150 | - | SUCCESS | 1652265283215 |
2 | 200 | - | SUCCESS | 1652265268576 |
The two dataframes have different schemas (in the example above, one has column x and the other has column y; there are more such differing columns).
When joining these two dataframes on the field txn_id, I get duplicate columns such as amount, orderStatus, and ingested_at.
What I want when joining them is to merge the payloads such that the fields amount and orderStatus are picked from the row with the greater ingested_at; the rest of the fields can be picked from df1.
For example, in the above case the result should look like this:
joined_df:
txn_id | amount | x | y | orderStatus | ingested_at |
---|---|---|---|---|---|
1 | 150 | - | - | SUCCESS | 1652265283215 |
2 | 200 | - | - | FAILURE | 1652265283215 |
The values of the fields amount and orderStatus are picked from the row where ingested_at has the greater value.
In Java I have done something like this, but I am not sure how to do it in PySpark:
JavaPairRDD<String, Tuple2<TxnDateS3Model, TxnDateS3Model>> joinedResult = level2PairRdd.join(captureNotifyPairRdd);
JavaPairRDD<String, TxnDateS3Model> l2joinedCaptureNotifyPairRdd = joinedResult
.mapToPair(joinResult -> new Tuple2<>(joinResult._1, DataUtils.mergePayload(joinResult._2._1, joinResult._2._2)));
The DataUtils.mergePayload method contains the logic to merge the two payloads based on their ingestion time.
Does anyone have an idea how this can be achieved in PySpark?
Upvotes: 0
Views: 104
Reputation: 195
There may be a more optimal solution to this, but from the info you have given, I am assuming that both df1 and df2 have the same schema, so you can union them and keep only the latest record for each txn_id.
The implementation would look something like this:
```python
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# Rank the rows within each txn_id by ingested_at (newest first)
# and keep only the newest one.
final_df = df1 \
    .unionByName(df2) \
    .withColumn('row_num', row_number().over(Window.partitionBy('txn_id').orderBy(desc('ingested_at')))) \
    .filter(col('row_num') == 1) \
    .drop('row_num')
```
final_df will contain the latest record for each txn_id.
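Note that in your example df1 and df2 do not have identical schemas (df1 has x, df2 has y). Below is a minimal sketch of the same union-and-rank idea that tolerates the differing columns, assuming Spark 3.1+ where unionByName accepts allowMissingColumns (missing columns are filled with nulls):
```python
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# Union despite differing schemas: x is null in rows coming from df2,
# y is null in rows coming from df1 (allowMissingColumns needs Spark 3.1+).
merged = df1.unionByName(df2, allowMissingColumns=True)

# Keep the row with the greatest ingested_at per txn_id.
w = Window.partitionBy('txn_id').orderBy(desc('ingested_at'))
final_df = merged \
    .withColumn('row_num', row_number().over(w)) \
    .filter(col('row_num') == 1) \
    .drop('row_num')
```
Keep in mind this keeps the whole winning row, so fields that exist only on the losing side (e.g. x when df2's row wins) come out as null. If you need amount and orderStatus from the newer row but the remaining fields always from df1, a join on txn_id with a per-column when(df2.ingested_at > df1.ingested_at, ...).otherwise(...) would match your Java mergePayload logic more closely.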
Upvotes: 0