Manoj Mittal
Manoj Mittal

Reputation: 11

The explode function does not work with a struct column in PySpark

When I run the code below, I get this error: cannot resolve 'explode(learner)' due to data type mismatch: input to function explode should be array or map type, not struct<_1:struct<name:string,email:string,city:string>,_2:struct<name:string,email:string,city:string>,_3:struct<name:string,email:string,city:string>>;;

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
    
# Minimal reproduction of the explode() error. Assumes an existing
# SparkSession named `spark` (e.g. from the pyspark shell); it is not
# created in this snippet.

# Row factory for courses. Every value is a string literal, so each course
# field is inferred as a string column.
course_structure = Row('course_id','course_name','fee')
course1=course_structure('1001','hadoop','7000')
course2=course_structure('1002','spark','8000')
course3=course_structure('1003','terraform','9000')
course4=course_structure('1004','python','3000')

# Row factory for learners: (name, email, city), all strings.
learner = Row('name','email','city')
learner1=learner('amit','[email protected]','mumbai')
learner2=learner('rakesh','[email protected]','pune')
learner3=learner('money','[email protected]','newyork')
learner4=learner('simon', '[email protected]','hongkong')
learner5=learner('venkat','[email protected]','chennai')
learner6=learner('rama','[email protected]','aus')
learner7=learner('dal','[email protected]','london')

# `learner` is a Python *tuple* of three Rows. Spark infers it as a struct
# with positional fields _1, _2, _3 (each a struct<name,email,city>), not as
# an array -- which is exactly what explode() rejects further down.
courselearner1 = Row(course=course1,learner=(learner1,learner2,learner3))
courselearner2 = Row(course=course2,learner=(learner3,learner4,learner2))
courselearner3 = Row(course=course3,learner=(learner5,learner6,learner4))
courselearner4 = Row(course=course4,learner=(learner7,learner1,learner5))

# Two tuples of Rows; each becomes one DataFrame below.
seq1=courselearner1,courselearner2                
seq2=courselearner3,courselearner4
    
df1 = spark.createDataFrame(seq1)
df2 = spark.createDataFrame(seq2)

combinedf = df1.union(df2)

combinedf.printSchema()
combinedf.show(truncate=False)

# Fails with "input to function explode should be array or map type, not
# struct<...>": explode() needs an array or map column, but `learner` is a
# struct here.
combinedf.select("course.*",explode("learner"))

Upvotes: 1

Views: 4002

Answers (2)

Manoj Mittal
Manoj Mittal

Reputation: 11

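Here is the solution I found. Converting each Row, and the tuple of learners, to Python lists makes Spark infer array columns, which explode accepts. The trade-off is that the struct field names are lost and the fields must be accessed by position: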



from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
# Assumes an existing SparkSession named `spark` (e.g. from the pyspark
# shell) and that `col` / `explode` come from the pyspark.sql.functions
# import above.

# Field order of these Row factories fixes the array positions used below:
# course -> [course_id, course_name, fee], learner -> [name, email, city].
course_structure = Row('course_id', 'course_name', 'fee')
course1 = course_structure('1001', 'hadoop', '7000')
course2 = course_structure('1002', 'spark', '8000')
course3 = course_structure('1003', 'terraform', '9000')
course4 = course_structure('1004', 'python', '3000')

learner = Row('name', 'email', 'city')
learner1 = learner('amit', '[email protected]', 'mumbai')
learner2 = learner('rakesh', '[email protected]', 'pune')
learner3 = learner('money', '[email protected]', 'newyork')
learner4 = learner('simon', '[email protected]', 'hongkong')
learner5 = learner('venkat', '[email protected]', 'chennai')
learner6 = learner('ram', '[email protected]', 'aus')
learner7 = learner('daljeet', '[email protected]', 'london')

# explode() only accepts array or map columns. A tuple of Rows would be
# inferred as a struct (_1, _2, _3), whereas Python lists are inferred as
# arrays -- so the course and every learner are converted to lists of
# strings, giving `course` an array type and `learner` an array-of-arrays type.
courselearner1 = Row(
    course=list(course1),
    learner=[list(person) for person in (learner1, learner2, learner3)],
)
courselearner2 = Row(
    course=list(course2),
    learner=[list(person) for person in (learner3, learner4, learner2)],
)
courselearner3 = Row(
    course=list(course3),
    learner=[list(person) for person in (learner5, learner6, learner4)],
)
courselearner4 = Row(
    course=list(course4),
    learner=[list(person) for person in (learner7, learner1, learner5)],
)

seq1 = courselearner1, courselearner2
seq2 = courselearner3, courselearner4
df1 = spark.createDataFrame(seq1)
df2 = spark.createDataFrame(seq2)
combinedf = df1.union(df2)

# One output row per (course, learner) pair.
df = combinedf.withColumn("learner", explode("learner"))
df.show(truncate=False)

# Name the positional array elements directly with alias(), instead of
# selecting them under auto-generated names such as "course[1]" and then
# re-selecting those names.
learnerdf = df.select(
    col("course").getItem(1).alias("course_name"),
    col("learner").getItem(0).alias("learner_name"),
    col("learner").getItem(1).alias("email"),
    col("learner").getItem(2).alias("city"),
)

# show() returns None, so it is kept out of the assignment above;
# learnerdf stays bound to the flattened DataFrame for further use.
learnerdf.groupBy("email").count().show()


Upvotes: 0

notNull
notNull

Reputation: 31490

explode only works with array or map types, but all of your columns are struct types.

You can access the fields of a struct directly with struct_field_name.*

Example:

combinedf.printSchema()
#root
# |-- course: struct (nullable = true)
# |    |-- course_id: string (nullable = true)
# |    |-- course_name: string (nullable = true)
# |    |-- fee: string (nullable = true)
# |-- learner: struct (nullable = true)
# |    |-- _1: struct (nullable = true)
# |    |    |-- name: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- city: string (nullable = true)
# |    |-- _2: struct (nullable = true)
# |    |    |-- name: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- city: string (nullable = true)
# |    |-- _3: struct (nullable = true)
# |    |    |-- name: string (nullable = true)
# |    |    |-- email: string (nullable = true)
# |    |    |-- city: string (nullable = true)

combinedf.select("course.*","learner.*").select("course_id","course_name","fee","_1.*","_2.*","_3.*").show()
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+
#|course_id|course_name| fee|  name|             email|   city|  name|            email|    city|  name|            email|    city|
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+
#|     1001|     hadoop|7000|  amit|   [email protected]| mumbai|rakesh|[email protected]|    pune| money| [email protected]| newyork|
#|     1002|      spark|8000| money|  [email protected]|newyork| simon| [email protected]|hongkong|rakesh|[email protected]|    pune|
#|     1003|  terraform|9000|venkat| [email protected]|chennai|  rama|   [email protected]|     aus| simon| [email protected]|hongkong|
#|     1004|     python|3000|   dal|[email protected]| london|  amit|  [email protected]|  mumbai|venkat|[email protected]| chennai|
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+

# NOTE: the wide result above repeats the column names name/email/city three
# times, so a later reference such as col("email") would be ambiguous.
# To get one row per learner instead, put the three struct fields into an
# array -- which explode() accepts -- since _1, _2 and _3 all share the same
# struct<name,email,city> type.
from pyspark.sql.functions import array, explode

combinedf.select(
    "course.*",
    explode(array("learner._1", "learner._2", "learner._3")).alias("learner"),
).select("course_id", "course_name", "fee", "learner.*").show()
# -> columns: course_id, course_name, fee, name, email, city
#    one row per learner: 4 courses x 3 learners = 12 rows

Upvotes: 1

Related Questions