Reputation: 3127
Let's suppose that we have a DataFrame with the following schema:
root
|-- AUTHOR_ID: integer (nullable = false)
|-- NAME: string (nullable = true)
|-- Books: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- BOOK_ID: integer (nullable = false)
| | |-- Chapters: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- NAME: string (nullable = true)
| | | | |-- NUMBER_PAGES: integer (nullable = true)
As you can see, we have nested struct objects.
Let's suppose that the following changes have occurred:
{
  "AUTHOR_ID": 1,
  "Books": [
    {
      "BOOK_ID": 1,
      "Chapters": [
        {
          "id": 1,
          "NAME": {
            "before": "Etranger",
            "after": "L'étranger"
          }
        }
      ]
    }
  ]
}
Note: we will show only the IDs and the changed values for the relevant items.
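For reproduction, here is one way the before/after DataFrames could be built with this schema (a sketch: the author name and page count are invented for illustration):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
schema = StructType([
    StructField("AUTHOR_ID", IntegerType(), False),
    StructField("NAME", StringType(), True),
    StructField("Books", ArrayType(StructType([
        StructField("BOOK_ID", IntegerType(), False),
        StructField("Chapters", ArrayType(StructType([
            StructField("NAME", StringType(), True),
            StructField("NUMBER_PAGES", IntegerType(), True)
        ])), True)
    ]), False), False)
])
before = spark.createDataFrame([(1, "Albert Camus", [(1, [("Etranger", 123)])])], schema)
after = spark.createDataFrame([(1, "Albert Camus", [(1, [("L'étranger", 123)])])], schema)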
Upvotes: 0
Views: 5474
Reputation: 503
Give this a try:
from gresearch.spark.diff import *
left.diff(right)
See https://github.com/G-Research/spark-extension/blob/master/DIFF.md
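For this question, a keyed diff might look like this (a sketch: left and right stand for the before/after DataFrames, and AUTHOR_ID is assumed to be the key column; see DIFF.md for the full options):
from gresearch.spark.diff import *
# The result gains a "diff" column flagging each row as N (no change),
# I (insert), D (delete) or C (change); nested columns such as Books
# are compared as whole values.
left.diff(right, "AUTHOR_ID").show(truncate=False)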
Upvotes: 0
Reputation: 444
Here is sample code that joins on authorId and then compares.
from pyspark.sql.functions import collect_list, struct, col, udf
from operator import itemgetter
from pyspark.sql.types import ArrayType, StructType, StringType, StructField
# construct data
data = [('AA_1', 'S1', "10", "1", "Introduction to Quadratic Equation"),
        ('AA_1', 'S1', "10", "2", "Fundamentals"),
        ('AA_1', 'S1', "11", "1", "Preface"),
        ('AA_1', 'S1', "11", "2", "Wading in to the waters"),
        ('AA_2', 'S2', "100", "1", "Introduction"),
        ('AA_2', 'S2', "100", "2", "Fundamentals"),
        ('AA_2', 'S2', "110", "1", "Prologue"),
        ('AA_2', 'S2', "110", "2", "Epilogue")]
data2 = [('AA_1', 'S1', "10", "1", "Introduction to Linear Algebra"),
         ('AA_1', 'S1', "10", "2", "Fundamentals"),
         ('AA_1', 'S1', "11", "1", "Preface"),
         ('AA_1', 'S1', "11", "2", "Wading in to the waters"),
         ('AA_2', 'S2', "100", "1", "Introduction"),
         ('AA_2', 'S2', "100", "2", "Fundamentals2"),
         ('AA_2', 'S2', "110", "1", "Prologue"),
         ('AA_2', 'S2', "110", "2", "Epilogue")]
df = spark.createDataFrame(data, ["authorId", "name", "bookId", "chapterId", "chapterName"])\
    .groupBy(['authorId', 'name', 'bookId'])\
    .agg(collect_list(struct("chapterId", "chapterName")).alias("chapters"))\
    .groupBy(['authorId', 'name'])\
    .agg(collect_list(struct('bookId', 'chapters')).alias('books'))
df2 = spark.createDataFrame(data2, ["authorId", "name", "bookId", "chapterId", "chapterName"])\
    .groupBy(['authorId', 'name', 'bookId'])\
    .agg(collect_list(struct("chapterId", "chapterName")).alias("chapters"))\
    .groupBy(['authorId', 'name'])\
    .agg(collect_list(struct('bookId', 'chapters')).alias('books'))
df2 = df2.select(col('authorId').alias('authorId2'),
                 col('name').alias('name2'),
                 col('books').alias('books2'))
# join on authorId
df3 = df.join(df2, [df.authorId == df2.authorId2])
# UDF to compare, needs additional checks on books and chapters lengths and Null checks
diff_schema = ArrayType(StructType([
    StructField("bookId", StringType()),
    StructField("chapters", ArrayType(StructType([
        StructField("chapterId", StringType()),
        StructField("name", StructType([
            StructField("before", StringType()),
            StructField("after", StringType())
        ]))
    ])))
]))

@udf(diff_schema)
def get_book_diff(b1, b2):
    if len(b1) != len(b2):
        return None
    # align the two sides by sorting books and chapters on their ids
    b1.sort(key=itemgetter('bookId'))
    b2.sort(key=itemgetter('bookId'))
    list_data = []
    i = 0
    for book in b1:
        data = {}
        if book.bookId == b2[i].bookId:
            data['bookId'] = book.bookId
            book.chapters.sort(key=itemgetter('chapterId'))
            b2[i].chapters.sort(key=itemgetter('chapterId'))
            data['chapters'] = []
            j = 0
            for chap in book.chapters:
                if chap.chapterId == b2[i].chapters[j].chapterId:
                    if chap.chapterName != b2[i].chapters[j].chapterName:
                        data['chapters'].append({
                            'chapterId': chap.chapterId,
                            'name': {"before": chap.chapterName,
                                     "after": b2[i].chapters[j].chapterName}
                        })
                j += 1
        i += 1
        list_data.append(data)
    return list_data
df3 = df3.withColumn('book_diff', get_book_diff('books', 'books2'))
# display() is the Databricks notebook helper; on plain PySpark use show()
#df3.select('authorId', 'book_diff').show(truncate=False)
display(df3.select('authorId', 'book_diff'))
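As the comment on the UDF says, length and null checks are still needed; a minimal guard for the top of get_book_diff (an assumption, not in the original) could be:
# Hypothetical guard: also covers a null side, which matters if the
# inner join above is ever changed to an outer join.
if b1 is None or b2 is None or len(b1) != len(b2):
    return None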
Upvotes: 1
Reputation: 5125
I think the unfortunate requirement here is that we need to flatten the structs into columns to allow comparison.
import pyspark.sql.functions as F
columns = ["AUTHOR_ID","NAME","Books"] # lazy partial naming
#Original
data = [(1, "James,,Smith",[(1,[(1,"The beggining", 12, "It was a great day")])]), (2, "Stephen King", [(2,[(1,"The start", 12, "It was a great day")])])]
#Update
# Bookid 1 --> added a chapter, fixed a typo in the first chapter.
# Bookid 2 --> Changed nothing
data_after = [(1, "James,,Smith",[(1,[(1,"The begining", 12, "It was a great day"),(2,"The end", 1, "It was a great night")])]), (2, "Stephen King", [(2,[(1,"The start", 12, "It was an a great day")])])]
df = spark.createDataFrame(data=data,schema=columns)
df2 = spark.createDataFrame(data=data_after,schema=columns)
# flatten the structs into columns (could also have used withColumn)
df_flat = df.select("*", F.posexplode(F.col("Books")).alias("pos", "Book"))\
    .select("*", F.col("Book._1").alias("BookId"), F.posexplode(F.col("Book._2")).alias("pos", "Chapter"))\
    .select("*", F.col("Chapter.*"), F.lit("Original").alias("source"))
df2_flat = df2.select("*", F.posexplode(F.col("Books")).alias("pos", "Book"))\
    .select("*", F.col("Book._1").alias("BookId"), F.posexplode(F.col("Book._2")).alias("pos", "Chapter"))\
    .select("*", F.col("Chapter.*"), F.lit("Update").alias("source"))
#use a union to pull all data together
all = df_flat.union(df2_flat).withColumnRenamed("_1", "Chapter_id")\
.withColumnRenamed("_2", "text")
# Find rows that don't have a match; these are the additions/updates/deletions
all.groupBy("AUTHOR_ID", "BookId", "Chapter_id", "text")\
    .agg(F.first("source"), F.count("text").alias("count"))\
    .where(F.col("count") != 2).show()
+---------+------+----------+-------------+--------------------+-----+
|AUTHOR_ID|BookId|Chapter_id| text|first(source, false)|count|
+---------+------+----------+-------------+--------------------+-----+
| 1| 1| 2| The end| Update| 1|
| 1| 1| 1| The begining| Update| 1|
| 1| 1| 1|The beggining| Original| 1|
+---------+------+----------+-------------+--------------------+-----+
From here you need to do a little more work: think one more groupBy down to AUTHOR_ID/BookId/Chapter_id, count the chapter IDs, then apply when/otherwise logic on source.
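To make that concrete, here is a sketch (the versions and change column names are invented here; everything else reuses the columns from above):
# Group the unmatched rows down to chapter level: two rows mean the text
# changed between versions, one row means the chapter exists on one side only.
summary = all.groupBy("AUTHOR_ID", "BookId", "Chapter_id", "text")\
    .agg(F.first("source").alias("source"), F.count("text").alias("count"))\
    .where(F.col("count") != 2)\
    .groupBy("AUTHOR_ID", "BookId", "Chapter_id")\
    .agg(F.count("Chapter_id").alias("versions"), F.first("source").alias("source"))
summary.withColumn("change",
    F.when(F.col("versions") == 2, "updated")      # both sides present, text differs
     .when(F.col("source") == "Update", "added")   # only in the update
     .otherwise("deleted")                         # only in the original
).show()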
Building your struct back up from here is up to you, but I think this demonstrates the idea of what's required. Using the chapter's page number might actually be a good way to detect change: it's certainly cheaper than comparing strings, though likely not as accurate.
Upvotes: 0