Reputation: 963
I have an application that's attempting to read a group of CSV files from a cluster directory and write them out as a Parquet file using Spark.
SparkSession sparkSession = createSession();
JavaRDD<Row> entityRDD = sparkSession.read()
        .csv(dataCluster + "measures/measures-*.csv")
        .javaRDD()
        .mapPartitionsWithIndex(removeHeader, false)
        .map((Function<String, Measure>) s -> {
            String[] parts = s.split(COMMA);
            Measure measure = new Measure();
            measure.setCobDate(parts[0]);
            measure.setDatabaseId(Integer.valueOf(parts[1]));
            measure.setName(parts[2]);
            return measure;
        });
Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
entityDataFrame.printSchema();
//Create parquet file here
String parquetDir = dataCluster + "measures/parquet/measures";
entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);
sparkSession.stop();
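For reference, a trimmed-down sketch of the Measure bean (field types inferred from the setters above):

public class Measure implements Serializable {
    private String cobDate;
    private Integer databaseId;
    private String name;

    public String getCobDate() { return cobDate; }
    public void setCobDate(String cobDate) { this.cobDate = cobDate; }
    public Integer getDatabaseId() { return databaseId; }
    public void setDatabaseId(Integer databaseId) { this.databaseId = databaseId; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}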
As shown, Measure is a simple POJO that implements Serializable. The schema prints correctly, so the problem seems to be in translating the DataFrame entries when writing the Parquet file. Here's the error I get:
Lost task 2.0 in stage 1.0 (TID 3, redlxd00006.fakepath.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:244)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
Ultimately my intention is to use Spark SQL to filter and join this data with other CSVs containing other table data, and write the full result to Parquet; a rough sketch of that step follows the sample data below. I've only found Scala-related questions, which haven't addressed my problem. Any help is much appreciated.
Sample csv:
cob_date, database_id, name
20181115,56459865,name1
20181115,56652865,name6
20181115,56459845,name32
20181115,15645936,name3
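For that filter-and-join step, the shape I'm aiming for is roughly the following; the second CSV (databases) and its database_name column are just placeholders for the other table data:

Dataset<Row> measures = sparkSession.read()
        .option("header", "true")
        .csv(dataCluster + "measures/measures-*.csv");

// hypothetical second table to join against
Dataset<Row> databases = sparkSession.read()
        .option("header", "true")
        .csv(dataCluster + "databases/databases-*.csv");

measures.createOrReplaceTempView("measures");
databases.createOrReplaceTempView("databases");

Dataset<Row> joined = sparkSession.sql(
        "SELECT m.cob_date, m.name, d.database_name " +
        "FROM measures m JOIN databases d ON m.database_id = d.database_id " +
        "WHERE m.cob_date = '20181115'");

joined.write().mode(SaveMode.Overwrite).parquet(dataCluster + "measures/parquet/joined");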
Upvotes: 0
Views: 12212
Reputation: 963
Adding toDF() and updating the map lambda as suggested by Serge fixed my problem:
SparkSession sparkSession = createSession();
JavaRDD<Measure> entityRDD = sparkSession.read()
        .csv(prismDataCluster + "measures/measures-*chop.csv")
        .toDF("cobDate", "databaseId", "name")
        .javaRDD()
        .mapPartitionsWithIndex(removeHeader, false)
        .map((Function<Row, Measure>) row -> {
            Measure measure = new Measure();
            measure.setCobDate(row.getString(row.fieldIndex("cobDate")));
            // the setter takes an Integer (see the original code), so parse the string value
            measure.setDatabaseId(Integer.valueOf(row.getString(row.fieldIndex("databaseId"))));
            measure.setName(row.getString(row.fieldIndex("name")));
            return measure;
        });
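The rest stays the same as in the question: build the DataFrame from the mapped RDD and write it out as Parquet.

// parquetDir as defined in the question
Dataset<Row> entityDataFrame = sparkSession.createDataFrame(entityRDD, Measure.class);
entityDataFrame.write().mode(SaveMode.Overwrite).parquet(parquetDir);
sparkSession.stop();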
TVM.
Upvotes: 1
Reputation: 1339
It looks like
.map((Function<String, Measure>) s -> {
should be
.map((Function<Row, Measure>) s -> {
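so that the lambda receives a Row instead of a String. The fields can then be pulled out of the Row, roughly like this (a sketch based on the setters in the question):

.map((Function<Row, Measure>) row -> {
    Measure measure = new Measure();
    measure.setCobDate(row.getString(0));
    measure.setDatabaseId(Integer.valueOf(row.getString(1)));
    measure.setName(row.getString(2));
    return measure;
});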
Upvotes: 1