Mia21

Reputation: 119

Pyspark: How to return a tuple list of existing non null columns as one of the column values in dataframe

I'm working with a PySpark dataframe that looks like this:

+----+----+---+---+---+----+
|   a|   b|  c|  d|  e|   f|
+----+----+---+---+---+----+
|   2|12.3|  5|5.6|  6|44.7|
|null|null|  9|9.3| 19|23.5|
|   8| 4.3|  7|0.5| 21| 8.2|
|   9| 3.8|  3|6.5| 45| 4.9|
|   3| 8.7|  2|2.8| 32| 2.9|
+----+----+---+---+---+----+

To create the above dataframe:

rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df.show()

I want to create another column 'g' whose values are lists of tuples built from the existing non-null columns. The list of tuples has the form:

((column a, column b),(column c, column d),(column e, column f))

Requirements for the output column: 1) Only consider non-null columns when creating the list of tuples. 2) Return the list of tuples.

So the final dataframe with column 'g' would be:

+----+----+---+---+---+----+---------------------------+
|   a|   b|  c|  d|  e|   f|g                          |
+----+----+---+---+---+----+---------------------------+
|   2|12.3|  5|5.6|  6|44.7|[[2,12.3],[5,5.6],[6,44.7]]|
|null|null|  9|9.3| 19|23.5|[[9,9.3],[19,23.5]]        |
|   8| 4.3|  7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2]] |
|   9| 3.8|  3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9]] |
|   3| 8.7|  2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9]] |
+----+----+---+---+---+----+---------------------------+

In column 'g', the second row has only two pairs instead of three, because the values of columns 'a' and 'b' are null in that row and are therefore omitted.

I'm not sure how to dynamically omit the null values and form the list of tuples.

I tried to partially achieve the final column with a UDF:

l1=['a','c','e']
l2=['b','d','f']
def func1(r1,r2):
    l=[]
    for i in range(len(l1)):
        l.append((r1[i],r2[i]))
    return l
func1_udf=udf(func1)
df=df.withColumn('g',func1_udf(array(l1),array(l2)))
df.show()

I tried declaring the UDF's return type as ArrayType, but it did not work. Any help would be much appreciated. I'm working with PySpark 1.6. Thank you!

Upvotes: 2

Views: 2203

Answers (3)

Suresh

Reputation: 5880

Another solution using a UDF:

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

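>>> # Keep a pair only if neither value is null (an explicit None check, so zeros are not dropped)
>>> arr_udf = F.udf(lambda row : [x for x in [row[0:2],row[2:4],row[4:6]] if all(v is not None for v in x)],ArrayType(ArrayType(StringType())))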
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |g                                                                   |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)]                      |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+
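
One thing to watch when filtering pairs inside a Python UDF: a truthiness test such as all(pair) treats 0 and 0.0 as missing, while an explicit None check only drops real nulls. The sketch below shows the difference in plain Python, without Spark. The function names and the sample row are made up for illustration; either function could be passed to F.udf in place of the lambda.

```python
def keep_pairs_truthy(row):
    # Truthiness filter: a pair is dropped if any value is falsy (None, 0, 0.0, "").
    pairs = [row[0:2], row[2:4], row[4:6]]
    return [list(p) for p in pairs if all(p)]


def keep_pairs_not_none(row):
    # Explicit null filter: a pair is dropped only if it contains None.
    pairs = [row[0:2], row[2:4], row[4:6]]
    return [list(p) for p in pairs if all(v is not None for v in p)]


# Hypothetical row: a legitimate zero in the first pair, a null in the second.
row = (0, 12.3, None, 9.3, 19, 23.5)
print(keep_pairs_truthy(row))    # [[19, 23.5]]
print(keep_pairs_not_none(row))  # [[0, 12.3], [19, 23.5]]
```

With the sample data in the question both filters give the same result, because no column contains a zero.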

Upvotes: 0

mayank agrawal

Reputation: 2545

I think UDFs should work just fine.

import pyspark.sql.functions as F
from pyspark.sql.types import *

rdd = sc.parallelize([(2, 12.3, 5, 5.6, 6, 44.7),
                      (None, None, 9, 9.3, 19, 23.5),
                      (8, 4.3, 7, 0.5, 21, 8.2),
                      (9, 3.8, 3, 6.5, 45, 4.9),
                      (3, 8.7, 2, 2.8, 32, 2.9)])
# sqlContext is the SQLContext used in the question
df = sqlContext.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))

def combine(a,b,c,d,e,f):

    combine_ = []
    if None not in [a,b]:
        combine_.append([a,b])
    if None not in [c,d]:
        combine_.append([c,d])
    if None not in [e,f]:
        combine_.append([e,f])
    return combine_

combine_udf = F.udf(combine,ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'),F.col('b'),F.col('c'),\
               F.col('d'),F.col('e'),F.col('f')))
df.show()
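
If the number of column pairs may grow, the same pairing logic can be written once for any even number of arguments. This is a minimal sketch in plain Python. combine_pairs is a hypothetical name, and wrapping it with F.udf and ArrayType(ArrayType(FloatType())) in the same way as combine is an assumption that has not been tested on 1.6.

```python
def combine_pairs(*values):
    """Group values as (v0, v1), (v2, v3), ... and drop any pair containing None."""
    if len(values) % 2 != 0:
        raise ValueError("expected an even number of values")
    # Even positions are the first members of each pair, odd positions the second.
    pairs = zip(values[0::2], values[1::2])
    return [[x, y] for x, y in pairs if x is not None and y is not None]


print(combine_pairs(2.0, 12.3, 5.0, 5.6, 6.0, 44.7))
# [[2.0, 12.3], [5.0, 5.6], [6.0, 44.7]]
print(combine_pairs(None, None, 9.0, 9.3, 19.0, 23.5))
# [[9.0, 9.3], [19.0, 23.5]]
```

The function takes the column values positionally, so the order of the columns passed to the UDF decides which values end up in the same pair.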

Upvotes: 0

Pratyush Sharma

Reputation: 289

You can try something like this:

from pyspark.sql.functions import array, col, lit, when

# Build one array per column pair; a pair containing a null becomes a placeholder array
(df.withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(),
                         array(col("a"), col("b"))).otherwise(array(lit(""))))
   .withColumn("h", when(col("c").isNotNull() & col("d").isNotNull(),
                         array(col("c"), col("d"))).otherwise(array(lit(""))))
   .withColumn("i", when(col("e").isNotNull() & col("f").isNotNull(),
                         array(col("e"), col("f"))).otherwise(array(lit(""))))
   .withColumn("concat", array(col("g"), col("h"), col("i")))
   .drop('g', 'h', 'i')
   .show(truncate=False))

Resulting df:

+----+----+---+---+---+----+--------------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |concat                                                                    |
+----+----+---+---+---+----+--------------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0, 5.6), WrappedArray(6.0, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3), WrappedArray(19.0, 23.5)]        |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5), WrappedArray(21.0, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5), WrappedArray(45.0, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8), WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------------+
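
Note that with this approach the pair containing nulls is not removed: a placeholder entry stays in the array (the first entry of the second row above). Dropping it needs one more step. In plain Python the filter could look like the sketch below, which could then be wrapped in a UDF. drop_placeholders is a hypothetical helper, and it assumes that a placeholder has fewer than two elements while every real pair has exactly two.

```python
def drop_placeholders(pairs):
    """Keep only complete [x, y] pairs; shorter placeholder entries are dropped."""
    return [p for p in pairs if len(p) == 2]


# Hypothetical value of the 'concat' column for the row with nulls in 'a' and 'b'.
row2 = [[], [9.0, 9.3], [19.0, 23.5]]
print(drop_placeholders(row2))  # [[9.0, 9.3], [19.0, 23.5]]
```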

Upvotes: 0
