Shubham Jain

How to split one row into multiple rows in Spark?

I am trying to split a record containing nested data into multiple records.

df = spark.createDataFrame([('1','[{price:100, quantity:1},{price:200, quantity:2},{price:900, quantity:3},{price:500, quantity:5},{price:100, quantity:1},{price:800, quantity:8},{price:700, quantity:7},{price:600, quantity:6}]'),('2','[{price:100, quantity:1}]')],['id','data'])

The input data looks like this:

id,data
1,[{price:100, quantity:1},{price:200, quantity:2},{price:900, quantity:3},{price:500, quantity:5},{price:100, quantity:1},{price:800, quantity:8},{price:700, quantity:7},{price:600, quantity:6}]
2,[{price:100, quantity:1}]

The goal is to split a record into chunks of at most 5 elements whenever its array column contains more than 5 elements, and to add an id2 column that numbers the chunks within each id:

id,id2,data
1,1,[{price:100, quantity:1},{price:200, quantity:2},{price:900, quantity:3},{price:500, quantity:5},{price:100, quantity:1}]
1,2,[{price:800, quantity:8},{price:700, quantity:7},{price:600, quantity:6}]
2,1,[{price:100, quantity:1}]

I tried exploding the array column, but that produces a new row for each element, i.e. 8 rows for id 1 instead of 2.

How can I explode the array so that each row contains at most 5 elements?
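
To make the row counts concrete, here is a plain-Python model of the two behaviours (not Spark code; `explode_rows` is a hypothetical helper): explode emits one output row per array element, so the 8-element array for id 1 becomes 8 rows, whereas chunking into groups of at most N = 5 needs only ceil(8 / 5) = 2 rows.

```python
import math

def explode_rows(row_id, data):
    """Plain-Python model of explode(): one output row per array element."""
    return [(row_id, element) for element in data]

# Elements of id 1, written as (price, quantity) tuples.
data1 = [(100, 1), (200, 2), (900, 3), (500, 5),
         (100, 1), (800, 8), (700, 7), (600, 6)]

N = 5
print(len(explode_rows("1", data1)))  # one row per element
print(math.ceil(len(data1) / N))      # number of chunks of at most N elements
```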

Answers (1)

jxc

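For Spark 2.4+, you can use the Spark SQL built-in functions sequence + transform and do some math on the array indices: slice() cuts out each chunk of N elements, and inline_outer() turns the resulting array of structs into rows: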

from pyspark.sql import functions as F

df = spark.createDataFrame([('1','[{price:100, quantity:1},{price:200, quantity:2},{price:900, quantity:3},{price:500, quantity:5},{price:100, quantity:1},{price:800, quantity:8},{price:700, quantity:7},{price:600, quantity:6}]'),('2','[{price:100, quantity:1}]')],['id','data'])

N = 5

# for data column, convert String into array of structs
df1 = df.withColumn("data", F.from_json("data", "array<struct<price:int,quantity:int>>",{"allowUnquotedFieldNames":"True"}))

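# id2 = chunk number i in 1..ceil(size(data)/N); chunk i is slice(data, (i-1)*N+1, N)
# inline_outer then emits one row per (id2, data) struct
df1.selectExpr("id", f"""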
    inline_outer(
      transform(
        sequence(1,ceil(size(data)/{N})), i ->
        (i as id2, slice(data,(i-1)*{N}+1,{N}) as data)
      )
    )
 """).show(truncate=False)
+---+---+--------------------------------------------------+
|id |id2|data                                              |
+---+---+--------------------------------------------------+
|1  |1  |[[100, 1], [200, 2], [900, 3], [500, 5], [100, 1]]|
|1  |2  |[[800, 8], [700, 7], [600, 6]]                    |
|2  |1  |[[100, 1]]                                        |
+---+---+--------------------------------------------------+
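
The index arithmetic in the SQL expression can be checked with a plain-Python mirror of it. This is a sketch, not Spark code: `chunk_rows` is a hypothetical helper, and the only translation needed is that Spark's slice() takes a 1-based start position, so slice(data, (i-1)*N+1, N) corresponds to the Python slice data[(i-1)*N : (i-1)*N + N].

```python
import math

def chunk_rows(row_id, data, n=5):
    """Mirror of sequence(1, ceil(size(data)/n)) + slice(data, (i-1)*n+1, n).

    Assumes a non-empty list; returns (id, id2, chunk) tuples.
    """
    num_chunks = math.ceil(len(data) / n)   # ceil(size(data)/N)
    rows = []
    for i in range(1, num_chunks + 1):      # sequence(1, ...) is inclusive
        start = (i - 1) * n                 # 0-based form of (i-1)*N+1
        rows.append((row_id, i, data[start:start + n]))
    return rows

data1 = [(100, 1), (200, 2), (900, 3), (500, 5),
         (100, 1), (800, 8), (700, 7), (600, 6)]
data2 = [(100, 1)]

for row in chunk_rows("1", data1) + chunk_rows("2", data2):
    print(row)
```

The mirror only covers non-empty arrays. For an empty array, ceil(0/5) is 0 and Spark's sequence(1, 0) counts down to [1, 0] (its default step is -1 when start > stop), so the SQL expression does not return zero chunks there the way range(1, 1) does in Python.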