MOHD NAYYAR

Reputation: 41

pyspark: merge two payload fields based on field ingested_at while joining two dataframes in pyspark

I have the two dataframes below.

df1:

txn_id  amount  x  orderStatus  ingested_at
1       100     -  PENDING      1652265268576
2       200     -  FAILURE      1652265283215

df2:

txn_id  amount  y  orderStatus  ingested_at
1       150     -  SUCCESS      1652265283215
2       200     -  SUCCESS      1652265268576

The two dataframes have different schemas (as in the example above, one has column x and the other has column y; there are more such differing columns).

When joining these two dataframes on the field txn_id, I am getting duplicate columns such as amount, orderStatus and ingested_at.
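For example, a plain join like the following (a simplified sketch, not my exact code) reproduces the duplicate columns:

joined = df1.join(df2, on='txn_id', how='inner')
# the joined result has two amount, two orderStatus and two ingested_at columns,
# so referring to those columns by name afterwards is ambiguous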

When joining these two dataframes, what I want is to merge the payloads such that the fields amount and orderStatus are picked from the row with the greater ingested_at, while the rest of the fields can be picked from df1.

For example, in the above case the result should be something like this:

joined_df:

txn_id  amount  x  y  orderStatus  ingested_at
1       150     -  -  SUCCESS      1652265283215
2       200     -  -  FAILURE      1652265283215

The values of the fields amount and orderStatus are picked from the row where ingested_at has the greater value.

In Java I have done something like this, but I am not sure how to do it in PySpark.

JavaPairRDD<String, Tuple2<TxnDateS3Model, TxnDateS3Model>> joinedResult = level2PairRdd.join(captureNotifyPairRdd);
JavaPairRDD<String, TxnDateS3Model> l2joinedCaptureNotifyPairRdd = joinedResult
        .mapToPair(joinResult -> new Tuple2<>(joinResult._1, DataUtils.mergePayload(joinResult._2._1, joinResult._2._2)));

The mergePayload method in DataUtils has the logic to merge the two payloads based on their ingestion time.

Does anyone have an idea how this can be achieved in PySpark?

Upvotes: 0

Views: 104

Answers (1)

fuzzy-memory

Reputation: 195

There may be a more optimal solution for this, but from the info you have given, I am assuming that both df1 and df2 have the same schema:

  1. Start by doing a union of both DataFrames
  2. Rank the rows to get the latest record for each value of txn_id
  3. Discard the rows where the rank indicates that the row is not the latest record

The implementation would look something like this:

from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# rank rows per txn_id by ingested_at (newest first) and keep only the top row
final_df = df1 \
    .unionByName(df2) \
    .withColumn('row_num', row_number().over(Window.partitionBy('txn_id').orderBy(desc('ingested_at')))) \
    .filter(col('row_num') == 1) \
    .drop('row_num')

final_df will contain the latest record for each txn_id.
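Since df1 and df2 in the question do not have exactly the same columns (x vs y), one way to still make the union work is unionByName with allowMissingColumns=True (available from Spark 3.1). A minimal sketch with the sample data from the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# sample data from the question ('-' is kept as a literal placeholder)
df1 = spark.createDataFrame(
    [(1, 100, '-', 'PENDING', 1652265268576),
     (2, 200, '-', 'FAILURE', 1652265283215)],
    ['txn_id', 'amount', 'x', 'orderStatus', 'ingested_at'])

df2 = spark.createDataFrame(
    [(1, 150, '-', 'SUCCESS', 1652265283215),
     (2, 200, '-', 'SUCCESS', 1652265268576)],
    ['txn_id', 'amount', 'y', 'orderStatus', 'ingested_at'])

final_df = df1 \
    .unionByName(df2, allowMissingColumns=True) \
    .withColumn('row_num', row_number().over(Window.partitionBy('txn_id').orderBy(desc('ingested_at')))) \
    .filter(col('row_num') == 1) \
    .drop('row_num')

final_df.show()
# keeps the df2 row for txn_id 1 and the df1 row for txn_id 2,
# i.e. amount/orderStatus come from the row with the greater ingested_at;
# x is null on rows that came from df2 and y is null on rows that came from df1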

Upvotes: 0
