Reputation: 123
The error here:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 132.0 failed 4 times, most recent failure: Lost task 0.3 in stage 132.0:
java.lang.StringIndexOutOfBoundsException: String index out of range: -650599791
at java.lang.String.<init>(String.java:196)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:484)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:195)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:184)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:157)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:99)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1$$anonfun$apply$11.apply(PairRDDFunctions.scala:97)
The error occurred when I called a Spark grouping method from Java. The Spark version is 2.1.0 and the Java version is 1.8.
// Groups all values per key into a List<String> via combineByKey:
// createCombiner wraps the first value, mergeValue appends within a partition,
// mergeCombiners concatenates partial lists across partitions.
// NOTE(review): the StringIndexOutOfBoundsException in the stack trace happens
// while Kryo *reads* shuffle data (a negative string length), so it likely stems
// from corrupt or truncated shuffle blocks rather than from this code itself —
// verify serializer configuration and per-key data volume.
JavaPairRDD<String, List<String>> combineRdd = pairRDD.partitionBy(new HashPartitioner(mission.getCombineCount()))
// Partition count comes from mission.getCombineCount(); presumably sized to the data — TODO confirm.
.combineByKey(new Function<String, List<String>>() {
private static final long serialVersionUID = 6592724289900217307L;
// createCombiner: start a new single-element list for the first value seen for a key.
@Override
public List<String> call(String v1) throws Exception {
List<String> re = new ArrayList<>();
re.add(v1);
return re;
}
}, new Function2<List<String>, String, List<String>>() {
private static final long serialVersionUID = -5882646966686065803L;
// mergeValue: append one more value to an existing per-partition list (mutates v1 in place).
@Override
public List<String> call(List<String> v1, String v2) throws Exception {
v1.add(v2);
return v1;
}
}, new Function2<List<String>, List<String>, List<String>>() {
private static final long serialVersionUID = -1624057077103693447L;
// mergeCombiners: concatenate two partial lists from different partitions (mutates v1 in place).
@Override
public List<String> call(List<String> v1, List<String> v2) throws Exception {
v1.addAll(v2);
return v1;
}
});
// count() is the action that forces evaluation and triggers the failing shuffle read above.
System.out.println("group rdd count: " + combineRdd.count());
The only reason I can think of is that there is too much data. Should I do something to the data before grouping it? Is there any other possible reason?
Upvotes: 2
Views: 1157
Reputation: 2451
I guess there is a problem with v1.addAll(v2);
you have to create and return a new list:
/**
 * mergeCombiners for combineByKey: merges two per-key partial lists produced
 * on different partitions.
 *
 * Builds and returns a fresh list instead of mutating {@code v1} in place,
 * so neither input combiner is modified.
 *
 * @param v1 partial list from one partition
 * @param v2 partial list from another partition
 * @return a new list containing all elements of v1 followed by all elements of v2
 */
@Override
public List<String> call(List<String> v1, List<String> v2) throws Exception {
    // Copy v1 so the merge has no side effects on either input.
    List<String> list = new ArrayList<String>(v1);
    list.addAll(v2);
    return list;
    // Java 8 one-liner equivalent (fixed: use the actual parameters v1/v2, not a/b):
    // return Stream.concat(v1.stream(), v2.stream()).collect(Collectors.toList());
}
Moreover, if you are using Java 8 you can use lambdas and do it in one line; see the example below:
// Self-contained example: map each long s in [0, 10) to the pair
// (key = s % 3 as a String, value = the list [s % 2, s, s + 1] as Strings),
// then concatenate the per-key lists with reduceByKey. Stream.concat returns
// a fresh list, so neither input list is mutated during the merge.
JavaPairRDD<String, List<String>> pair = spark.range(10L)
.toJavaRDD()
.mapToPair(s -> Tuple2.apply(
Long.valueOf(s % 3).toString(),
Arrays.asList(s % 2, s, s + 1)
.stream()
.map(z -> z.toString())
.collect(Collectors.toList())
)
);
// Print the individual pairs first, then the grouped result with per-key sizes.
pair.foreach(s -> System.out.println(s._1 + "," + s._2.toString()));
pair.reduceByKey((a, b) ->
Stream.concat(a.stream(), b.stream()).collect(Collectors.toList())
).foreach(s -> System.out.println(s._1 + "," + s._2.toString() + "gr. count: " + s._2.size()));
output:
0,[0, 0, 1]
1,[1, 1, 2]
2,[0, 2, 3]
0,[1, 3, 4]
1,[0, 4, 5]
2,[1, 5, 6]
0,[0, 6, 7]
1,[1, 7, 8]
2,[0, 8, 9]
0,[1, 9, 10]
0,[0, 0, 1, 1, 3, 4, 0, 6, 7, 1, 9, 10]gr. count: 12
2,[0, 2, 3, 1, 5, 6, 0, 8, 9]gr. count: 9
1,[1, 1, 2, 0, 4, 5, 1, 7, 8]gr. count: 9
Upvotes: 1