Reputation: 33
I am trying to create a PySpark dataframe manually using the below nested schema -

from pyspark.sql import Row
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               StringType, IntegerType)

schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('source', StringType()),
        StructField('sourceids', ArrayType(IntegerType()))]))),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('kare_id', StringType()),
    StructField('match_key', ArrayType(StringType()))
])
I am using the below code to create a dataframe using this schema -

row = [
    Row(fields=[Row(source='BCONNECTED', sourceids=[10, 202, 30]),
                Row(source='KP', sourceids=[20, 30, 40])],
        first_name='Christopher', last_name='Nolan',
        kare_id='kare1', match_key=['abc', 'abcd']),
    Row(fields=[Row(source='BCONNECTED', sourceids=[20, 304, 5, 6]),
                Row(source='KP', sourceids=[40, 50, 60])],
        first_name='Michael', last_name='Caine',
        kare_id='kare2', match_key=['ncnc', 'cncnc'])
]
content = spark.createDataFrame(sc.parallelize(row), schema=schema)
content.printSchema()
The schema prints correctly, but when I call content.show() I can see that the values of the kare_id and last_name columns have swapped.
+--------------------+-----------+---------+-------+-------------+
| fields| first_name|last_name|kare_id| match_key|
+--------------------+-----------+---------+-------+-------------+
|[[BCONNECTED, [10...|Christopher| kare1| Nolan| [abc, abcd]|
|[[BCONNECTED, [20...| Michael| kare2| Caine|[ncnc, cncnc]|
+--------------------+-----------+---------+-------+-------------+
Upvotes: 0
Views: 781
Reputation: 294
PySpark sorts the Row object's fields by name using lexicographic ordering. Thus, the ordering of the columns in your data will be fields, first_name, kare_id, last_name, match_key. Spark then associates each column name in your schema with the data positionally, resulting in the mismatch. The fix is to swap the schema entries for last_name and kare_id as shown below:
schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('source', StringType()),
        StructField('sourceids', ArrayType(IntegerType()))]))),
    StructField('first_name', StringType()),
    StructField('kare_id', StringType()),
    StructField('last_name', StringType()),
    StructField('match_key', ArrayType(StringType()))
])
From PySpark Docs on Row: "Row can be used to create a row object by using named arguments, the fields will be sorted by names."
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row
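The sorting behavior can be illustrated without Spark at all. The sketch below is plain Python, not pyspark; row_like is a made-up helper that mimics what Row does with named arguments in the Spark versions this question concerns (note that in Spark 3.0+ Row no longer sorts fields by default):

```python
# Plain-Python sketch of how pyspark's Row reorders named arguments:
# it sorts the keyword names lexicographically before storing the values.
def row_like(**kwargs):
    names = sorted(kwargs)  # lexicographic ordering of field names
    return names, [kwargs[n] for n in names]

names, values = row_like(fields=[], first_name='Christopher',
                         last_name='Nolan', kare_id='kare1',
                         match_key=['abc', 'abcd'])
print(names)
# ['fields', 'first_name', 'kare_id', 'last_name', 'match_key']
```

Note that kare_id lands in position 2, ahead of last_name, regardless of the order the arguments were written in.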
Upvotes: 1
Reputation: 1006
First, you are actually defining the schema twice. When you create the data you are already using Row objects in an RDD, so you do not need the createDataFrame function at all; instead you can do the following:
sc.parallelize(row).toDF().show()
But if you still want to specify the schema explicitly, then you need to keep the schema and the data in the same order, and the schema you mentioned is incorrect for the data you are passing. The correct schema would be:
schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('source', StringType()),
        StructField('sourceids', ArrayType(IntegerType()))]))),
    StructField('first_name', StringType()),
    StructField('kare_id', StringType()),
    StructField('last_name', StringType()),
    StructField('match_key', ArrayType(StringType()))
])
kare_id should come before last_name because Row sorts its fields by name, so that is the order in which the data is actually passed.
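To see the mismatch concretely, here is a plain-Python sketch (no Spark required) that zips the original, incorrect schema order against the alphabetically sorted data, the way Spark pairs them positionally:

```python
# Schema order from the question (wrong) vs. the order Row actually
# delivers the values in (sorted by field name):
schema_order = ['fields', 'first_name', 'last_name', 'kare_id', 'match_key']
data = dict(fields=[], first_name='Christopher',
            last_name='Nolan', kare_id='kare1', match_key=['abc', 'abcd'])
delivered = [data[name] for name in sorted(data)]  # alphabetical order

# Spark pairs schema names with values by position:
paired = dict(zip(schema_order, delivered))
print(paired['last_name'], paired['kare_id'])
# kare1 Nolan  -> the two columns appear swapped
```

With the corrected schema order the positional pairing lines up with the sorted field names and the swap disappears.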
Upvotes: 0