Quentin Geff

Reputation: 829

How to write an Avro file from a CSV file with Spark?

I get a NullPointerException when I try to write an Avro file from a DataFrame created from CSV files:

public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("SparkCsvToAvro")
        .master("local")
        .getOrCreate();

    SQLContext context = new SQLContext(spark);

    String path = "C:\\git\\sparkCsvToAvro\\src\\main\\resources";
    DataFrameReader read = context.read();
    Dataset<Row> csv = read.csv(path);
    DataFrameWriter<Row> write = csv.write();
    DataFrameWriter<Row> format = write.format("com.databricks.spark.avro");
    format.save("C:\\git\\sparkCsvToAvro\\src\\main\\resources\\avro");
}

My pom.xml:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <junit.version>4.12</junit.version>
    <spark-core.version>2.1.0</spark-core.version>
    <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
    <maven-compiler-plugin.source>1.8</maven-compiler-plugin.source>
    <maven-compiler-plugin.target>1.8</maven-compiler-plugin.target>
    <spark-avro.version>3.2.0</spark-avro.version>
    <spark-csv.version>1.5.0</spark-csv.version>
    <spark-sql.version>2.1.0</spark-sql.version>
</properties>

...
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin.version}</version>
            <configuration>
                <source>${maven-compiler-plugin.source}</source>
                <target>${maven-compiler-plugin.target}</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark-core.version}</version>
    </dependency>

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>${spark-avro.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark-sql.version}</version>
    </dependency>

</dependencies>

And the exception stack trace:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
...
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)

I don't know what I am doing wrong. Maybe the dependencies are not correct? Or is it just bad practice on my part?

The NPE occurs here: DataFrameWriter<Row> format = write.format("com.databricks.spark.avro"); format.save("C:\\git\\sparkCsvToAvro\\src\\main\\resources\\avro");

"format" is null, and I don't know why.

Upvotes: 0

Views: 3550

Answers (1)

Manish Saraf Bhardwaj

Reputation: 1058

The way to parse CSV in Spark 2.0 is as follows.

First, initialize a SparkSession object. In the shells it is available by default as spark.

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local")
        .appName("Spark CSV Reader")
        .getOrCreate;

Now use the SparkSession object to load the CSV as a DataFrame/Dataset:

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // treat the first line as column names
        .option("mode", "DROPMALFORMED") // skip rows that cannot be parsed
        .load("csv/file/path"); // or .csv("csv/file/path") with the Spark 2.0 API

df.show()

Databricks provides the spark-avro library, which helps with reading and writing Avro data.

df.write.format("com.databricks.spark.avro").save(outputPath)
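One more thing worth checking. This is an assumption based on the stack trace in the question, not something confirmed there: on Windows, an NPE thrown from ProcessBuilder.start via org.apache.hadoop.util.Shell is commonly a sign that Hadoop could not locate winutils.exe. The sketch below is plain Java (no Spark needed) and only checks whether the file exists where Hadoop would look for it. The class name WinutilsCheck and its helper methods are hypothetical, made up for this illustration.

```java
import java.io.File;
import java.util.Locale;
import java.util.Optional;

public class WinutilsCheck {

    /** Returns true when the given os.name value denotes Windows. */
    public static boolean isWindows(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).startsWith("windows");
    }

    /**
     * Builds the expected winutils.exe location (<home>/bin/winutils.exe),
     * or returns empty when no Hadoop home directory is configured.
     */
    public static Optional<File> winutilsPath(String hadoopHome) {
        if (hadoopHome == null || hadoopHome.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new File(new File(hadoopHome, "bin"), "winutils.exe"));
    }

    public static void main(String[] args) {
        if (!isWindows(System.getProperty("os.name"))) {
            System.out.println("Not on Windows; winutils.exe is not needed.");
            return;
        }
        // Hadoop checks the hadoop.home.dir system property first,
        // then falls back to the HADOOP_HOME environment variable.
        String home = System.getProperty("hadoop.home.dir", System.getenv("HADOOP_HOME"));
        Optional<File> winutils = winutilsPath(home);
        if (!winutils.isPresent()) {
            System.out.println("Neither hadoop.home.dir nor HADOOP_HOME is set.");
        } else if (winutils.get().isFile()) {
            System.out.println("Found " + winutils.get().getAbsolutePath());
        } else {
            System.out.println("Missing " + winutils.get().getAbsolutePath());
        }
    }
}
```

If the file turns out to be missing, a common workaround is to put winutils.exe into the bin folder of a directory of your choice and point hadoop.home.dir (or HADOOP_HOME) at that directory before the SparkSession is created.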

Upvotes: 1
