mmz

Reputation: 1151

PySpark: How to explode two columns of arrays

I have a DF in PySpark where I'm trying to explode two columns of arrays. Here's my DF:

+--------+-----+--------------------+--------------------+
|      id| zip_|              values|                time|
+--------+-----+--------------------+--------------------+
|56434459|02138|[1.033990484, 1.0...|[1.624322475139E9...|
|56434508|02138|[1.04760919, 1.07...|[1.624322475491E9...|
|56434484|02138|[1.047177758, 1.0...|[1.62432247655E9,...|
|56434495|02138|[0.989590562, 1.0...|[1.624322476937E9...|
|56434465|02138|[1.051481754, 1.1...|[1.624322477275E9...|
|56434469|02138|[1.026476497, 1.1...|[1.624322477605E9...|
|56434463|02138|[1.10024864, 1.31...|[1.624322478085E9...|
|56434458|02138|[1.011091305, 1.0...|[1.624322478462E9...|
|56434464|02138|[1.038230333, 1.0...|[1.62432247882E9,...|
|56434474|02138|[1.041924752, 1.1...|[1.624322479386E9...|
|56434452|02138|[1.044482358, 1.1...|[1.624322479919E9...|
|56434445|02138|[1.050144598, 1.1...|[1.624322480344E9...|
|56434499|02138|[1.047851812, 1.0...|[1.624322480785E9...|
|56434449|02138|[1.044700917, 1.1...|[1.6243224811E9, ...|
|56434461|02138|[1.03341455, 1.07...|[1.624322481443E9...|
|56434526|02138|[1.04779412, 1.07...|[1.624322481861E9...|
|56434433|02138|[1.0498406, 1.139...|[1.624322482181E9...|
|56434507|02138|[1.0013894403, 1....|[1.624322482419E9...|
|56434488|02138|[1.047270063, 1.0...|[1.624322482716E9...|
|56434451|02138|[1.043182727, 1.1...|[1.624322483061E9...|
+--------+-----+--------------------+--------------------+
only showing top 20 rows

My current solution is to run a posexplode on each column separately, combined with concat_ws to build a unique ID, producing two DFs.
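
Roughly, the per-column step looks like this (a sketch; pos, values_df, and times_df are names I've introduced for illustration):

from pyspark.sql.functions import col, concat_ws, posexplode

# posexplode yields the element's array index (pos) alongside the element
# itself, so id + pos forms a key that lines up across the two DFs.
values_df = (df.select("id", "zip_", posexplode(col("values")).alias("pos", "values_new"))
             .withColumn("new_id", concat_ws("_", col("id").cast("string"), col("pos").cast("string")))
             .select("new_id", "zip_", "values_new"))

times_df = (df.select("id", "zip_", posexplode(col("time")).alias("pos", "time_new"))
            .withColumn("new_id", concat_ws("_", col("id").cast("string"), col("pos").cast("string")))
            .select("new_id", "zip_", "time_new"))

Running this for each column gives the two DFs below.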

First DF:

+-----------+-----+-----------+
|     new_id| zip_| values_new|
+-----------+-----+-----------+
| 56434459_0|02138|1.033990484|
| 56434459_1|02138| 1.07805057|
| 56434459_2|02138| 1.09000133|
| 56434459_3|02138| 1.07009546|
| 56434459_4|02138|1.102403015|
| 56434459_5|02138|  1.1291009|
| 56434459_6|02138|1.088399924|
| 56434459_7|02138|1.047513142|
| 56434459_8|02138|1.010418795|
| 56434459_9|02138|        1.0|
|56434459_10|02138|        1.0|
|56434459_11|02138|        1.0|
|56434459_12|02138| 0.99048968|
|56434459_13|02138|0.984854524|
|56434459_14|02138|        1.0|
| 56434508_0|02138| 1.04760919|
| 56434508_1|02138| 1.07858897|
| 56434508_2|02138| 1.09084267|
| 56434508_3|02138| 1.07627785|
| 56434508_4|02138| 1.13778706|
+-----------+-----+-----------+
only showing top 20 rows

Second DF:

+-----------+-----+----------------+
|     new_id| zip_|        time_new|
+-----------+-----+----------------+
| 56434459_0|02138|1.624322475139E9|
| 56434459_1|02138|1.592786475139E9|
| 56434459_2|02138|1.561164075139E9|
| 56434459_3|02138|1.529628075139E9|
| 56434459_4|02138|1.498092075139E9|
| 56434459_5|02138|1.466556075139E9|
| 56434459_6|02138|1.434933675139E9|
| 56434459_7|02138|1.403397675139E9|
| 56434459_8|02138|1.371861675139E9|
| 56434459_9|02138|1.340325675139E9|
|56434459_10|02138|1.308703275139E9|
|56434459_11|02138|1.277167275139E9|
|56434459_12|02138|1.245631275139E9|
|56434459_13|02138|1.214095275139E9|
|56434459_14|02138|1.182472875139E9|
| 56434508_0|02138|1.624322475491E9|
| 56434508_1|02138|1.592786475491E9|
| 56434508_2|02138|1.561164075491E9|
| 56434508_3|02138|1.529628075491E9|
| 56434508_4|02138|1.498092075491E9|
+-----------+-----+----------------+
only showing top 20 rows

I then join the DFs on new_id (the join itself is sketched after the output below), resulting in:

+------------+-----+----------------+-----+------------------+
|      new_id| zip_|        time_new| zip_|        values_new|
+------------+-----+----------------+-----+------------------+
| 123957783_3|02138|1.527644029268E9|02138|               1.0|
| 125820702_3|02138|1.527643636531E9|02138|       1.013462378|
|165689784_12|02138|1.243647038288E9|02138|0.9283950599999999|
|165689784_14|02138|1.180488638288E9|02138|       1.011595547|
| 56424973_12|02138|1.245630256025E9|02138|0.9566622300000001|
| 56424989_14|02138|1.182471866886E9|02138|               1.0|
|  56425304_7|02138|1.403398444955E9|02138|       1.028527131|
|  56425386_6|02138|1.432949752808E9|02138|        1.08516484|
| 56430694_17|02138|1.087866094991E9|02138|       1.120045416|
| 56430700_20|02138| 9.61635686239E8|02138|       1.099920854|
| 56430856_13|02138|1.214097787512E9|02138|       0.989263804|
| 56430866_12|02138|1.245633801277E9|02138|       0.990684134|
| 56430875_10|02138|1.308705777269E9|02138|               1.0|
|  56430883_3|02138|1.529630585921E9|02138|        1.06920212|
| 56430987_13|02138|1.214100806414E9|02138|       0.978794644|
|  56431009_1|02138|1.592792025664E9|02138|        1.07923349|
|  56431013_9|02138|1.340331235566E9|02138|               1.0|
|  56431025_8|02138|1.371860189767E9|02138|         0.9477155|
| 56432373_13|02138|1.214092187852E9|02138|       0.994825498|
|  56432421_2|02138|1.561161037707E9|02138|        1.11343257|
+------------+-----+----------------+-----+------------------+
only showing top 20 rows

My question: Is there a more effective way to get the resultant DF? I tried running the two posexplodes in a single select, but PySpark allows only one generator function per select clause.

Upvotes: 0

Views: 106

Answers (1)

Felix K Jose

Reputation: 882

You can achieve it as follows:

from pyspark.sql.functions import col, explode, monotonically_increasing_id

df = (df.withColumn("values_new", explode(col("values")))    # one row per values element
      .withColumn("times_new", explode(col("time")))         # then one row per time element
      .withColumn("id_new", monotonically_increasing_id()))  # unique (not consecutive) row id
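
One caveat: chaining two explode calls like this pairs every element of values with every element of time within each row (a cross product), and monotonically_increasing_id guarantees unique IDs, not array positions. If the goal is to keep elements paired by position, a single posexplode over zipped arrays is one alternative (a sketch, assuming Spark 2.4+ for arrays_zip):

from pyspark.sql.functions import arrays_zip, col, concat_ws, posexplode

# arrays_zip pairs values[i] with time[i] as an array of structs;
# one posexplode then keeps the pairs aligned by position.
df2 = (df.select("id", "zip_",
                 posexplode(arrays_zip(col("values"), col("time"))).alias("pos", "vt"))
       .withColumn("new_id", concat_ws("_", col("id").cast("string"), col("pos").cast("string")))
       .select("new_id", "zip_",
               col("vt.values").alias("values_new"),
               col("vt.time").alias("time_new")))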

Upvotes: 2
