ForestGump

Reputation: 59

Read multiple CSV files with a different number of columns in each CSV file

I want to read multiple CSV files with different numbers of columns using PySpark.

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

The f1 file has 50 columns, f2 has 10 more columns for a total of 60, f3 has 30 more for a total of 80, and so on.

However,

df = spark.read.csv(Files,header=True)

gives only 50 columns, but I am expecting 80. Since f1 has only 50 columns, the remaining 30 columns should be filled with NaN values for the f1 rows, and likewise for the other CSV files. Pandas gives me all 80 columns perfectly:

import pandas as pd
import glob
df = pd.concat(map(pd.read_csv, ['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']))

But I can't do the same thing with PySpark. How can I read all columns of the above 5 CSV files into a single Spark dataframe?

Upvotes: 5

Views: 3110

Answers (3)

leerssej

Reputation: 14958

In more recent versions of Spark (currently 3.4.1), adding the mergeSchema option allows the later, wider dataframes to be fully integrated with the narrower earlier ones.

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("mergeSchema", True)
    .csv(Files)
)
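Whichever reader options are used, it is worth verifying that all columns actually came through; with the files from the question, 80 columns are expected:

print(len(df.columns))  # expect 80 if the schemas were merged
df.printSchema()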

Upvotes: 0

ForestGump

Reputation: 59

It was a very easy fix. What I did:

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
Files.reverse()
df = spark.read.csv(Files,inferSchema=True, header=True)

The last files had all the columns because columns were added incrementally, so reversing the list puts the widest file first. That solved the issue.
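If relying on list order feels fragile, a variant of the same idea is to read the widest file once, capture its schema, and apply it to every file. This is only a sketch, assuming f5.csv is the widest file and that the shared columns appear in the same order in every file:

# Sketch, assuming Data/f5.csv is the widest (80-column) file.
# Rows from the narrower files get null for the trailing columns they lack
# (Spark's default PERMISSIVE mode).
wide_schema = spark.read.csv('Data/f5.csv', header=True, inferSchema=True).schema
df = spark.read.csv(Files, header=True, schema=wide_schema)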

Upvotes: 1

Daniel Sobrado

Reputation: 747

You can read each file into its own Spark dataframe and then combine them all into a single dataframe:

Fill in the missing columns of the dataframes that have fewer columns.

Merge them using union, applied across the list with reduce.

from functools import reduce
from pyspark.sql.functions import lit, col

# Read each file into its own dataframe
df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]

# The widest dataframe determines the target column count
max_cols = max(len(df.columns) for df in df_list)

# Pad the narrower dataframes with null placeholder columns so all have max_cols columns
df_list = [
    df.select(
        *([col(c) for c in df.columns]
          + [lit(None).alias("col_{}".format(i)) for i in range(len(df.columns), max_cols)])
    )
    for df in df_list
]

# union is positional, so the padded dataframes can be stacked directly
df_final = reduce(lambda x, y: x.union(y), df_list)

I reproduced your case on this GitHub.
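As a note on the design choice: on Spark 3.1 and later, unionByName with allowMissingColumns=True aligns columns by name and fills the missing ones with null automatically, so the manual padding step can be skipped. A sketch, using the same f1.csv–f5.csv file names as above:

from functools import reduce

df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]

# unionByName aligns columns by name; allowMissingColumns fills absent ones with null
df_final = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), df_list)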

Upvotes: 2
