Reputation: 59
I wanted to read multiple CSV files with different number of columns using PySpark.
Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
f1 file has 50 columns, f2 has 10 more columns that constitutes total 60 columns and f3 has 30 more columns that is total 80 columns for f3 file and so on.
However,
df = spark.read.csv(Files,header=True)
gives only 50 columns. I am expecting 80 columns. Since f1 file has only 50 columns, so remaining 30 columns will be filled NAN values for the f1 file data. Same is true for other CSV files. Pandas dataframe gives me the all 80 columns perfectly:
import pandas as pd
import glob
df = pd.concat(map(pd.read_csv, ['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']))
But I can't do the same thing with PySpark. How can I read all columns of the above 5 CSV files into single spark dataframe?
Upvotes: 5
Views: 3110
Reputation: 14958
In more recent versions of Spark - currently 3.4.1, adding the option to mergeSchema effectively allows the later, wider dataframes to be fully integrated with the skinnier prior ones.
Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
df = (
spark.read
.option("header", True)
.option("inferSchema", True)
.option("mergeSchema", True)
.csv(Files)
)
Upvotes: 0
Reputation: 59
It was a very easy fix. What I did,
Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
Files.reverse()
df = spark.read.csv(Files,inferSchema=True, header=True)
Last files had all columns because columns were added incrementally. Reversing them solved the issues.
Upvotes: 1
Reputation: 747
You can read each file into its own Spark dataframe, to combine all dataframes into one dataframe, use union.
Fill the the missing columns in the dataframes with fewer columns.
Merge them using union or reduce.
from functools import reduce
from pyspark.sql.functions import lit, col
df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]
cols = [len(df.columns) for df in df_list]
max_cols = max(cols)
df_list = [df.select(*[col(c) for c in df.columns] + [lit(None).alias("col_{}".format(i+j)) for i in range(len(df.columns), max_cols)]) for j, df in enumerate(df_list)]
df_final = reduce(lambda x, y: x.union(y), df_list)
I reproduced your case on this github.
Upvotes: 2