Smaillns

Reputation: 3127

Compare and check differences between two dataframes using PySpark

Let's suppose that we have a dataframe with the following schema

    root
     |-- AUTHOR_ID: integer (nullable = false)
     |-- NAME: string (nullable = true)
     |-- Books: array (nullable = false)
     |    |-- element: struct (containsNull = false)
     |    |    |-- BOOK_ID: integer (nullable = false)
     |    |    |-- Chapters: array (nullable = true) 
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- NAME: string (nullable = true)
     |    |    |    |    |-- NUMBER_PAGES: integer (nullable = true)

As you can see, we have nested struct objects.
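
For reference, a minimal sketch of how the same schema could be declared explicitly (the names and nullability are taken from the printout above):

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   IntegerType, ArrayType)

    # explicit declaration of the schema printed above
    schema = StructType([
        StructField("AUTHOR_ID", IntegerType(), nullable=False),
        StructField("NAME", StringType(), nullable=True),
        StructField("Books", ArrayType(StructType([
            StructField("BOOK_ID", IntegerType(), nullable=False),
            StructField("Chapters", ArrayType(StructType([
                StructField("NAME", StringType(), nullable=True),
                StructField("NUMBER_PAGES", IntegerType(), nullable=True),
            ]), containsNull=True), nullable=True),
        ]), containsNull=False), nullable=False),
    ])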

Let's suppose that the following changes have occurred:

    {
        "AUTHOR_ID": 1,
        "Books": [
            {
                "BOOK_ID": 1,
                "Chapters": [
                    {
                        "id": 1,
                        "NAME": {
                            "before": "Etranger",
                            "after": "L'étranger"
                        }
                    }
                ]
            }
        ]
    }

Note: we show only the IDs and the changed values for the relevant items.

Upvotes: 0

Views: 5474

Answers (3)

vmartin

Reputation: 503

Give this a try:

from gresearch.spark.diff import *
left.diff(right)

See https://github.com/G-Research/spark-extension/blob/master/DIFF.md
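
For example, a minimal sketch (assuming the pyspark-extension package is available on the cluster and that AUTHOR_ID identifies a row; see DIFF.md for the full options):

from gresearch.spark.diff import *

# compare the two dataframes on their id column(s); the result carries a
# "diff" column flagging each row as N (no change), C (changed),
# I (inserted) or D (deleted)
left.diff(right, "AUTHOR_ID").show(truncate=False)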

Upvotes: 0

greenie

Reputation: 444

Here is some sample code that joins by authorId and then compares.

from pyspark.sql.functions import collect_list, struct, col, udf
from operator import itemgetter
from pyspark.sql.types import ArrayType, StructType, StringType, StructField

# construct data

data = [('AA_1',  'S1', "10", "1", "Introduction to Quadratic Equation"),
        ('AA_1',  'S1', "10", "2", "Fundamentals"),
        ('AA_1',  'S1', "11", "1", "Preface"),
        ('AA_1',  'S1', "11", "2", "Wading in to the waters"),
        ('AA_2',  'S2', "100", "1", "Introduction"),
        ('AA_2',  'S2', "100", "2", "Fundamentals"),
        ('AA_2',  'S2', "110", "1", "Prologue"),
        ('AA_2',  'S2', "110", "2", "Epilogue"),
]

data2  = [('AA_1',  'S1', "10", "1", "Introduction to Linear Algebra"),
        ('AA_1',  'S1', "10", "2", "Fundamentals"),
        ('AA_1',  'S1', "11", "1", "Preface"),
        ('AA_1',  'S1', "11", "2", "Wading in to the waters"),
        ('AA_2',  'S2', "100", "1", "Introduction"),
        ('AA_2',  'S2', "100", "2", "Fundamentals2"),
        ('AA_2',  'S2', "110", "1", "Prologue"),
        ('AA_2',  'S2', "110", "2", "Epilogue"),
]

df = (spark.createDataFrame(data, ["authorId", "name", "bookId", "chapterId", "chapterName"])
      .groupBy(['authorId', 'name', 'bookId'])
      .agg(collect_list(struct("chapterId", "chapterName")).alias("chapters"))
      .groupBy(['authorId', 'name'])
      .agg(collect_list(struct('bookId', 'chapters')).alias('books')))

df2 = (spark.createDataFrame(data2, ["authorId", "name", "bookId", "chapterId", "chapterName"])
       .groupBy(['authorId', 'name', 'bookId'])
       .agg(collect_list(struct("chapterId", "chapterName")).alias("chapters"))
       .groupBy(['authorId', 'name'])
       .agg(collect_list(struct('bookId', 'chapters')).alias('books')))

df2 = df2.select(col('authorId').alias('authorId2'),col('name').alias('name2'), col('books').alias('books2') )

# join on authorId
df3 = df.join(df2, [df.authorId == df2.authorId2])
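# note (an assumption beyond the sample data): the inner join above drops
# authors that exist in only one of the two dataframes; a full outer join
# would keep them, so additions/deletions of whole authors show up too:
# df3 = df.join(df2, [df.authorId == df2.authorId2], "full_outer")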


# UDF to compare the two book arrays; still needs extra length and null
# checks on books and chapters to be robust

# return type: array of books, each with a bookId and its changed chapters
diff_schema = ArrayType(StructType([
    StructField("bookId", StringType()),
    StructField("chapters", ArrayType(StructType([
        StructField("chapterId", StringType()),
        StructField("name", StructType([
            StructField("before", StringType()),
            StructField("after", StringType()),
        ])),
    ]))),
]))

@udf(diff_schema)
def get_book_diff(b1, b2):
    if len(b1) != len(b2):
        return None
    # align the two sides by sorting on their ids
    b1.sort(key=itemgetter('bookId'))
    b2.sort(key=itemgetter('bookId'))
    list_data = []
    for i, book in enumerate(b1):
        if book.bookId == b2[i].bookId:
            data = {'bookId': book.bookId, 'chapters': []}
            book.chapters.sort(key=itemgetter('chapterId'))
            b2[i].chapters.sort(key=itemgetter('chapterId'))
            for j, chap in enumerate(book.chapters):
                if (chap.chapterId == b2[i].chapters[j].chapterId
                        and chap.chapterName != b2[i].chapters[j].chapterName):
                    data['chapters'].append({
                        'chapterId': chap.chapterId,
                        'name': {"before": chap.chapterName,
                                 "after": b2[i].chapters[j].chapterName}})
            # report a book only when at least one chapter changed
            if data['chapters']:
                list_data.append(data)
    return list_data
        
  
df3 = df3.withColumn('book_diff', get_book_diff('books', 'books2'))

# df3.select('authorId', 'book_diff').show(truncate=False)
display(df3.select('authorId', 'book_diff'))  # display() is a Databricks notebook helper

Upvotes: 1

Matt Andruff

Reputation: 5125

I think the unfortunate requirement here is that we need to flatten the structs into columns to allow comparison.

import pyspark.sql.functions as F
columns = ["AUTHOR_ID","NAME","Books"] # lazy partial naming

#Original
data = [(1, "James,,Smith",[(1,[(1,"The beggining", 12, "It was a great day")])]), (2, "Stephen King", [(2,[(1,"The start", 12, "It was a great day")])])]
#Update
# Bookid 1 --> added a chapter, fixed a typo in the first chapter. 
# Bookid 2 -->  Changed nothing 
data_after = [(1, "James,,Smith",[(1,[(1,"The begining", 12, "It was a great day"),(2,"The end", 1, "It was a great night")])]), (2, "Stephen King", [(2,[(1,"The start", 12, "It was an a great day")])])]
df = spark.createDataFrame(data=data,schema=columns)
df2 = spark.createDataFrame(data=data_after,schema=columns)

# flatten the structs into columns (could also have used withColumn)
df_flat = (df
    .select("*", F.posexplode(F.col("Books")).alias("pos", "Book"))
    .select("*", F.col("Book._1").alias("BookId"),
            F.posexplode(F.col("Book._2")).alias("pos", "Chapter"))
    .select("*", F.col("Chapter.*"), F.lit("Original").alias("source")))
df2_flat = (df2
    .select("*", F.posexplode(F.col("Books")).alias("pos", "Book"))
    .select("*", F.col("Book._1").alias("BookId"),
            F.posexplode(F.col("Book._2")).alias("pos", "Chapter"))
    .select("*", F.col("Chapter.*"), F.lit("Update").alias("source")))
#use a union to pull all data together
all = df_flat.union(df2_flat).withColumnRenamed("_1", "Chapter_id")\
.withColumnRenamed("_2", "text")
#Find things that don't have a match these are the additions/updates/deletion
all.groupBy("AUTHOR_ID","BookId","Chapter_id","text").agg(F.first("source"),F.count("text").alias("count")).where(F.col("count") != 2).show()



+---------+------+----------+-------------+--------------------+-----+
|AUTHOR_ID|BookId|Chapter_id|         text|first(source, false)|count|
+---------+------+----------+-------------+--------------------+-----+
|        1|     1|         2|      The end|              Update|    1|
|        1|     1|         1| The begining|              Update|    1|
|        1|     1|         1|The beggining|            Original|    1|
+---------+------+----------+-------------+--------------------+-----+

From here you need to do a little more work: one more groupBy down to the Author/BookId/Chapter_id level (counting the rows per chapter), then when/otherwise logic on source. See the sketch after this list.

  • If a chapter exists in both the Update and the Original, it's an edit (count of 2).
  • If it only exists in the Update, it's an addition (count of 1).
  • If it only exists in the Original, it's a deletion (count of 1).
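
A minimal sketch of that follow-up, building on the "all" dataframe above (the diffs name and the change labels are mine):

import pyspark.sql.functions as F

# keep only the rows without an identical match on the other side (as above)
diffs = (all.groupBy("AUTHOR_ID", "BookId", "Chapter_id", "text")
    .agg(F.first("source").alias("source"), F.count("text").alias("count"))
    .where(F.col("count") != 2))

# roll up to one row per chapter and classify it by the side(s) it appears on
(diffs.groupBy("AUTHOR_ID", "BookId", "Chapter_id")
    .agg(F.collect_set("source").alias("sources"))
    .withColumn("change",
        F.when(F.size("sources") == 2, "edit")
         .when(F.array_contains("sources", "Update"), "addition")
         .otherwise("deletion"))
    .show())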

Building back up to your struct from here is up to you, but I think this demonstrates the idea of what's required. Using the chapter's page count might actually be a good way to detect changes; it's certainly cheaper than comparing strings, though likely not as accurate.

Upvotes: 0
