baatchen

Reputation: 489

PySpark: add a new column flagging a mismatch within a group

I need to create a new column called Check that shows Mismatch when a row's value differs from the rest of its group.

What I have now:

data = [
  ("Category2","File1",2,2),
  ("Category2","File2",2,2),
  ("Category2","File3",2,2),
  ("Category2","File4",5,2),
  ("Category1","File1",4,1),
  ("Category1","File2",4,1),
  ("Category1","File3",4,1),
  ("Category1","File4",4,1),
]

cols = ["Category","Filename","count","DistinctCount"]

df = spark.createDataFrame(data,cols)

df.show()


+---------+--------+-----+-------------+
| Category|Filename|count|DistinctCount|
+---------+--------+-----+-------------+
|Category2|   File1|    2|            2|
|Category2|   File2|    2|            2|
|Category2|   File3|    2|            2|
|Category2|   File4|    5|            2|
|Category1|   File1|    4|            1|
|Category1|   File2|    4|            1|
|Category1|   File3|    4|            1|
|Category1|   File4|    4|            1|
+---------+--------+-----+-------------+


Desired result:


+---------+--------+-----+-------------+---------+
| Category|Filename|count|DistinctCount|    Check|
+---------+--------+-----+-------------+---------+
|Category2|   File1|    2|            2|       OK|
|Category2|   File3|    2|            2|       OK|
|Category2|   File2|    2|            2|       OK|
|Category2|   File4|    5|            2| Mismatch|
|Category1|   File1|    4|            1|       OK|
|Category1|   File4|    4|            1|       OK|
|Category1|   File2|    4|            1|       OK|
|Category1|   File3|    4|            1|       OK|
+---------+--------+-----+-------------+---------+


I'm thinking of using a window function to group the rows by Category, but I'm stuck on how to write the logic for the mismatch.

Thank you!

/B

Upvotes: 3

Views: 781

Answers (2)

wwnde

Reputation: 26686

from pyspark.sql import Window
import pyspark.sql.functions as F

win = Window.partitionBy("count")

(df
    # group by count value and count rows in each group
    .withColumn("UniqueCount", F.count("DistinctCount").over(win))
    # a count value shared by only one row is a mismatch, else ok
    .withColumn("UniqueCount", F.when(F.col("UniqueCount") == 1, "mismatch").otherwise("ok"))
    # sort the dataframe
    .orderBy(F.asc("Category"))
    .show())


+---------+--------+-----+-------------+-----------+
| Category|Filename|count|DistinctCount|UniqueCount|
+---------+--------+-----+-------------+-----------+
|Category1|   File1|    4|            1|         ok|
|Category1|   File2|    4|            1|         ok|
|Category1|   File3|    4|            1|         ok|
|Category1|   File4|    4|            1|         ok|
|Category2|   File1|    2|            2|         ok|
|Category2|   File4|    5|            2|   mismatch|
|Category2|   File2|    2|            2|         ok|
|Category2|   File3|    2|            2|         ok|
+---------+--------+-----+-------------+-----------+

Upvotes: 2

werner

Reputation: 14905

The first step is to calculate per Category which value of count is ok and which value should be mapped to Mismatch.

The data can be grouped by Category, collecting for each distinct count value the number of rows that carry it. The resulting list of (occurrences, count value) pairs is then sorted by occurrences in descending order. If the list has size 1, every row is OK. Otherwise we assume the most frequent value is OK and all other values are not; only if the first and second occurrence counts are equal is no entry OK. This per-element decision is implemented via transform.

import pyspark.sql.functions as F

df_check = df.withColumnRenamed("count", "count_val") \
    .groupBy("Category", "count_val").count() \
    .groupBy("Category").agg(F.sort_array(F.collect_list(F.struct("count", "count_val")),False).alias("count")) \
    .withColumn("counts", F.expr("if(size(count)==1, array((count[0]['count_val'], 'OK')), \
        transform(count, (x,i)-> if( i == 0 and count[i+1]['count'] <> x['count'], \
        (x['count_val'], 'OK'),(x['count_val'],'Mismatch'))))")) \
    .withColumn("counts", F.explode("counts")) \
    .selectExpr("Category", "counts.col1 as count", "counts.col2 as Check")

df_check now contains

+---------+-----+--------+
| Category|count|   Check|
+---------+-----+--------+
|Category2|    2|      OK|
|Category2|    5|Mismatch|
|Category1|    4|      OK|
+---------+-----+--------+

The second step is to join the original df and df_check:

df.join(df_check, on=["Category", "count"], how="left_outer") \
    .orderBy("Category", "Filename") \
    .select("Category", "Filename", "count", "DistinctCount", "Check") \
    .show()

Result:

+---------+--------+-----+-------------+--------+
| Category|Filename|count|DistinctCount|   Check|
+---------+--------+-----+-------------+--------+
|Category1|   File1|    4|            1|      OK|
|Category1|   File2|    4|            1|      OK|
|Category1|   File3|    4|            1|      OK|
|Category1|   File4|    4|            1|      OK|
|Category2|   File1|    2|            2|      OK|
|Category2|   File2|    2|            2|      OK|
|Category2|   File3|    2|            2|      OK|
|Category2|   File4|    5|            2|Mismatch|
+---------+--------+-----+-------------+--------+

Upvotes: 1
