Reputation: 566
I have a parquet directory with around 1000 files, and the schemas differ between files. I want to merge all of those files into an optimal number of files by repartitioning. I am using pandas with pyarrow to read each partition file from the directory, concatenating all of the data frames, and writing the result as one file.
With this approach, as the data size grows I run into memory issues and the process gets killed, so I have switched to another method.
I read the first batch of files, merge them with concat, and write them to a new parquet directory. Then I read the second batch of files, concatenate them into a single data frame, and take one record from that second merged data frame. Next I read the first merged data frame back from disk and merge it with the record taken from the second merged data frame. Finally, I use dask's to_parquet with its append functionality to add the new files to that parquet folder.
Is the result a valid parquet dataset? When I read data from it, will I get all of the columns, as with parquet schema evolution? Will it behave like Spark's mergeSchema option?
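For reference, the Spark behaviour I am comparing against is roughly this (the path is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# mergeSchema makes Spark build the union of the columns across all part files
merged = spark.read.option("mergeSchema", "true").parquet("sample.parquet")
merged.printSchema()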
Update:
sample.parquet - contains 1000 part files
import os
import shutil

import numpy as np
import pandas as pd
import dask.dataframe as dd

def read_files_from_path(inputPath):
    # Illustration of the returned structure; the real list has ~1000 part files.
    return {"inputPath": ["part-001", "part-002", ..., "part-100"]}

def mergeParquet(list_of_files, output_path):
    dfs_list = []
    for file_path in list_of_files:
        df = pd.read_parquet(file_path, engine='pyarrow')
        dfs_list.append(df)
    df = pd.concat(dfs_list, axis=0, sort=True)

    # Keep one record from this batch so it can be appended to the data
    # written earlier and force the columns of both batches to line up.
    df_sample_record_df = df[2:3]

    if os.path.exists(output_path + '/_metadata'):
        files_in_output_path = getFiles(output_path)  # helper defined elsewhere
        for f in files_in_output_path:
            temp_df = pd.read_parquet(f, engine='pyarrow')
            temp_combine_df = pd.concat([temp_df, df_sample_record_df], sort=True)
            dd.from_pandas(temp_combine_df, npartitions=1) \
                .repartition(partition_size="128MB") \
                .to_parquet(output_path + "/tmp", engine='pyarrow',
                            ignore_divisions=True, append=True)
            os.remove(f)
    return df

def final_write_parquet(df, output_path):
    ddf = dd.from_pandas(df, npartitions=1)
    if os.path.exists(output_path + "/tmp"):
        ddf.repartition(partition_size="128MB") \
            .to_parquet(output_path + "/tmp", engine='pyarrow',
                        ignore_divisions=True, append=True)
        for f in os.listdir(output_path + "/tmp"):
            shutil.move(output_path + "/tmp/" + f, output_path)
        shutil.rmtree(output_path + "/tmp")
    else:
        ddf.repartition(partition_size="128MB") \
            .to_parquet(output_path, engine='pyarrow', append=False)

if __name__ == "__main__":
    files_dict = read_files_from_path(inputPath)
    number_of_batches = 1000 / 500  # total files / batch size
    for sub_file_names in np.array_split(files_dict["inputPath"], int(number_of_batches)):
        paths = [os.path.join(root_dir, file_name) for file_name in sub_file_names]
        mergedDF = mergeParquet(paths, outputPath)
        final_write_parquet(mergedDF, outputPath)
Upvotes: 4
Views: 6542
Reputation: 26
For the memory issue: use pyarrow tables instead of pandas DataFrames.
For the schema issue: you can create your own custom pyarrow schema and cast each pyarrow table to that schema.
import pyarrow as pa
import pyarrow.parquet as pq

def merge_small_parquet_files(small_files, result_file):
    pqwriter = None
    for small_file in small_files:
        table = pq.read_table(small_file)
        pyarrow_schema = get_pyarrow_schema()

        if not pqwriter:
            pqwriter = pq.ParquetWriter(result_file,
                                        schema=pyarrow_schema,
                                        compression='GZIP',
                                        coerce_timestamps='ms',
                                        allow_truncated_timestamps=True)

        table = table.cast(pyarrow_schema)
        pqwriter.write_table(table)
        table = None
        del table

    if pqwriter:
        pqwriter.close()

def get_pyarrow_schema():
    fields = []
    fields.append(pa.field('first_name', pa.string()))
    fields.append(pa.field('last_name', pa.string()))
    fields.append(pa.field('Id', pa.float64()))
    fields.append(pa.field('Salary', pa.float64()))
    fields.append(pa.field('Time', pa.timestamp('ms')))

    pyarrow_schema = pa.schema(fields)
    return pyarrow_schema

if __name__ == '__main__':
    small_files = ['file1.parquet', 'file2.parquet', 'file3.parquet', 'file4.parquet']
    result_file = 'large.parquet'
    merge_small_parquet_files(small_files, result_file)
Upvotes: 1
Reputation: 57271
Dask dataframes assume that all partitions have the same schema (column names and datatypes). If you want to mix different datasets that have almost the same schema then you will need to handle this manually. Dask dataframe provides no automated support here today.
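One way to handle it manually, as a rough sketch: compute the union of the columns up front from the parquet footers and reindex each file to that set before handing the partitions to Dask. The file list and output path below are placeholders, and it assumes the dtypes agree for columns that appear in more than one file.

import dask
import dask.dataframe as dd
import pandas as pd
import pyarrow.parquet as pq

files = ["part-001.parquet", "part-002.parquet"]  # placeholder paths

# Union of all column names, read from the parquet footers only (cheap).
all_columns = sorted(set().union(*(pq.read_schema(f).names for f in files)))

@dask.delayed
def load_aligned(path):
    # Reindex to the shared column set; columns missing from this file become NaN.
    return pd.read_parquet(path, engine="pyarrow").reindex(columns=all_columns)

ddf = dd.from_delayed([load_aligned(f) for f in files])
ddf.repartition(partition_size="128MB").to_parquet("merged.parquet", engine="pyarrow")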
Upvotes: 2