user15653327

Grouping & data wrangling on certain conditions in a Spark dataframe

I have the below dataframe in spark

+---------+--------------+-------+------------+--------+
|pid      |    tid       |account|date        |depid   |
+---------+--------------+-------+------------+--------+
|      153|4512          |  30095|11272020    |       0|
|      153|4512          |  30096|11272020    |      30|
|      145|4513          |  40095|11272020    |       0|
|      135|4512          |  30096|11272020    |       0|
|      153|4512          |  30097|11272020    |       0|
|      145|4513          |  30094|11272020    |       0|
+---------+--------------+-------+------------+--------+

I need to group the records by pid, tid and date, so that after grouping the dataframe looks like this:

+---------+--------------+-------+------------+---------+
|pid      |    tid       |account|date        |depid    |
+---------+--------------+-------+------------+---------+
|      153|4512          |  30095|11272020    |       0 |
|      153|4512          |  30096|11272020    |       30|
|      153|4512          |  30097|11272020    |        0|
|      145|4513          |  40095|11272020    |        0|
|      145|4513          |  30094|11272020    |        0|
|      135|4512          |  30096|11272020    |        0|
+---------+--------------+-------+------------+---------+

After grouping, I need to check whether any record in a group has an account of 30095 or 40095; if so, every record in that group whose depid is 0 should have its depid replaced with the first 4 digits of that account (the rule is sketched as code after the table). The expected outcome is:

+---------+--------------+-------+------------+---------+
|pid      |    tid       |account|date        |depid    |
+---------+--------------+-------+------------+---------+
|      153|4512          |  30095|11272020    |    3009 |
|      153|4512          |  30096|11272020    |       30|
|      153|4512          |  30097|11272020    |     3009|
|      145|4513          |  40095|11272020    |     4009|
|      145|4513          |  30094|11272020    |     4009|
|      135|4512          |  30096|11272020    |        0|
+---------+--------------+-------+------------+---------+
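Stated as code, the rule I am after is roughly this (an untested sketch of the intent, not my actual attempt; it assumes account is a string column):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Per (pid, tid, date) group, take the first 4 digits of a 30095/40095
// account if the group contains one (max ignores the nulls from when).
WindowSpec w = Window.partitionBy("pid", "tid", "date");
Column flagged = max(when(col("account").isin("30095", "40095"),
        substring(col("account"), 1, 4).cast("int"))).over(w);

// Overwrite depid only where it is 0 and the group has a flagged account.
Dataset<Row> expected = df.withColumn("depid",
        when(col("depid").equalTo(0).and(flagged.isNotNull()), flagged)
                .otherwise(col("depid")));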

I tried the code below, but it is not working for me:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

WindowSpec windowSpec = Window.partitionBy("pid", "tid", "date").orderBy("account");
Column roworder = rank().over(windowSpec).as("rank");
Dataset<Row> df1 = df.select(df.col("*"), roworder);

Dataset<Row> df2 = df1.withColumn("depid1",
        when(df1.col("account").equalTo("40095").and(df1.col("depid").equalTo("0")), 4009)
        .when(df1.col("rank").gt(1).and(df1.col("depid").equalTo("0")), 4009)
        .when(df1.col("account").equalTo("30095").and(df1.col("depid").equalTo("0")), 3009)
        .when(df1.col("rank").gt(1).and(df1.col("depid").equalTo("0")), 3009)
        .otherwise(df1.col("depid"))
    ).orderBy(col("pid").desc()).drop("depid").withColumnRenamed("depid1", "depid");

but it produces the output below:

+---------+--------------+-------+------------+---------+
|pid      |    tid       |account|date        |depid    |
+---------+--------------+-------+------------+---------+
|      153|4512          |  30095|11272020    |    3009 |
|      153|4512          |  30096|11272020    |       30|
|      153|4512          |  30097|11272020    |     4009|
|      145|4513          |  40095|11272020    |     4009|
|      145|4513          |  30094|11272020    |     4009|
|      135|4512          |  30096|11272020    |        0|
+---------+--------------+-------+------------+---------+

I am not sure what I am doing incorrectly here.

Upvotes: 1

Views: 99

Answers (1)

Ged

Reputation: 18108

You will need to convert this to Java; a sketch of such a translation follows the final result below. I suggest you use the Scala API, as it makes life far easier. Also, you may have different data types.

Here is my alternative, which I see more as a data-analysis task. I added some extra records to demonstrate the point and to make the solution more generic and robust. I do not think your approach is sound enough; anyway, we can all learn.

So, here goes:

import org.apache.spark.sql.functions._
// ... (SparkSession setup elided; .toDF on a local Seq also requires: import spark.implicits._)

// More a data analysis problem.

// 1. Gen sample data.
val df = Seq( ( 153, 4512, "30095", "11272020", 0 ),
              ( 153, 4512, "30096", "11272020", 30 ),
              ( 153, 4512, "30096", "11272020", 30 ),  // extra record
              ( 145, 4513, "40095", "11272020", 0 ),
              ( 145, 4513, "40095", "11272020", 0 ),   // extra record
              ( 145, 4513, "40095", "11272020", 200 ), // extra record
              ( 135, 4512, "30096", "11272020", 0 ),
              ( 153, 4512, "30097", "11272020", 0 ),
              ( 145, 4513, "30094", "11272020", 0 )
).toDF("pid","tid","account","date","depid")
df.show()

// 2. Get the groups with accounts of relevance. Note these groups may also contain records that do not need processing.
val dfg = df.filter(df("account").isin("30095", "40095")).select("pid","tid","date").distinct().toDF("pidg", "tidg", "dateg")
dfg.show()

// 3. Get the data that needs to be processed. Take performance into account.
val dfp = df.as("df").join(dfg.as("dfg"), $"df.pid" === $"dfg.pidg" && $"df.tid" === $"dfg.tidg" && $"df.date" === $"dfg.dateg" && $"df.depid" === 0, "inner")
            .drop("pidg").drop("tidg").drop("dateg")
dfp.show()

// 4. Get the records that do not need processing, for the later UNION operation.
val res1 = df.exceptAll(dfp)
res1.show()

// 5. Process the records that need it.
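// Note: Spark's substring is 1-based, but a start position of 0 behaves the same as 1 here.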
val res2 = dfp.withColumn("depid2", substring(col("account"), 0, 4).cast("int")).drop("depid").toDF("pid","tid","account","date","depid")
res2.show()

// 6. Final result.
val res = res1.union(res2)
res.show()

This finally results, in a performant way, in the output below. Note that exceptAll and union preserve duplicate rows, which is why the extra records survive intact:

+---+----+-------+--------+-----+
|pid| tid|account|    date|depid|
+---+----+-------+--------+-----+
|153|4512|  30096|11272020|   30|
|153|4512|  30096|11272020|   30|
|145|4513|  40095|11272020|  200|
|135|4512|  30096|11272020|    0|
|153|4512|  30095|11272020| 3009|
|145|4513|  40095|11272020| 4009|
|145|4513|  40095|11272020| 4009|
|153|4512|  30097|11272020| 3009|
|145|4513|  30094|11272020| 3009|
+---+----+-------+--------+-----+
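Since the question itself is in Java, here is a minimal, untested translation of steps 2 to 6 (the names dfg, dfp, res1 and res2 mirror the Scala above; it assumes the same column names and that account is a string):

import static org.apache.spark.sql.functions.*;

// 2. Groups that contain a relevant account.
Dataset<Row> dfg = df.filter(col("account").isin("30095", "40095"))
        .select("pid", "tid", "date").distinct()
        .toDF("pidg", "tidg", "dateg");

// 3. Records that need processing: depid == 0 inside a flagged group.
Dataset<Row> dfp = df.join(dfg,
        col("pid").equalTo(col("pidg"))
                .and(col("tid").equalTo(col("tidg")))
                .and(col("date").equalTo(col("dateg")))
                .and(col("depid").equalTo(0)),
        "inner").drop("pidg", "tidg", "dateg");

// 4.-6. Keep the untouched rows, rewrite depid on the rest, and union back.
Dataset<Row> res1 = df.exceptAll(dfp);
Dataset<Row> res2 = dfp
        .withColumn("depid2", substring(col("account"), 1, 4).cast("int"))
        .drop("depid")
        .toDF("pid", "tid", "account", "date", "depid");
Dataset<Row> res = res1.union(res2);
res.show();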

Upvotes: 1
