Vitaliy

Reputation: 8206

Ignore missing values when writing to parquet in pyspark

I need to convert the internal structure of a parquet file.

Currently I have a field that stores an array of arrays. I intend to turn it into an array of structs.

So from this:

root
  -array
     -array

I want to get this:

root
  -array
     -struct

I am performing the conversion in the following way:

I define a schema for the new struct:

from pyspark.sql.types import ArrayType, StructType, StructField, FloatType

newtype = ArrayType(StructType([
    StructField("one", FloatType()),
    StructField("two", FloatType()),
    StructField("three", FloatType()),
    StructField("four", FloatType()),
    StructField("five", FloatType())
]))

I apply an 'empty' (pass-through) UDF to the column. The important part is that I specify the new schema as the return type of the UDF.

from pyspark.sql.functions import expr

def convert(arr):
    return arr

df = spark.read.parquet("....")
spark.udf.register(name="convert", f=convert, returnType=newtype)
df = df.withColumn("col", expr("convert(col)"))

Finally, I write it back to parquet.
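The write step itself is just the standard DataFrame writer (output path elided, as above):

df.write.parquet("....")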

The problem I get is this:

Input row doesn't have expected number of values required by the schema. 5 fields are required while 3 values are provided.

This is indeed true. Some of the arrays originally had 3 values; more values were added later, so newer arrays have 5 values.

Why does this happen? I defined the fields as nullable, so I would have expected this to work. What are my options?

Upvotes: 1

Views: 1604

Answers (1)

Ryan Widmaier

Reputation: 8523

When you convert your data to the new struct schema, you have to provide a value for every field. Spark doesn't want to make assumptions about what values to fill in if you don't provide them. Just provide None for the remaining fields when the array is too short. Also, your convert function doesn't look like it is handling the nested array. Here is a working example with convert updated to pad with None.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema of the existing data: an array of variable-length float arrays
old_type = StructType([
    StructField("col", ArrayType(ArrayType(FloatType())))
])

# Target schema: an array of 5-field structs
new_type = ArrayType(StructType([
    StructField("one", FloatType()),
    StructField("two", FloatType()),
    StructField("three", FloatType()),
    StructField("four", FloatType()),
    StructField("five", FloatType())
]))

data = [
    ([[1., 2., 3.], [1., 2., 3., 4., 5.]],)
]

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, old_type)


def convert(arr):
    # Pad each inner array with None so every row provides all 5 struct fields
    vals = []
    for v in arr:
        padding = [None] * (5 - len(v))
        vals.append(v + padding)
    return vals

spark.udf.register(name="convert", f=convert, returnType=new_type)
df = df.withColumn("col", expr("convert(col)"))
df.show(10, False)
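If you are on Spark 2.4 or later, you may also be able to avoid the Python UDF entirely and do the padding with the built-in higher-order functions. A rough, untested sketch (it relies on element_at returning NULL for out-of-range indices, which is the behavior when ANSI mode is off):

padded = df.withColumn("col", expr("""
    transform(col, x -> named_struct(
        'one',   element_at(x, 1),
        'two',   element_at(x, 2),
        'three', element_at(x, 3),
        'four',  element_at(x, 4),
        'five',  element_at(x, 5)
    ))
"""))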

Upvotes: 3
