Reputation: 105
The original code that results in the schema below is:
df_array = (df_csv.groupBy(df_csv.depth, df_csv.height, df_csv.weight, df_csv.width,
                           df_csv.seller_id, df_csv.sku, df_csv.navigation_id,
                           df_csv.category, df_csv.subcategory,
                           df_csv.max_dimension_p, df_csv.max_side_p)
            .agg(sf.struct(
                sf.collect_list('id_distribution_center').alias('id_distribution_center'),
                sf.collect_list('id_modality').alias('id_modality'),
                sf.collect_list('zipcode_initial').alias('zipcode_initial'),
                sf.collect_list('zipcode_final').alias('zipcode_final'),
                sf.collect_list('cost').alias('cost'),
                sf.collect_list('city').alias('city'),
                sf.collect_list('state').alias('state')).alias("infos_gerais_product")))
I have the following schema from a PySpark DataFrame:
root
|-- depth: double (nullable = true)
|-- height: double (nullable = true)
|-- weight: double (nullable = true)
|-- width: double (nullable = true)
|-- seller_id: string (nullable = true)
|-- sku: long (nullable = true)
|-- navigation_id: long (nullable = true)
|-- category: string (nullable = true)
|-- subcategory: string (nullable = true)
|-- max_dimension_p: double (nullable = true)
|-- max_side_p: double (nullable = true)
|-- infos_gerais_product: struct (nullable = false)
| |-- id_distribution_center: array (nullable = false)
| | |-- element: long (containsNull = false)
| |-- id_modality: array (nullable = false)
| | |-- element: long (containsNull = false)
| |-- zipcode_initial: array (nullable = false)
| | |-- element: long (containsNull = false)
| |-- zipcode_final: array (nullable = false)
| | |-- element: long (containsNull = false)
| |-- cost: array (nullable = false)
| | |-- element: double (containsNull = false)
| |-- city: array (nullable = false)
| | |-- element: string (containsNull = false)
| |-- state: array (nullable = false)
| | |-- element: string (containsNull = false)
I need to transform the schema above into the one below. Note that in infos_gerais_product I need to keep a struct, but without the arrays:
|-- depth: double (nullable = true)
|-- height: double (nullable = true)
|-- weight: double (nullable = true)
|-- width: double (nullable = true)
|-- seller_id: string (nullable = true)
|-- sku: long (nullable = true)
|-- navigation_id: long (nullable = true)
|-- category: string (nullable = true)
|-- subcategory: string (nullable = true)
|-- max_dimension_p: double (nullable = true)
|-- max_side_p: double (nullable = true)
|-- infos_gerais_product: struct (nullable = false)
| |-- id_distribution_center: string (nullable = false)
| |-- id_modality: string (nullable = false)
| |-- zipcode_initial: string (nullable = false)
| |-- zipcode_final: string (nullable = false)
| |-- cost: string (nullable = false)
| |-- city: string (nullable = false)
| |-- state: string (nullable = false)
I tried the code (note that LongType also has to be imported, otherwise the schema definition fails with a NameError):
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
schema = StructType([
    StructField('infos_gerais_product', StructType([
        StructField('id_distribution_center', StringType(), True),
        StructField('id_modality', StringType(), True),
        StructField('zipcode_initial', StringType(), True),
        StructField('zipcode_final', StringType(), True),
        StructField('cost', StringType(), True),
        StructField('city', StringType(), True),
        StructField('state', StringType(), True)
    ])),
    StructField('depth', DoubleType(), True),
    StructField('height', DoubleType(), True),
    StructField('weight', DoubleType(), True),
    StructField('width', DoubleType(), True),
    StructField('seller_id', StringType(), True),
    StructField('sku', LongType(), True),
    StructField('navigation_id', LongType(), True),
    StructField('category', StringType(), True),
    StructField('subcategory', StringType(), True),
    StructField('max_dimension_p', DoubleType(), True),
    StructField('max_side_p', DoubleType(), True)
])
df2 = spark.createDataFrame(df_array, schema = schema)
df2.printSchema()
But it returns an error: "TypeError: data is already a DataFrame", because df_array is already a PySpark DataFrame. Can anyone help me?
Upvotes: 0
Views: 105
Reputation: 4098
Assuming your dataframe is like this:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, LongType

schema = StructType([
    StructField("infos_gerais_product", StructType([
        StructField("id_distribution_center", ArrayType(LongType())),
        StructField("id_modality", ArrayType(LongType())),
    ]))
])
df = spark.createDataFrame(data=[(([1], [101]),), (([4], [401]),)], schema=schema)
df.printSchema()
root
|-- infos_gerais_product: struct (nullable = true)
| |-- id_distribution_center: array (nullable = true)
| | |-- element: long (containsNull = true)
| |-- id_modality: array (nullable = true)
| | |-- element: long (containsNull = true)
You can achieve the desired transformation using:
df = df.withColumn("infos_gerais_product", F.struct(
    F.element_at(F.col("infos_gerais_product").getField("id_distribution_center"), 1)
        .cast("string").alias("id_distribution_center"),
    F.element_at(F.col("infos_gerais_product").getField("id_modality"), 1)
        .cast("string").alias("id_modality")))
df.printSchema()
root
|-- infos_gerais_product: struct (nullable = false)
| |-- id_distribution_center: string (nullable = true)
| |-- id_modality: string (nullable = true)
Logic: element_at picks the first element of each array inside the struct, cast("string") converts it to a string, and F.struct rebuilds the column with scalar string fields in place of the arrays.
Upvotes: 1