Daniel Martinez
Daniel Martinez

Reputation: 397

Pyspark - Dynamically adding fields to schema

I need to modify a complex DataFrame schema by adding columns based on a dynamic list of column names.

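If a column in the schema is included in the list, that column needs to be "duplicated" at the same position in the schema, with the suffix "_duplicated" appended to its name and with a string type (or an array of strings when the original column is an array, as in the expected schema below).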

The solution needs to be dynamic, as the schemas vary widely.

For instance, taking the following code:

# Sample input: a list holding one JSON document as a string (the parentheses
# around the literal are plain grouping, not a tuple).
data = [('{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "duplicateme": "please", "metoo": ["hello","world"]}}]}}') ]

# Names of the columns to duplicate; they are matched by field name only,
# at any nesting depth of the schema.
columns_to_duplicate = ['hello','duplicateme', 'metoo']

# `spark` and `sc` are not defined in this snippet — presumably the active
# SparkSession and SparkContext; confirm in the calling environment.
df = spark.read.json(sc.parallelize(data))

# Print the schema of the DataFrame read above.
df.printSchema()

The schema is as follows:

root
 |-- hello: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)

I would like the resulting schema to be as follows:

root
 |-- hello: string (nullable = true)
 |-- hello_duplicated: string (nullable = true)
 |-- thisisastruct: struct (nullable = true)
 |    |-- thisisanarray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- thisisanotherstruct: struct (nullable = true)
 |    |    |    |    |-- ID: long (nullable = true)
 |    |    |    |    |-- duplicateme: string (nullable = true)
 |    |    |    |    |-- duplicateme_duplicated: string (nullable = true)
 |    |    |    |    |-- metoo: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- metoo_duplicated: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)

EDIT: This is what I have as of now, but Arrays are not working:

def transform_schema(schema, columns_to_dupe):
    """Return a new StructType with ``<name>_duplicated`` fields added.

    Every field of *schema* is kept, in order.  A field whose name is in
    *columns_to_dupe* is immediately followed by a duplicate named
    ``<name>_duplicated``:

    * array of non-struct, non-array elements -> ``ArrayType(StringType())``
    * any other non-struct, non-array column  -> ``StringType()``
    * struct / array of structs               -> same shape, with every direct
      child duplicated

    Structs and arrays of structs are always walked recursively, whether or
    not their own name is listed, so matches at any depth are found.

    Args:
        schema: StructType to transform, or None.
        columns_to_dupe: collection of field names to duplicate.

    Returns:
        A new StructType; an empty one when *schema* is None.
    """
    if schema is None:
        return StructType()

    updated = []

    for f in schema.fields:
        dupe = f.name in columns_to_dupe
        dupe_name = f.name + '_duplicated'

        if isinstance(f.dataType, ArrayType):
            element = f.dataType.elementType
            contains_null = f.dataType.containsNull

            if isinstance(element, StructType):
                # Always recurse into the element struct and replace the
                # field in place; appending a second field under the same
                # name (as before) produced an ambiguous schema and skipped
                # arrays whose own name was not listed.
                updated.append(StructField(
                    f.name,
                    ArrayType(transform_schema(element, columns_to_dupe), contains_null),
                    f.nullable))
                if dupe:
                    # The suffix keeps field names unique within the struct.
                    updated.append(StructField(
                        dupe_name,
                        ArrayType(transform_schema(element, element.fieldNames()), contains_null),
                        f.nullable))
            else:
                updated.append(StructField(f.name, f.dataType, f.nullable))
                # Arrays of arrays are kept as-is and never duplicated.
                if dupe and not isinstance(element, ArrayType):
                    updated.append(StructField(dupe_name, ArrayType(StringType()), f.nullable))
        elif isinstance(f.dataType, StructType):
            # Recurse, preserving the original nullability.
            updated.append(StructField(
                f.name, transform_schema(f.dataType, columns_to_dupe), f.nullable))
            if dupe:
                # Duplicate of the struct with all direct children duplicated;
                # suffixed so it does not collide with the original field.
                updated.append(StructField(
                    dupe_name,
                    transform_schema(f.dataType, f.dataType.fieldNames()),
                    f.nullable))
        else:
            # All remaining types, e.g. TimestampType, StringType.
            updated.append(StructField(f.name, f.dataType, f.nullable))
            if dupe:
                updated.append(StructField(dupe_name, StringType(), f.nullable))

    return StructType(updated)

Thanks in advance

Upvotes: 1

Views: 1660

Answers (1)

Daniel Martinez
Daniel Martinez

Reputation: 397

In the end I was able to do it using the JSON schema:

# Serialize the DataFrame's schema to a JSON string (`df` is not defined in
# this snippet — presumably the DataFrame built in the question).
schema_json = df.schema.json()

# Parse the JSON string into nested dicts/lists so it can be edited directly.
schema_dict = json.loads(schema_json)

def walk_schema(schema, columns_to_duplicate):
    """Insert ``<name>_duplicated`` fields into a JSON-style struct schema.

    *schema* is a dict with a ``'fields'`` list whose entries have ``'name'``,
    ``'nullable'`` and ``'type'`` keys.  The dict is modified in place: each
    matching field is immediately followed by its duplicate, so the
    duplicate sits at the same position as the original.

    * primitive field listed         -> duplicate of type ``"string"``
    * array of primitives listed     -> duplicate array of ``"string"``
    * struct / array of structs      -> walked recursively; when its own name
      is listed, every direct child of the nested struct is duplicated

    Other dict-typed fields (for instance arrays whose elements are not
    structs) are left untouched.

    Args:
        schema: struct schema dict to rewrite in place.
        columns_to_duplicate: collection of field names to duplicate.

    Returns:
        None.
    """
    # Build a fresh list instead of appending to the one being iterated:
    # that keeps duplicates next to their source field and avoids
    # re-visiting entries added during the walk.
    rebuilt = []

    for field in schema['fields']:
        rebuilt.append(field)
        name = field['name']
        field_type = field['type']
        selected = name in columns_to_duplicate

        if not isinstance(field_type, dict):
            # Primitive column: the type is a plain string such as "long".
            if selected:
                rebuilt.append({"metadata": {}, "name": name + '_duplicated', "nullable": field['nullable'], "type": "string"})
            continue

        if 'fields' in field_type:
            nested = field_type
        elif 'elementType' in field_type:
            element_type = field_type['elementType']
            if not isinstance(element_type, dict):
                # Array of primitives.
                if selected:
                    rebuilt.append({"metadata": {}, "name": name + '_duplicated', "nullable": field['nullable'], "type": {"containsNull": True, "elementType": "string", "type": "array"}})
                continue
            nested = element_type
        else:
            # Neither struct nor array: nothing to walk.
            continue

        if 'fields' not in nested:
            # Array whose element is a non-struct complex type; leave as-is
            # rather than failing on the missing 'fields' key.
            continue

        if selected:
            # The container itself is listed: duplicate all direct children.
            walk_schema(nested, [child['name'] for child in nested['fields']])
        else:
            walk_schema(nested, columns_to_duplicate)

    # Slice-assign so existing references to the list see the new contents.
    schema['fields'][:] = rebuilt

# Rewrite schema_dict in place; walk_schema's return value is not used.
walk_schema(schema_dict, columns_to_duplicate)

# Build a StructType from the modified dict.
# NOTE(review): the dumps/loads round trip looks redundant because
# schema_dict is already a plain dict — confirm before simplifying.
new_schema = StructType.fromJson(json.loads(json.dumps(schema_dict)))

Upvotes: 1

Related Questions