I have thousands of files with data in the following format:
a|b|c|clm4=1|clm5=3
a|b|c|clm4=9|clm6=60|clm7=23
I want to read them and convert them to a dataframe like this:
clm1|clm2|clm3|clm4|clm5|clm6|clm7
a|b|c|1|3|null|null
a|b|c|9|null|60|23
I have tried the following method:
import glob

files = [f for f in glob.glob(pathToFile + "/**/*.txt.gz", recursive=True)]
df = spark.read.load(files, format='csv', sep='|', header=False)
But it gives me the following result:
clm1, clm2, clm3, clm4, clm5
a, b, c, 1, 3
a, b, c, 9, null
Upvotes: 2
Views: 777
For Spark 2.4+, you can read the files as a single column and then split it by |. You'll get an array column that you can transform using higher-order functions:
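The answer assumes the files were already read into a dataframe with a single string column named clm. A minimal sketch of one way to do that, reusing the files list built with glob in the question (the imports are also used by the code further down):
from pyspark.sql.functions import expr, explode, map_from_entries, first

# Read each line as a single string column ("value"), then rename it to "clm"
df = spark.read.text(files).withColumnRenamed("value", "clm")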
df.show(truncate=False)
+----------------------------+
|clm |
+----------------------------+
|a|b|c|clm4=1|clm5=3 |
|a|b|c|clm4=9|clm6=60|clm7=23|
+----------------------------+
We use the transform function to convert the array of strings that we get from splitting the clm column into an array of structs. Each struct holds the column name if it is present (we check whether the string contains =), otherwise the name is built as clm + (i+1), where i is the element's position.
transform_expr = """
transform(split(clm, '[|]'), (x, i) ->
struct(
IF(x like '%=%', substring_index(x, '=', 1), concat('clm', i+1)),
substring_index(x, '=', -1)
)
)
"""
Now use map_from_entries to convert the array into a map. Finally, explode the map and pivot to get your columns:
df.select("clm",
explode(map_from_entries(expr(transform_expr))).alias("col_name", "col_value")
) \
.groupby("clm").pivot('col_name').agg(first('col_value')) \
.drop("clm") \
.show(truncate=False)
Gives:
+----+----+----+----+----+----+----+
|clm1|clm2|clm3|clm4|clm5|clm6|clm7|
+----+----+----+----+----+----+----+
|a |b |c |9 |null|60 |23 |
|a |b |c |1 |3 |null|null|
+----+----+----+----+----+----+----+
Upvotes: 1