Reputation: 11
I get the following error when calling explode('learner'):
cannot resolve explode('learner') due to data type mismatch: input to function explode should be array or map type, not struct<_1:struct<name:string,email:string,city:string>,_2:struct<name:string,email:string,city:string>,_3:struct<name:string,email:string,city:string>>;;
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
# Reproducer: build a DataFrame of courses, each with three learners, and try
# to explode the learners into one row each.
# Row('course_id','course_name','fee') is used as a factory: calling it below
# fills those three named fields positionally.
course_structure = Row('course_id','course_name','fee')
course1=course_structure('1001','hadoop','7000')
course2=course_structure('1002','spark','8000')
course3=course_structure('1003','terraform','9000')
course4=course_structure('1004','python','3000')
# Same factory pattern for learners: fields name, email, city.
learner = Row('name','email','city')
learner1=learner('amit','[email protected]','mumbai')
learner2=learner('rakesh','[email protected]','pune')
learner3=learner('money','[email protected]','newyork')
learner4=learner('simon', '[email protected]','hongkong')
learner5=learner('venkat','[email protected]','chennai')
learner6=learner('rama','[email protected]','aus')
learner7=learner('dal','[email protected]','london')
# NOTE: learner=(...) is a Python tuple, so Spark infers the column as a
# struct with positional fields _1, _2, _3 (the struct<_1:...,_2:...,_3:...>
# type reported in the error) rather than as an array of learners.
courselearner1 = Row(course=course1,learner=(learner1,learner2,learner3))
courselearner2 = Row(course=course2,learner=(learner3,learner4,learner2))
courselearner3 = Row(course=course3,learner=(learner5,learner6,learner4))
courselearner4 = Row(course=course4,learner=(learner7,learner1,learner5))
# Two tuples of two course rows each; each becomes its own DataFrame and
# union() combines their rows into combinedf.
seq1=courselearner1,courselearner2
seq2=courselearner3,courselearner4
df1 = spark.createDataFrame(seq1)
df2 = spark.createDataFrame(seq2)
combinedf = df1.union(df2)
combinedf.printSchema()
combinedf.show(truncate=False)
# This is the call that fails: explode() requires an array or map column, but
# "learner" is a struct ("input to function explode should be array or map
# type, not struct<...>").
combinedf.select("course.*",explode("learner"))
Upvotes: 1
Views: 4002
Reputation: 11
Here is the answer that I have found:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
# Workaround: convert every Row into a plain Python list before building the
# DataFrame, so the course/learner columns become arrays, which explode()
# accepts (unlike the struct produced from a tuple). The trade-off is that the
# struct field names are lost, so fields are read back by position with
# getItem() further down.
course_structure = Row('course_id', 'course_name', 'fee')
course1 = course_structure('1001', 'hadoop', '7000')
course2 = course_structure('1002', 'spark', '8000')
course3 = course_structure('1003', 'terraform', '9000')
course4 = course_structure('1004', 'python', '3000')
learner = Row('name', 'email', 'city')
learner1 = learner('amit', '[email protected]', 'mumbai')
learner2 = learner('rakesh', '[email protected]', 'pune')
learner3 = learner('money', '[email protected]', 'newyork')
learner4 = learner('simon', '[email protected]', 'hongkong')
learner5 = learner('venkat', '[email protected]', 'chennai')
learner6 = learner('ram', '[email protected]', 'aus')
learner7 = learner('daljeet', '[email protected]', 'london')
# course is a list of three strings; learner is a list of per-learner lists.
courselearner1 = Row(course=list(course1), learner=[list(learner1), list(learner2), list(learner3)])
courselearner2 = Row(course=list(course2), learner=[list(learner3), list(learner4), list(learner2)])
courselearner3 = Row(course=list(course3), learner=[list(learner5), list(learner6), list(learner4)])
courselearner4 = Row(course=list(course4), learner=[list(learner7), list(learner1), list(learner5)])
seq1 = courselearner1, courselearner2
seq2 = courselearner3, courselearner4
df1 = spark.createDataFrame(seq1)
df2 = spark.createDataFrame(seq2)
combinedf = df1.union(df2)
# explode() emits one row per element of the learner list, i.e. one row per
# (course, learner) pair; the exploded element replaces the learner column.
df = combinedf.withColumn("learner", explode("learner"))
df.show(truncate=False)
# Positional accessors; the indexes follow the field order of
# course_structure ('course_id', 'course_name', 'fee') and
# learner ('name', 'email', 'city').
course_id = df["course"].getItem(0)
course_name = df["course"].getItem(1)
course_fees = df["course"].getItem(2)
learner_name = df["learner"].getItem(0)
learner_email = df["learner"].getItem(1)
learner_city = df["learner"].getItem(2)
# Select and rename the wanted fields in a single projection, and keep the
# resulting DataFrame in learnerdf: DataFrame.show() returns None, so it is
# called separately instead of ending the assignment chain.
learnerdf = df.select(
    course_name.alias("course_name"),
    learner_name.alias("learner_name"),
    learner_email.alias("email"),
    learner_city.alias("city"),
)
learnerdf.groupBy("email").count().show()
Upvotes: 0
Reputation: 31490
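explode only works with array or map types, but your learner column is a struct (of three structs).
You can access the fields of a struct directly with struct_field_name.*, which flattens the three learners into extra columns. If you need one row per learner instead, first wrap the three learner structs in an array, e.g. explode(array("learner._1", "learner._2", "learner._3")).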
Example:
combinedf.printSchema()
#root
# |-- course: struct (nullable = true)
# | |-- course_id: string (nullable = true)
# | |-- course_name: string (nullable = true)
# | |-- fee: string (nullable = true)
# |-- learner: struct (nullable = true)
# | |-- _1: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- city: string (nullable = true)
# | |-- _2: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- city: string (nullable = true)
# | |-- _3: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- city: string (nullable = true)
# Step 1: "course.*" and "learner.*" lift the fields of both top-level structs
# into columns: course_id, course_name, fee, _1, _2, _3.
# Step 2: each "_N.*" expands one nested learner struct into name/email/city.
# The result is one wide row per course. The name/email/city headers appear
# three times in the output below, so alias them before referencing them by
# name in any further transformation.
learner_slots = ["_1", "_2", "_3"]
course_and_learners = combinedf.select("course.*", "learner.*")
wide_columns = ["course_id", "course_name", "fee"] + [slot + ".*" for slot in learner_slots]
course_and_learners.select(*wide_columns).show()
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+
#|course_id|course_name| fee| name| email| city| name| email| city| name| email| city|
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+
#| 1001| hadoop|7000| amit| [email protected]| mumbai|rakesh|[email protected]| pune| money| [email protected]| newyork|
#| 1002| spark|8000| money| [email protected]|newyork| simon| [email protected]|hongkong|rakesh|[email protected]| pune|
#| 1003| terraform|9000|venkat| [email protected]|chennai| rama| [email protected]| aus| simon| [email protected]|hongkong|
#| 1004| python|3000| dal|[email protected]| london| amit| [email protected]| mumbai|venkat|[email protected]| chennai|
#+---------+-----------+----+------+------------------+-------+------+-----------------+--------+------+-----------------+--------+
Upvotes: 1