ivanm
ivanm

Reputation: 138

Apache Beam Java SDK SparkRunner write to parquet error

I'm using Apache Beam with Java. I'm trying to read a csv file and write it to parquet format using the SparkRunner on a predeployed Spark env, using local mode. Everything worked fine with the DirectRunner, but the SparkRunner simply wont work. I'm using maven shade plugin to build a fat jat.

Code is as below:

Java:

public class ImportCSVToParquet{
-- ommitted
                File csv = new File(filePath);
                PCollection<String> vals = pipeline.apply(TextIO.read().from(filePath));

                String parquetFilename = csv.getName().replaceFirst("csv", "parquet");
                String outputLocation = FolderConventions.getRawFilePath(confETL.getHdfsRoot(), parquetFilename);

                PCollection<GenericRecord> processed = vals.apply(ParDo.of(new ProcessFiles.GenericRecordFromCsvFn()))
                        .setCoder(AvroCoder.of(new Config().getTransactionSchema()));

                LOG.info("Processed file will be written to: " + outputLocation);
                processed.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(conf.getTransactionSchema())).to(outputLocation));


        pipeline.run().waitUntilFinish();


}

POM dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-spark</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-parquet</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
/dependencies>

Spark script:

spark-submit \
--class package.ImportCSVToParquet \
--master local[*] \
--executor-cores 2 \
--executor-memory 2g \
--driver-memory 2g \
--driver-cores 2 \
--conf spark.sql.codegen.wholeStage=false \
--conf spark.wholeStage.codegen=false \
--conf spark.sql.shuffle.partitions=2005 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memoryOverhead=4048 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35" \
--conf "spark.driver.extraJavaOptions=-Djava.io.tmpdir=/path-to-tmp/" \
--conf "spark.driver.extraClassPath=./" \
--jars path-to-jar \
/path-to-jar "$@"

I get the following error:

2019-08-07 13:37:49 ERROR Executor:91 - Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
       at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:176)
        at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
        at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
        at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
        at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
        at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.open(ParquetIO.java:304)
        at org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.prepareWrite(FileIO.java:1359)
        at org.apache.beam.sdk.io.FileBasedSink$Writer.open(FileBasedSink.java:937)
        at org.apache.beam.sdk.io.WriteFiles$WriteUnshardedTempFilesFn.processElement(WriteFiles.java:533)

It seems that the job does the reading and transformations, but fails when tries to write to the filesystem. I'm not using HDFS at the moment. Any ideas?

Upvotes: 0

Views: 1162

Answers (2)

Khalid Bourhaba
Khalid Bourhaba

Reputation: 66

Do not use spark.driver.userClassPathFirst and spark.executor.userClassPathFirst as it's stil experimental. But intstead, use spark.driver.extraClassPath and spark.executor.extraClassPath.

Definition from the official documentation : "Extra classpath entries to prepend to the classpath of the driver."

  • "prepend", as in, put in front of Spark’s core classpath.

Example :

--conf spark.driver.extraClassPath=C:\Users\Khalid\Documents\Projects\libs\jackson-annotations-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-core-2.6.0.jar;C:\Users\Khalid\Documents\Projects\libs\jackson-databind-2.6.0.jar

This solved my problem (conflict between the version of Jackson i want to use, and the one spark is using).

Hope it helps.

Upvotes: 1

Ryan Skraba
Ryan Skraba

Reputation: 1158

I am certain that the ParquetIO depends on the Parquet 1.10+ release, which added a "hadoop-neutral" API to the parquet file readers/writers.

Spark 2.2.3 depends on Parquet 1.8.2, which does not have the builder(...) constructor that the Beam ParquetIO uses, which is confirmed by the exception.

If possible, the simplest solution would be to update to Spark 2.4 which has bumped the Parquet version to 1.10.0.

If you can't upgrade Spark versions, there are a couple of techniques for overriding the jars brought in by Spark:

  1. You can set spark.(driver|executor).userClassPathFirst to true, which will place the classes in your fat jar before the jars provided by spark. This might work, or it might introduce new dependency conflicts.

  2. You can try replacing the parquet-xx-1.8.2.jar in your local spark installation with parquet-xx-1.10.0 (assuming that they are drop-in replacements). If this works, you can apply the same strategy to a spark job in a cluster by setting the spark.yarn.jars property when submitting the job.

  3. You can try shading the beam ParquetIO and its parquet dependencies in your fat jar.

Edit: This is a known issue BEAM-5164.

Edit (workaround):

I managed to get this to work for Spark 2.2.3 by following the instructions with some modifications:

  • I used the scala 2.11 dependencies and set them to <scope>provided</scope> (probably optional).

  • I added the following three locations to the maven-shade-plugin:

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
          <filters>

... unchanged ...

          </filters>
          <relocations>
            <relocation>
              <pattern>org.apache.parquet</pattern>
              <shadedPattern>shaded.org.apache.parquet</shadedPattern>
            </relocation>
            <!-- Some packages are shaded already, and on the original spark classpath. Shade them more. -->
            <relocation>
              <pattern>shaded.parquet</pattern>
              <shadedPattern>reshaded.parquet</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.apache.avro</pattern>
              <shadedPattern>shaded.org.apache.avro</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
        <executions>

... unchanged ...

        </executions>
      </plugin>
    </plugins>
  </build>

Upvotes: 5

Related Questions