Reputation: 3209
I am using spark-sql 2.4.1. I have code something like below.
Dataset&lt;Row&gt; dataDs = ... ; // have dataset
Dataset&lt;Row&gt; part_dataDs = dataDs.repartition(col("fieldX"));
StructType schemaType = part_dataDs.schema();

part_dataDs.foreachPartition((ForeachPartitionFunction&lt;Row&gt;) itr -> {
    Iterable&lt;Row&gt; rowIt = () -> itr;
    List&lt;Row&gt; objs = StreamSupport.stream(rowIt.spliterator(), false)
        .collect(Collectors.toList());
    System.out.println("inrow.length: " + objs.size());

    Dataset&lt;Row&gt; partitionData = sparkSession.createDataFrame(objs, schemaType);
    partitionData.show();
});
Error:
[Executor task launch worker for task 418] ERROR org.apache.spark.executor.Executor - Exception in task 1.0 in stage 21.0 (TID 418)
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:77)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:380)
Upvotes: 0
Views: 588
Reputation: 13154
So, you seem to be misunderstanding a few basic things. The two functions foreachPartition and mapPartitions operate on each partition of the dataset. The itr variable in your

part_dataDs.foreachPartition(itr -> ...

refers to an iterator over the rows of that single partition. You can use this iterator to iterate over the rows just as you would over a list of strings.
In principle, you could write something like this:
part_dataDs.foreachPartition((ForeachPartitionFunction&lt;Row&gt;) itr -> {
    itr.forEachRemaining(row -> {
        System.out.println(row.getString(0));
    });
});
although I have to stress that this piece of code makes no sense at all. The println statement would be executed on some random worker node, so you would not see its output on the driver unless you run on a single node. Also, this example should simply use foreach instead of foreachPartition (a sketch follows below), but as this appears to be a contrived toy example, I can't tell whether you actually need foreachPartition or not.
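
For what it's worth, a minimal sketch of the plain foreach version, assuming the part_dataDs dataset from the question (the cast is needed in Java to pick the right overload):

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Row-at-a-time processing; Spark iterates the partitions for you.
// The println still runs on the executors, so the output lands in
// their logs, not on the driver console.
part_dataDs.foreach((ForeachFunction&lt;Row&gt;) row ->
    System.out.println(row.getString(0)));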
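As for the NullPointerException itself: sparkSession exists only on the driver, so calling createDataFrame inside foreachPartition runs on an executor where the session is null, which is exactly where your stack trace points (SparkSession.sessionState). If the goal is to compute something per partition and get a Dataset back, mapPartitions is the intended tool. A minimal sketch, assuming you just want the size of each partition:

import java.util.Collections;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Count the rows of each partition on the executors and return one
// number per partition; the result is a regular Dataset you can
// inspect on the driver.
Dataset&lt;Integer&gt; partitionSizes = part_dataDs.mapPartitions(
    (MapPartitionsFunction&lt;Row, Integer&gt;) itr -> {
        int count = 0;
        while (itr.hasNext()) {
            itr.next();
            count++;
        }
        return Collections.singletonList(count).iterator();
    },
    Encoders.INT());

partitionSizes.show(); // safe: show() runs on the driver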
Upvotes: 2