PPPP

Reputation: 601

Converting a recursive SQL transformation into Spark

We receive orders data with the following fields (showing only the relevant ones):

[Screenshot of the sample orders data; the same data is reproduced as raw text below.]

  1. Orders with a NULL original_orderid can be considered "parent orders".
  2. Some of these parent orders may have child orders whose original_orderid maps to the parent's orderid.
  3. A child order can itself spawn another child order, as shown in the image with color coding.

The same data as raw text:

+-----------+----------------+---------------------+----------+
|orderid    |original_orderid|ttime                |price     |
+-----------+----------------+---------------------+----------+
|988782828  |0               |2020-09-0406:00:09.09|3444.0    |
|37377373374|0               |2020-09-0408:41:09.09|26262.0   |
|23222223378|37377373374     |2020-09-0409:02:55.55|33434.0   |
|2111111    |0               |2020-09-0409:05:55.55|44334.0   |
|2422244422 |0               |2020-09-0409:07:14.14|343434.0  |
|66666663388|23222223378     |2020-09-0409:10:14.14|1282.0    |
|44444443391|66666663388     |2020-09-0409:11:34.34|27272.6363|
|22222393392|44444443391     |2020-09-0409:13:38.38|333.0     |
|77777393397|22222393392     |2020-09-0409:14:31.31|3422.0    |
|55656563397|77777393397     |2020-09-0409:16:58.58|27272.0   |
+-----------+----------------+---------------------+----------+

As a transformation we need to map every child back to its original parent (the order whose original_orderid is NULL) and compute how many levels deep each order sits. For example, order 55656563397 chains through 77777393397, 22222393392, 44444443391, 66666663388 and 23222223378 up to the parent order 37377373374. The expected outcome would be:

[Screenshot of the expected output: each order mapped to its top-level parent order, along with its level.]

This is part of a migration from SQL Server to Spark. In SQL Server this was achieved in a view that recursively accesses the parent view.

We can attempt this transformation in Spark with pseudo code like this:

val df = spark.read(raw_data_file)   // pseudo code: load the raw orders data
val parent = df.filter(col("original_orderid").isNull)
               .select(col("orderid").as("orderid"), col("orderid").as("parent_orderid"))

val children = df.filter(col("original_orderid").isNotNull).sort(col("ttime"))

var parentCollection = parent.collect()     // collect the parent rows on the driver
val childrenCollection = children.collect() // collect the child rows on the driver

// Traverse the children sorted by ttime
for (child <- childrenCollection) {
    // if child.original_orderid matches an orderid already present in parentCollection (alias it parent),
    // append to parentCollection: child.orderid as orderid, parent.parent_orderid as parent_orderid,
    // child.ttime as ttime, child.price as price
}

This solution requires collecting all the data on the driver, so it cannot be distributed and is not suitable for large datasets.

Could you please suggest another approach that works for larger datasets in Spark, or any improvement to the existing one above?

Upvotes: 2

Views: 183

Answers (1)

Stéphane

Reputation: 877

You can recursively join and accumulate the parents in an array. Here is a quick prototype using Spark v2.1:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._   // already in scope when running in spark-shell

val addToArray = udf((seq: Seq[String], item: String) => seq :+ item)
//v2.4.0 use array_union
val concatArray = udf((seq1: Seq[String], seq2: Seq[String]) => seq1 ++ seq2)
//v2.4.0 use element_at and size
val lastInArray = udf((seq: Seq[String]) => seq.lastOption.getOrElse(null))
//v2.4.0 and up use slice
val dropLastInArray = udf((seq: Seq[String]) => seq.dropRight(1))
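
As the comments hint, on Spark 2.4+ these UDFs could be replaced by built-in functions. A minimal sketch of what the corresponding column expressions might look like (assumed equivalents, not part of the original prototype; note that array_union also removes duplicates, so concat is the closer match for ++):

import org.apache.spark.sql.functions.{col, concat, element_at, expr, size}

// Spark 2.4+ column expressions that could stand in for the UDFs above (sketch)
val lastParent  = element_at(col("parents"), -1)                // instead of lastInArray
val withoutLast = expr("slice(parents, 1, size(parents) - 1)")  // instead of dropLastInArray
val mergedChain = concat(col("parents"), col("grandParents"))   // instead of concatArray (keeps duplicates)
val level       = size(col("parents"))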

val raw="""|988782828  |0               |2020-09-0406:00:09.09|3444.0    |
|37377373374|0               |2020-09-0408:41:09.09|26262.0   |
|23222223378|37377373374     |2020-09-0409:02:55.55|33434.0   |
|2111111    |0               |2020-09-0409:05:55.55|44334.0   |
|2422244422 |0               |2020-09-0409:07:14.14|343434.0  |
|66666663388|23222223378     |2020-09-0409:10:14.14|1282.0    |
|44444443391|66666663388     |2020-09-0409:11:34.34|27272.6363|
|22222393392|44444443391     |2020-09-0409:13:38.38|333.0     |
|77777393397|22222393392     |2020-09-0409:14:31.31|3422.0    |
|55656563397|77777393397     |2020-09-0409:16:58.58|27272.0   |"""

val df = raw.stripMargin                       // strip the leading | from every line
  .split("\\n")
  .map(_.split("\\|").map(_.trim))
  .map(r => (r(0), r(1), r(2), r(3)))
  .toSeq.toDF("orderId", "parentId", "ttime", "price")
  .withColumn("parents", array(col("parentId")))

def selfJoin(df: DataFrame): DataFrame = {
  // keep joining while some row's accumulated chain does not yet end at the root marker "0"
  if (df.filter(lastInArray(col("parents")) =!= lit("0")).count > 0)
    selfJoin(df
      .join(df.select(col("orderId").as("id"), col("parents").as("grandParents")),
            lastInArray(col("parents")) === col("id"), "left")
      .withColumn("parents", when(lastInArray(col("parents")) =!= lit("0"),
            concatArray(col("parents"), col("grandParents"))).otherwise(col("parents")))
      .drop("grandParents").drop("id"))
  else
    df
}

selfJoin(df)
  .withColumn("level", size(col("parents")))                              // length of the parents chain (root orders have level 1)
  .withColumn("top parent", lastInArray(dropLastInArray(col("parents")))) // top-level ancestor; null for parent orders themselves
  .show
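
A side note on this iterative self-join: every round extends the query lineage and fires a count action, so for deep hierarchies it may help to checkpoint between iterations. A minimal sketch of such a variant, assuming a checkpoint directory has been set with spark.sparkContext.setCheckpointDir:

// Hypothetical variant of selfJoin that truncates lineage after each iteration
def selfJoinCheckpointed(df: DataFrame): DataFrame = {
  if (df.filter(lastInArray(col("parents")) =!= lit("0")).count > 0)
    selfJoinCheckpointed(
      df.join(df.select(col("orderId").as("id"), col("parents").as("grandParents")),
              lastInArray(col("parents")) === col("id"), "left")
        .withColumn("parents", when(lastInArray(col("parents")) =!= lit("0"),
              concatArray(col("parents"), col("grandParents"))).otherwise(col("parents")))
        .drop("grandParents").drop("id")
        .checkpoint()) // requires spark.sparkContext.setCheckpointDir(...)
  else
    df
}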

Upvotes: 1
