Reputation: 601
We are getting orders data which has the following fields (showing only the relevant ones).
The data as raw text:
+-----------+----------------+---------------------+----------+
|orderid |original_orderid|ttime |price |
+-----------+----------------+---------------------+----------+
|988782828 |0 |2020-09-0406:00:09.09|3444.0 |
|37377373374|0 |2020-09-0408:41:09.09|26262.0 |
|23222223378|37377373374 |2020-09-0409:02:55.55|33434.0 |
|2111111 |0 |2020-09-0409:05:55.55|44334.0 |
|2422244422 |0 |2020-09-0409:07:14.14|343434.0 |
|66666663388|23222223378 |2020-09-0409:10:14.14|1282.0 |
|44444443391|66666663388 |2020-09-0409:11:34.34|27272.6363|
|22222393392|44444443391 |2020-09-0409:13:38.38|333.0 |
|77777393397|22222393392 |2020-09-0409:14:31.31|3422.0 |
|55656563397|77777393397 |2020-09-0409:16:58.58|27272.0 |
+-----------+----------------+---------------------+----------+
As a transformation we are required to map every child order to its original parent (the order whose original_orderid is NULL) and to get the number of levels that order chain may have; that mapping together with the level count is the expected outcome.
This is part of a migration from SQL Server to Spark. In SQL Server this was achieved in a view that recursively accesses the parent view.
We can try this transformation in Spark with pseudo code like this:
val df = spark.read(raw_data_file)

val parent = df.filter(col("original_orderid").isNull)
  .select(col("orderid").as("orderid"), col("orderid").as("parent_orderid"))
val children = df.filter(col("original_orderid").isNotNull).sort(col("ttime"))

var parentCollection = // collect the parent df into a driver-side collection
val childrenCollection = // collect the children df into a driver-side collection

// traverse the sorted childrenCollection
for (child <- childrenCollection) {
  if (child.original_orderid is found in parentCollection as parent)
    insert into parentCollection: child.orderid as orderid, parent.parent_orderid as parent_orderid,
                                  child.ttime as ttime, child.price as price
}
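For concreteness, a rough runnable version of that loop could look like the sketch below (column types are assumed to be strings, and original_orderid = 0 is taken as the root marker as in the sample; use isNull instead if the real data has NULLs):
import scala.collection.mutable
import org.apache.spark.sql.functions.col

// Sketch of the driver-side approach described above
val rootOf = mutable.Map[String, String]()          // orderid -> top-level parent orderid

df.filter(col("original_orderid") === "0")
  .select("orderid")
  .collect()                                        // every root row lands on the driver ...
  .foreach(r => rootOf += r.get(0).toString -> r.get(0).toString)

df.filter(col("original_orderid") =!= "0")
  .sort(col("ttime"))
  .select("orderid", "original_orderid")
  .collect()                                        // ... and every child row as well
  .foreach { r =>
    val (oid, poid) = (r.get(0).toString, r.get(1).toString)
    rootOf.get(poid).foreach(root => rootOf += oid -> root)   // attach the child to its root
  }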
This solution requires collecting all the data on the driver, so it cannot be distributed and is not suitable for big datasets.
Could you please suggest any other approach that would work for larger datasets in Spark, or any improvement to the existing one above?
Upvotes: 2
Views: 183
Reputation: 877
You can recursively join and accumulate the parents in an array. Here is a quick prototype using Spark v2.1:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

// Helper UDFs (Spark 2.4+ has built-in equivalents, noted below)
val addToArray = udf((seq: Seq[String], item: String) => seq :+ item)
// v2.4.0+: use array_union
val concatArray = udf((seq1: Seq[String], seq2: Seq[String]) => seq1 ++ seq2)
// v2.4.0+: use element_at and size
val lastInArray = udf((seq: Seq[String]) => seq.lastOption.orNull)
// v2.4.0+: use slice
val dropLastInArray = udf((seq: Seq[String]) => seq.dropRight(1))
val raw="""|988782828 |0 |2020-09-0406:00:09.09|3444.0 |
|37377373374|0 |2020-09-0408:41:09.09|26262.0 |
|23222223378|37377373374 |2020-09-0409:02:55.55|33434.0 |
|2111111 |0 |2020-09-0409:05:55.55|44334.0 |
|2422244422 |0 |2020-09-0409:07:14.14|343434.0 |
|66666663388|23222223378 |2020-09-0409:10:14.14|1282.0 |
|44444443391|66666663388 |2020-09-0409:11:34.34|27272.6363|
|22222393392|44444443391 |2020-09-0409:13:38.38|333.0 |
|77777393397|22222393392 |2020-09-0409:14:31.31|3422.0 |
|55656563397|77777393397 |2020-09-0409:16:58.58|27272.0 |"""
val df = raw.split("\\n")
  .map(_.trim.stripPrefix("|").split("\\|").map(_.trim))   // drop the leading | on every line before splitting
  .map(r => (r(0), r(1), r(2), r(3)))
  .toSeq.toDF("orderId", "parentId", "ttime", "price")
  .withColumn("parents", array(col("parentId")))            // seed each parent chain with the direct parent
// Repeatedly self-join until every row's parent chain ends in the root marker "0"
def selfJoin(df: DataFrame): DataFrame = {
  if (df.filter(lastInArray(col("parents")) =!= lit("0")).count > 0)
    selfJoin(
      df.join(df.select(col("orderId").as("id"), col("parents").as("grandParents")),
              lastInArray(col("parents")) === col("id"), "left")
        .withColumn("parents",
          when(lastInArray(col("parents")) =!= lit("0"),
               concatArray(col("parents"), col("grandParents")))
            .otherwise(col("parents")))
        .drop("grandParents").drop("id"))
  else
    df
}
selfJoin(df)
  .withColumn("level", size(col("parents")))
  .withColumn("top parent", lastInArray(dropLastInArray(col("parents"))))
  .show
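One practical note: every pass adds another self-join, so the query plan grows with the depth of the hierarchy. A possible refinement (just a sketch; it assumes a checkpoint directory has been configured and uses the same helper UDFs as above) is to checkpoint between iterations so the lineage stays short:
// Variant of selfJoin that checkpoints between iterations to keep the lineage short.
// Requires a checkpoint directory, e.g. spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  // hypothetical path
def selfJoinCheckpointed(df: DataFrame): DataFrame = {
  if (df.filter(lastInArray(col("parents")) =!= lit("0")).count > 0) {
    val next = df
      .join(df.select(col("orderId").as("id"), col("parents").as("grandParents")),
            lastInArray(col("parents")) === col("id"), "left")
      .withColumn("parents",
        when(lastInArray(col("parents")) =!= lit("0"),
             concatArray(col("parents"), col("grandParents")))
          .otherwise(col("parents")))
      .drop("grandParents").drop("id")
      .checkpoint()                      // materialize and cut the accumulated recursive plan
    selfJoinCheckpointed(next)
  } else df
}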
Upvotes: 1