ajayian

Reputation: 963

Spark SQL Java GenericRowWithSchema cannot be cast to java.lang.String

I have an application that reads a group of CSV files from a cluster directory and tries to write them out as a parquet file using Spark.

SparkSession sparkSession = createSession();
    JavaRDD<Row> entityRDD = sparkSession.read()
            .csv(dataCluster + "measures/measures-*.csv")
            .javaRDD()
            .mapPartitionsWithIndex(removeHeader, false)
            .map((Function<String, Measure>) s -> {
                String[] parts = s.split(COMMA);
                Measure measure = new Measure();
                measure.setCobDate(parts[0]);
                measure.setDatabaseId(Integer.valueOf(parts[1]));
                measure.setName(parts[2]);

                return measure;
            });

    Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
    entityDataFrame.printSchema();

    //Create parquet file here
    String parquetDir = dataCluster + "measures/parquet/measures";
    entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);


    sparkSession.stop();

The Measure class is a simple POJO that implements Serializable. The schema prints correctly, so I assume the problem occurs when the DataFrame rows are written to the parquet file. Here's the error I get:

Lost task 2.0 in stage 1.0 (TID 3, redlxd00006.fakepath.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:244)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
        ... 8 more

Ultimately, my intention is to use Spark SQL to filter and join this data with other CSV files containing other table data, and to write the combined results to parquet. I've only found Scala-related questions, and none of them addressed my problem. Any help is much appreciated.

csv:

cob_date, database_id, name
20181115,56459865,name1
20181115,56652865,name6
20181115,56459845,name32
20181115,15645936,name3
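
The Measure class itself is not shown in the question. A minimal sketch consistent with the setters used above might look like the following. The field types are assumptions inferred from the calls in the question (`Integer.valueOf` for the database id), and `fromCsvLine` is a hypothetical helper added only to show how one sample line maps onto the bean. The getters matter because `createDataFrame(rdd, Measure.class)` derives its columns from JavaBean properties.

```java
import java.io.Serializable;

// Sketch of the POJO described in the question; field types are assumed.
public class Measure implements Serializable {
    private static final long serialVersionUID = 1L;

    private String cobDate;
    private Integer databaseId;
    private String name;

    public Measure() {}

    public String getCobDate() { return cobDate; }
    public void setCobDate(String cobDate) { this.cobDate = cobDate; }

    public Integer getDatabaseId() { return databaseId; }
    public void setDatabaseId(Integer databaseId) { this.databaseId = databaseId; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // Hypothetical helper (not in the question): parses one comma-separated
    // data line in the order cob_date, database_id, name.
    public static Measure fromCsvLine(String line) {
        String[] parts = line.split(",");
        Measure measure = new Measure();
        measure.setCobDate(parts[0].trim());
        measure.setDatabaseId(Integer.valueOf(parts[1].trim()));
        measure.setName(parts[2].trim());
        return measure;
    }
}
```

For example, `Measure.fromCsvLine("20181115,56459865,name1")` yields a bean with cobDate `20181115`, databaseId `56459865` and name `name1`.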

Upvotes: 0

Views: 12212

Answers (2)

ajayian

Reputation: 963

Adding toDF() and updating the map lambda as suggested by Serge fixed my problem:

SparkSession sparkSession = createSession();
JavaRDD<Row> entityRDD = sparkSession.read()
     .csv(prismDataCluster + "measures/measures-*chop.csv")
     .toDF("cobDate","databaseId","name")
     .javaRDD()
     .mapPartitionsWithIndex(removeHeader, false)
     .map((Function<Row, Measure>) row -> {
              Measure measure = new Measure();
              measure.setCobDate(row.getString(row.fieldIndex("cobDate")));
              measure.setDatabaseId(row.getString(row.fieldIndex("databaseId")));
              measure.setName(row.getString(row.fieldIndex("name")));

              return measure;
     });
Thanks very much.

Upvotes: 1

Serge Harnyk

Reputation: 1339

.map((Function<String, Measure>) s -> {

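It looks like the function here should take a Row rather than a String, because read().csv(...) returns a Dataset<Row> and javaRDD() therefore yields Row elements: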

.map((Function<Row, Measure>) s -> {
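
As for why the original version fails only at runtime: one plausible explanation, assuming a raw (unparameterized) type is involved somewhere in the chain, is that Java generics are erased, so the mismatch surfaces only when the function is first applied to an element. The sketch below reproduces that behavior in plain Java without Spark. `FakeRow`, `FIRST_FIELD`, `FIRST_COLUMN` and `applyUnchecked` are hypothetical names used only for illustration.

```java
import java.util.function.Function;

public class ErasureDemo {
    // Hypothetical stand-in for Spark's Row, so the sketch runs without Spark.
    static final class FakeRow {
        private final String[] values;
        FakeRow(String... values) { this.values = values; }
        String getString(int i) { return values[i]; }
    }

    // Expects a String, like the lambda in the question.
    static final Function<String, String> FIRST_FIELD = s -> s.split(",")[0];

    // Expects a row, like the corrected lambda.
    static final Function<FakeRow, String> FIRST_COLUMN = row -> row.getString(0);

    // Applies FIRST_FIELD through a raw reference, which bypasses the
    // compile-time type check. Returns "ClassCastException" if the element
    // turns out not to be a String when the function is applied.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String applyUnchecked(Object element) {
        Function raw = FIRST_FIELD;
        try {
            return (String) raw.apply(element);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        FakeRow row = new FakeRow("20181115", "56459865", "name1");
        // A real String works through the raw reference.
        System.out.println(applyUnchecked("20181115,56459865,name1")); // 20181115
        // A row compiles through the raw reference but fails when applied.
        System.out.println(applyUnchecked(row)); // ClassCastException
        // Typing the function to the actual element type avoids the problem.
        System.out.println(FIRST_COLUMN.apply(row)); // 20181115
    }
}
```

This matches the stack trace in the question, where the ClassCastException is raised inside the function wrapper at the point the element is passed in, not when the pipeline is built.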

Upvotes: 1
