Clock Slave

Reputation: 7967

Ambiguous behavior while adding new column to StructType

I defined a function in PySpark which is-

def add_ids(X):
    # append a LongType id column to the existing schema
    schema_new = X.schema.add("id_col", LongType(), False)
    # zip each row with its index and rebuild the dataframe with the new schema
    _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
    # move id_col from the last position to the first
    cols_arranged = [_X.columns[-1]] + _X.columns[0:len(_X.columns) - 1]
    return _X.select(*cols_arranged)

In the function above, I'm creating a new column (named id_col) that holds the index of each row, appending it to the dataframe, and finally moving id_col to the leftmost position.
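For reference, zipWithIndex pairs every RDD element with its position, which is where the index comes from (a minimal sketch, assuming an active SparkSession named spark):

>>> spark.sparkContext.parallelize(["a", "b", "c"]).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2)]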

The data I'm using

>>> X.show(4)
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

Output of the function

>>> add_ids(X).show(4)
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|id_col|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|     0|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|     1|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|     2|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|     3|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+------+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

All of this works fine, but the issue appears when I run the following two commands:

>>> X.show(4)
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 4 rows

>>> X.columns
['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome', 'id_col']

If you look at the result of X.columns, you'll notice id_col at the end. But when I ran X.show(4) a moment earlier, id_col didn't appear as a column.

Now, when I try running add_ids(X).show(4), I get the following error:

pyspark.sql.utils.AnalysisException: "Reference 'id_col' is ambiguous, could be: id_col, id_col.;"

What is it that I am doing wrong?

Upvotes: 0

Views: 890

Answers (1)

zero323

Reputation: 330093

The mistake is here:

schema_new = X.schema.add("id_col", LongType(), False)

If you check the source, you'll see that the add method modifies the schema in place (and returns self, which is why the assignment still appears to work). As a result, every call to add_ids appends another id_col field to the shared X.schema, so the second run produces a schema with two id_col columns, and selecting id_col becomes ambiguous.
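Roughly, the relevant part of StructType.add looks like this (a paraphrased sketch of the PySpark source, not the exact code):

def add(self, field, data_type=None, nullable=True, metadata=None):
    if isinstance(field, StructField):
        self.fields.append(field)           # mutates this StructType
        self.names.append(field.name)
    else:
        self.fields.append(StructField(field, data_type, nullable, metadata))
        self.names.append(field)
    return self                             # returns the same, mutated object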

It is easier to see on a simplified example:

from pyspark.sql.types import *

schema = StructType()
schema.add(StructField("foo", IntegerType()))

schema
StructType(List(StructField(foo,IntegerType,true)))

As you can see, the schema object has been modified.

Instead of using the add method, you should rebuild the schema:

schema_new = StructType(X.schema.fields + [StructField("id_col", LongType(), False)])
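Applied to the function from the question, the whole thing would look like this (a sketch, reusing the names from the original):

from pyspark.sql.types import StructType, StructField, LongType

def add_ids(X):
    # build a brand-new StructType instead of mutating X.schema
    schema_new = StructType(X.schema.fields + [StructField("id_col", LongType(), False)])
    _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)
    # move id_col from the last position to the first
    return _X.select(_X.columns[-1], *_X.columns[:-1])

Because X.schema is never modified, the function can be called repeatedly without accumulating duplicate id_col fields.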

Alternatively, you can create a deep copy of the object:

import copy

old_schema = StructType()
new_schema = copy.deepcopy(old_schema).add(StructField("foo", IntegerType()))

old_schema
StructType(List())
new_schema
StructType(List(StructField(foo,IntegerType,true)))
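The deep copy matters here: a plain copy.copy would give you a new StructType object that still shares the underlying fields list, so add would mutate the original anyway (a sketch; this assumes StructType defines no custom copy behavior):

base = StructType()
shallow = copy.copy(base)                 # new object, but the same fields list
shallow.add(StructField("foo", IntegerType()))

base                                      # mutated through the shared list
StructType(List(StructField(foo,IntegerType,true)))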

Upvotes: 2
