Reputation: 397
I need to modify a complex DataFrame schema by adding columns based on a dynamic list of column names.
If a column in the schema is included in the list, that column needs to be "duplicated" at the same position in the schema (immediately after the original), with the suffix "_duplicated" appended to its name and a string type (an array of strings when the original column is an array).
The solution needs to be dynamic because the schemas vary a lot.
For instance, take the following code:
# Sample input: a single JSON document with a top-level string plus scalar and
# array leaves buried under a struct -> array -> struct path.
data = ['{"hello": "world", "thisisastruct": {"thisisanarray": [{"thisisanotherstruct": {"ID": 1, "duplicateme": "please", "metoo": ["hello","world"]}}]}}']
columns_to_duplicate = ['hello', 'duplicateme', 'metoo']
# Let Spark infer a DataFrame (and its schema) from the JSON strings, then show it.
json_rdd = sc.parallelize(data)
df = spark.read.json(json_rdd)
df.printSchema()
The schema is as follows:
root
|-- hello: string (nullable = true)
|-- thisisastruct: struct (nullable = true)
| |-- thisisanarray: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- thisisanotherstruct: struct (nullable = true)
| | | | |-- ID: long (nullable = true)
| | | | |-- duplicateme: string (nullable = true)
| | | | |-- metoo: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
I would like the resulting schema to be as follows:
root
|-- hello: string (nullable = true)
|-- hello_duplicated: string (nullable = true)
|-- thisisastruct: struct (nullable = true)
| |-- thisisanarray: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- thisisanotherstruct: struct (nullable = true)
| | | | |-- ID: long (nullable = true)
| | | | |-- duplicateme: string (nullable = true)
| | | | |-- duplicateme_duplicated: string (nullable = true)
| | | | |-- metoo: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- metoo_duplicated: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
EDIT: This is what I have so far, but arrays are not working:
def _transform_type(data_type, columns_to_dupe):
    """Return *data_type* with transform_schema applied to every struct nested in it."""
    if isinstance(data_type, StructType):
        return transform_schema(data_type, columns_to_dupe)
    if isinstance(data_type, ArrayType):
        # Always look inside arrays, whether or not the array itself is selected:
        # selected fields may live inside an array of structs.
        return ArrayType(
            _transform_type(data_type.elementType, columns_to_dupe),
            data_type.containsNull,
        )
    return data_type


def _duplicate_type(data_type):
    """Return the type for a field's "_duplicated" companion, or None if it gets none."""
    if isinstance(data_type, StructType):
        # A selected struct is duplicated as a copy whose direct children are
        # all duplicated (the intent of the original struct branch).
        return transform_schema(data_type, data_type.fieldNames())
    if isinstance(data_type, ArrayType):
        if isinstance(data_type.elementType, StructType):
            # Arrays of structs get no companion; their contents are handled
            # by the recursion in _transform_type.
            return None
        inner = _duplicate_type(data_type.elementType)
        return None if inner is None else ArrayType(inner)
    # Any other type (string, long, timestamp, map, ...) is duplicated as a string.
    return StringType()


def transform_schema(schema, columns_to_dupe):
    """Return a copy of *schema* with "<name>_duplicated" fields added for selected names.

    The schema is walked recursively, through structs and arrays (including
    arrays of arrays). Every field whose name is in *columns_to_dupe* is kept
    and immediately followed by a companion named ``<name>_duplicated``:

    * array of scalars (any nesting depth) -> array(s) of ``StringType()``
    * array of structs -> no companion; the struct contents are still walked
    * struct -> a copy in which each direct child is duplicated
    * anything else -> ``StringType()``

    Args:
        schema: The ``StructType`` to transform, or ``None``.
        columns_to_dupe: Field names to duplicate (anything supporting ``in``).

    Returns:
        A new ``StructType``. It is empty when *schema* is ``None``.
    """
    if schema is None:
        return StructType()
    updated = []
    for field in schema.fields:
        # Keep the original field, with nested structs transformed, and
        # preserve its nullability and metadata.
        updated.append(
            StructField(
                field.name,
                _transform_type(field.dataType, columns_to_dupe),
                field.nullable,
                field.metadata,
            )
        )
        if field.name in columns_to_dupe:
            dup_type = _duplicate_type(field.dataType)
            if dup_type is not None:
                updated.append(
                    StructField(field.name + '_duplicated', dup_type, field.nullable)
                )
    return StructType(updated)
Thanks in advance
Upvotes: 1
Views: 1660
Reputation: 397
In the end, I was able to do it by editing the JSON representation of the schema:
# Serialise the DataFrame's schema to JSON text, then parse that text back
# into plain, mutable dicts/lists so the schema can be edited in place.
schema_json = df.schema.json()
schema_dict = json.loads(schema_json)
def _unwrap_json_arrays(data_type):
    """Strip array wrappers from a JSON schema type; return (innermost type, depth)."""
    depth = 0
    while isinstance(data_type, dict) and data_type.get('type') == 'array':
        data_type = data_type['elementType']
        depth += 1
    return data_type, depth


def walk_schema(schema, columns_to_duplicate):
    """Insert "<name>_duplicated" fields into a JSON-dict schema, in place.

    *schema* is a struct in the dict form produced by ``json.loads`` on
    ``StructType.json()``: ``{"type": "struct", "fields": [...]}``. For each
    field whose name is in *columns_to_duplicate*, a companion field is
    inserted immediately after it:

    * scalar field -> type ``"string"``
    * array of scalars (any nesting depth) -> the same number of array
      layers around ``"string"``, each with ``containsNull`` set to True

    Structs, including structs inside arrays, are walked recursively. A
    selected struct gets no companion itself; instead, all of its direct
    children are duplicated. Other complex types (e.g. maps) are left alone.

    Args:
        schema: Struct dict to modify. Its ``"fields"`` list is updated in place.
        columns_to_duplicate: Field names to duplicate (anything supporting ``in``).
    """
    updated_fields = []
    # Build a new list instead of appending to schema['fields'] while iterating it.
    for field in schema['fields']:
        updated_fields.append(field)
        selected = field['name'] in columns_to_duplicate
        inner, depth = _unwrap_json_arrays(field['type'])
        if isinstance(inner, dict):
            if 'fields' in inner:
                if selected:
                    names = [child['name'] for child in inner['fields']]
                else:
                    names = columns_to_duplicate
                walk_schema(inner, names)
        elif selected:
            dup_type = 'string'
            for _ in range(depth):
                dup_type = {"containsNull": True, "elementType": dup_type, "type": "array"}
            # Placed right after the original field to keep "the same position".
            updated_fields.append({
                "metadata": {},
                "name": field['name'] + '_duplicated',
                "nullable": field['nullable'],
                "type": dup_type,
            })
    # Slice-assign so anyone holding a reference to the original list sees the change.
    schema['fields'][:] = updated_fields
# Insert the "_duplicated" companion fields into schema_dict (modified in place),
# then rebuild a Spark StructType from the edited dict. StructType.fromJson
# accepts the parsed dict directly, so no json.dumps/json.loads round-trip is needed.
walk_schema(schema_dict, columns_to_duplicate)
new_schema = StructType.fromJson(schema_dict)
Upvotes: 1