Vivian

Reputation: 105

Change the schema of a PySpark DataFrame

The original code that produces the schema below is:

from pyspark.sql import functions as sf

df_array = (
    df_csv.groupBy(df_csv.depth, df_csv.height, df_csv.weight, df_csv.width, df_csv.seller_id, df_csv.sku,
                   df_csv.navigation_id, df_csv.category, df_csv.subcategory, df_csv.max_dimension_p, df_csv.max_side_p)
    .agg(sf.struct(
        sf.collect_list('id_distribution_center').alias('id_distribution_center'),
        sf.collect_list('id_modality').alias('id_modality'),
        sf.collect_list('zipcode_initial').alias('zipcode_initial'),
        sf.collect_list('zipcode_final').alias('zipcode_final'),
        sf.collect_list('cost').alias('cost'),
        sf.collect_list('city').alias('city'),
        sf.collect_list('state').alias('state'),
    ).alias("infos_gerais_product"))
)

This gives me the following schema for the PySpark DataFrame:

root
 |-- depth: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- width: double (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- sku: long (nullable = true)
 |-- navigation_id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- max_dimension_p: double (nullable = true)
 |-- max_side_p: double (nullable = true)
 |-- infos_gerais_product: struct (nullable = false)
 |    |-- id_distribution_center: array (nullable = false)
 |    |    |-- element: long (containsNull = false)
 |    |-- id_modality: array (nullable = false)
 |    |    |-- element: long (containsNull = false)
 |    |-- zipcode_initial: array (nullable = false)
 |    |    |-- element: long (containsNull = false)
 |    |-- zipcode_final: array (nullable = false)
 |    |    |-- element: long (containsNull = false)
 |    |-- cost: array (nullable = false)
 |    |    |-- element: double (containsNull = false)
 |    |-- city: array (nullable = false)
 |    |    |-- element: string (containsNull = false)
 |    |-- state: array (nullable = false)
 |    |    |-- element: string (containsNull = false)

I need to transform the schema above into the one below. Note that infos_gerais_product must remain a struct, but its fields should be plain strings instead of arrays:

 |-- depth: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- width: double (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- sku: long (nullable = true)
 |-- navigation_id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- max_dimension_p: double (nullable = true)
 |-- max_side_p: double (nullable = true)
 |-- infos_gerais_product: struct (nullable = false)
 |    |-- id_distribution_center: string (nullable = false)
 |    |-- id_modality: string (nullable = false)
 |    |-- zipcode_initial: string (nullable = false)
 |    |-- zipcode_final: string (nullable = false)
 |    |-- cost: string (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- state: string (nullable = false)

I tried the code:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, LongType
schema = StructType([
    StructField('infos_gerais_product', StructType([
         StructField('id_distribution_center', StringType(), True),
         StructField('id_modality', StringType(), True),
         StructField('zipcode_initial', StringType(), True),
         StructField('zipcode_final', StringType(), True),
         StructField('cost', StringType(), True),
         StructField('city', StringType(), True),
         StructField('state', StringType(), True)
         ])),
     StructField('depth', DoubleType(), True),
     StructField('height', DoubleType(), True),
     StructField('weight', DoubleType(), True),
     StructField('width', DoubleType(), True),
     StructField('seller_id', StringType(), True),
     StructField('sku', LongType(), True),
     StructField('navigation_id', LongType(), True),
     StructField('category', StringType(), True),
     StructField('subcategory', StringType(), True),
     StructField('max_dimension_p', DoubleType(), True),
     StructField('max_side_p', DoubleType(), True)
     ])
df2 = spark.createDataFrame(df_array, schema = schema)
df2.printSchema()

But it returns an error: "TypeError: data is already a DataFrame", because df_array is already a PySpark DataFrame. Can anyone help me?

Upvotes: 0

Views: 105

Answers (1)

Azhar Khan

Reputation: 4098

Assuming your DataFrame looks like this:

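from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType, StructField, StructType

schema = StructType([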
    StructField("infos_gerais_product", StructType([
        StructField("id_distribution_center", ArrayType(LongType())),
        StructField("id_modality", ArrayType(LongType())),
    ]))
])

df = spark.createDataFrame(data=[(([1],[101]),), (([4],[401]),)], schema=schema)
df.printSchema()

root
 |-- infos_gerais_product: struct (nullable = true)
 |    |-- id_distribution_center: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- id_modality: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

You can achieve the desired transformation using:

df = df.withColumn("infos_gerais_product", F.struct(
    F.element_at(F.col("infos_gerais_product").getField("id_distribution_center"), 1).cast("string").alias("id_distribution_center"),
    F.element_at(F.col("infos_gerais_product").getField("id_modality"), 1).cast("string").alias("id_modality")))
df.printSchema()

root
 |-- infos_gerais_product: struct (nullable = false)
 |    |-- id_distribution_center: string (nullable = true)
 |    |-- id_modality: string (nullable = true)

Logic:

  • Access the fields of "infos_gerais_product".
  • Get the first element of each array with element_at, which uses 1-based indexing (you did not mention this, so I am assuming each array holds only one element), and cast it to string.
  • Create a new struct from these fields.

Upvotes: 1
