Seeker90

Reputation: 895

Merging Parquet Files with different columns in PySpark

I'm trying to merge multiple Parquet files stored in HDFS using PySpark.
The files have different columns and column types.

from pyspark.sql import SparkSession
from pyspark.sql import Row
spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.executor.cores", "10")
    .config("spark.executor.memory", "48G")
    .config("spark.driver.memory", "86G")
    .config("spark.dynamicAllocation.maxExecutors", "30")
    .enableHiveSupport()
    .getOrCreate()
)

import os
import calendar
import time
import string

sc = spark.sparkContext
df = spark.read.parquet("hdfs_path/*.parquet").coalesce(1)
df.write.parquet("hdfs_destination_path")

I got the error below:

Py4JJavaError: An error occurred while calling o83.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, pwccdhus-slave12.cip.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
at org.apache.spark.sql.execution.vectorized.ColumnVector.getUTF8String(ColumnVector.java:631)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
... 45 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
at org.apache.spark.sql.execution.vectorized.ColumnVector.getUTF8String(ColumnVector.java:631)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
... 8 more

So I tried to set the parameter to enable schema merge, but it didn't work either.

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.executor.cores", "10")
    .config("spark.executor.memory", "48G")
    .config("spark.driver.memory", "86G")
    .config("spark.dynamicAllocation.maxExecutors", "30")
    .enableHiveSupport()
    .getOrCreate()
)

import os
import calendar
import time
import string

sc = spark.sparkContext
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("hdfs_path/*.parquet").coalesce(1)
df.write.parquet("hdfs_destination_path")


This resulted in the error below:

Py4JJavaError: An error occurred while calling o152.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 9, pwccdhus-slave22.cip.com, executor 1): org.apache.spark.SparkException: Failed merging schema of file hdfs://pwccdhus-master1.cip.com:8020/hdfs_path/xyz.parquet/part-00000-a6b8e35f-ce2f-416f-8cce-3e5a1e252380-c000.snappy.parquet:
root
 |-- CONTRACTING_FIRM_CLIENT_ID: string (nullable = true)
 |-- COMPANY_CODE: string (nullable = true)
 |-- PROFIT_CENTER: string (nullable = true)
 |-- FISCAL_MONTH: integer (nullable = true)
 |-- CHARGED_HOURS: double (nullable = true)
 |-- FEE_REV_EXTERNAL_CLIENTS: double (nullable = true)
 |-- ENGAGEMENT_MARGIN: double (nullable = true)
 |-- PRODUCT_CODE: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- WBS_ELEMENT_ID: double (nullable = true)

I want the final output to be a single merged file in a specified location. What should the approach be?
PySpark is the only option I have.

I also tried the following approach:

import os
import calendar
import time
import string
from pyspark.sql.functions import monotonically_increasing_id


sc = spark.sparkContext 
path = input("Enter Path: ")

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
result = [file.getPath().getName() for file in list_status]
gzList = [ fi for fi in result if fi.endswith(".gz") ]
parquetList = [ fi for fi in result if fi.endswith(".parquet") ]


column_names = "ColA|ColB|ColC"
temp = spark.createDataFrame(
    [tuple('' for _ in column_names.split("|"))],
    column_names.split("|")
).where("1=0")

temp = temp.withColumn("id", monotonically_increasing_id())
if (len(gzList) == 0):
    for i in range(len(parquetList)):
        df = spark.read.parquet(path + parquetList[i])
        df.withColumn("id", monotonically_increasing_id())
        temp = df.join(df, "id", "outer").drop("id")

I am getting the error below:

AnalysisException: u'USING column `id` cannot be resolved on the left side of the join. The left-side columns: [WBS_ELEMENT_ID, WBS_ELEMENT_NAME, PROJECT_TYPE_ID, PROJECT_TYPE_NAME, CONTRACT_ID, CONTRACT_LINE_NUMBER, CONTRACT_LINE_NAME, WBS_FUNC, WBS_FUNC_DESCR, WBS_ELMT_STAT, WBS_ELMT_STAT_DESCR, ENG_CREATION_DATE, END_DATE, PROFIT_CENTER, COMPANY_CODE, CONTRACTING_FIRM_CLIENT_ID, PRODUCT_CODE];'

I am trying to run a loop that reads the files and merges them one at a time. What am I doing wrong?

Upvotes: 0

Views: 5516

Answers (1)

shanmuga

Reputation: 4499

Since the file structure is different for each file in the directory hdfs_path, you will have to read each file individually and create a DataFrame for it. Use this Scala code in spark-shell:

// spark-shell
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

val baseDir = "hdfs_path"
val fileUris = fs.listStatus(new Path(baseDir)).filter(_.isFile).map(_.getPath.toUri.toString).filter(_.endsWith("parquet"))
fileUris.foreach(println) // print file names

val dfs = fileUris.map(sqlContext.read.parquet)
val primaryKeyCols = Seq("col1", "col2")

// If you want to left join
val joined_df = dfs.reduce((x, y) => x.join(y, primaryKeyCols, "left"))

This will give you an Array of DataFrames, which you then need to merge. Use either the join method (to merge horizontally on key columns) or the union method (to merge vertically, i.e. append rows) on DataFrame. Note that to union the DataFrames you will need to make them have the same schema.
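If you go the union route, here is a minimal PySpark sketch, assuming spark is the session from your snippet; the file paths are placeholders (in practice build the list the same way as in the snippet below). It pads missing columns with nulls and casts everything to string, which is one way around a column being double in some files and string in others, which is what the PlainDoubleDictionary error above appears to point to.

from functools import reduce
from pyspark.sql import functions as F

# Placeholder paths for the sketch.
file_uris = [
    "hdfs_path/file1.parquet",
    "hdfs_path/file2.parquet",
]
dfs = [spark.read.parquet(u) for u in file_uris]

# Every column name that appears in any of the files, in a fixed order.
all_columns = sorted(set(c for df in dfs for c in df.columns))

def align(df):
    # Add missing columns as null strings and cast existing ones to string
    # so every DataFrame ends up with the same schema and column order.
    for c in all_columns:
        if c in df.columns:
            df = df.withColumn(c, F.col(c).cast("string"))
        else:
            df = df.withColumn(c, F.lit(None).cast("string"))
    return df.select(all_columns)

merged = reduce(lambda a, b: a.union(b), [align(df) for df in dfs])

The all-string cast is just the simplest way to make the schemas line up; you can cast individual columns back to their proper types afterwards if they matter downstream.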

AFAIK there is no filesystem API integration in PySpark. However, you can print the fileUris in spark-shell, and in a separate pyspark console you can hard-code that list:

# pyspark
from functools import reduce

file_uris = [
    "hdfs://namenode:8020/hdfs_path/file1.parquet",
    "hdfs://namenode:8020/hdfs_path/file2.parquet",
    # ...
    "hdfs://namenode:8020/hdfs_path/fileN.parquet"
]

dfs = [spark.read.parquet(x) for x in file_uris]
primary_key_cols = ["col1", "col2"]
df = reduce(lambda x, y: x.join(y, primary_key_cols, "left"), dfs)  # for left join
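Whichever way you merge them (join as above, or union), you can still get the single output file you asked for by collapsing to one partition before the write, as in your original snippet; the destination path is a placeholder.

# One partition means one parquet part file in the output directory.
df.coalesce(1).write.parquet("hdfs_destination_path")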

Upvotes: 0

Related Questions