Luckylukee

Reputation: 595

Adding a new column to dataframe in Spark SQL using Java API and JavaRDD<Row>

I am trying to create a new dataframe (in SparkSQL 1.6.2) after applying a mapPartitions function, as follows:

FlatMapFunction<Iterator<Row>, Row> mapPartitonstoTTF = rows ->
{
    List<Row> mappedRows = new ArrayList<Row>();
    while (rows.hasNext())
    {
        Row row = rows.next();
        Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2), row.getDouble(3), row.getInt(4), row.getString(5),
                row.getString(6), row.getInt(7), row.getInt(8), row.getString(9), 0L);
        mappedRows.add(mappedRow);
    }
    return mappedRows;
};


JavaRDD<Row> sensorDataDoubleRDD=oldsensorDataDoubleDF.toJavaRDD().mapPartitions(mapPartitonstoTTF);

StructType oldSchema=oldsensorDataDoubleDF.schema();
StructType newSchema =oldSchema.add("TTF",DataTypes.LongType,false);

System.out.println("The new schema is: ");
newSchema.printTreeString();

System.out.println("The old schema is: ");
oldSchema.printTreeString();

DataFrame sensorDataDoubleDF=hc.createDataFrame(sensorDataDoubleRDD, newSchema);
sensorDataDoubleDF.show();

As seen above, I am adding a new LongType column with a value of 0 to the rows using the RowFactory.create() function.

However, I get an exception at the line running sensorDataDoubleDF.show(), as follows:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 26.0 failed 4 times, most recent failure: Lost task 117.3 in stage 26.0 (TID 3249, AUPER01-01-20-08-0.prod.vroc.com.au): scala.MatchError: 1435766400001 (of class java.lang.Long)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The old schema is:

root
 |-- data_quality: double (nullable = false)
 |-- data_sensor: string (nullable = true)
 |-- data_timestamp: long (nullable = false)
 |-- data_valueDouble: double (nullable = false)
 |-- day: integer (nullable = false)
 |-- dpnode: string (nullable = true)
 |-- dsnode: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- nodeid: string (nullable = true)
 |-- nodename: string (nullable = true)

The new schema is the same as above, with the addition of a TTF column of LongType:

root
 |-- data_quality: double (nullable = false)
 |-- data_sensor: string (nullable = true)
 |-- data_timestamp: long (nullable = false)
 |-- data_valueDouble: double (nullable = false)
 |-- day: integer (nullable = false)
 |-- dpnode: string (nullable = true)
 |-- dsnode: string (nullable = true)
 |-- month: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- nodeid: string (nullable = true)
 |-- nodename: string (nullable = true)
 |-- TTF: long (nullable = false)

I would appreciate any help in figuring out where I am making a mistake.

Upvotes: 0

Views: 1826

Answers (1)

abaghel

Reputation: 15317

You have 11 columns in the old schema, but you are mapping only 10 of them, so the values in each new row no longer line up with the 12-column schema and a long value ends up in a position where a string column is expected; that is what the scala.MatchError on a java.lang.Long is complaining about. Add row.getString(10) to the RowFactory.create call:

Row mappedRow = RowFactory.create(row.getDouble(0), row.getString(1), row.getLong(2), row.getDouble(3), row.getInt(4), row.getString(5),
               row.getString(6), row.getInt(7), row.getInt(8), row.getString(9), row.getString(10), 0L);
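
For reference, the complete mapping function with the fix applied might look like the sketch below (assuming Spark 1.6.x, where the FlatMapFunction lambda can return any Iterable such as a List). The comments map each position to the column it fills in the new schema shown in the question.

FlatMapFunction<Iterator<Row>, Row> mapPartitonstoTTF = rows ->
{
    List<Row> mappedRows = new ArrayList<Row>();
    while (rows.hasNext())
    {
        Row row = rows.next();
        // Copy all 11 original columns in schema order, then append the constant TTF value.
        Row mappedRow = RowFactory.create(
                row.getDouble(0),  // data_quality
                row.getString(1),  // data_sensor
                row.getLong(2),    // data_timestamp
                row.getDouble(3),  // data_valueDouble
                row.getInt(4),     // day
                row.getString(5),  // dpnode
                row.getString(6),  // dsnode
                row.getInt(7),     // month
                row.getInt(8),     // year
                row.getString(9),  // nodeid
                row.getString(10), // nodename
                0L);               // TTF
        mappedRows.add(mappedRow);
    }
    return mappedRows;
};

If TTF is only ever a constant at this stage, an alternative worth considering is oldsensorDataDoubleDF.withColumn("TTF", org.apache.spark.sql.functions.lit(0L)), which adds the column without re-creating the rows by hand.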

Upvotes: 1
