Reputation: 3158
The obvious idea is to add the elements to a list:
ArrayList<String> myvalues = new ArrayList<String>();
myRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
@Override
public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
myvalues.add(row.getString(0)); // Say I need only the first element
}
});
This and the other alternatives I tried all throw org.apache.spark.SparkException: Task not serializable. I simplified the function further, but apparently I'm still doing something illogical:
LOG.info("Let's see..");
queryRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
@Override
public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
LOG.info("Value is : "+row.getString(0));
}
});
There has to be a straightforward way. Here's the stacktrace for reference:
2015-10-08 10:16:48 INFO UpdateStatementTemplateImpl:141 - Lets see..
2015-10-08 10:16:48 WARN GenericExceptionMapper:20 - Error while executing service
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1476)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:781)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:313)
at org.apache.spark.sql.api.java.JavaSchemaRDD.foreach(JavaSchemaRDD.scala:42)
at com.simility.cassandra.template.DeviceIDTemplateImpl.test(DeviceIDTemplateImpl.java:144)
at com.kumbay.service.admin.BusinessEntityService.testSignal(BusinessEntityService.java:1801)
at com.kumbay.service.admin.BusinessEntityService$$FastClassByCGLIB$$157ddd50.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:701)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:96)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:260)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:64)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:634)
Upvotes: 0
Views: 1498
Reputation: 6059
I assume LOG and myvalues live in a containing class. Thus the whole class (being part of the "capture" of call) is going to be serialized, which is not possible.
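To see the capture without a cluster, here is a minimal sketch using plain Java serialization. The names CaptureDemo, Task, Service and PrintTask are made up for illustration; Task only stands in for Spark's VoidFunction, and Service for whatever non-serializable class contains your foreach call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for Spark's VoidFunction (which is Serializable too).
    interface Task extends Serializable {
        void call(String value);
    }

    // Hypothetical stand-in for the containing class; note it is NOT Serializable.
    static class Service {
        String describe(String value) {
            return "Value is : " + value;
        }

        Task anonymousTask() {
            // Anonymous class that uses the enclosing instance, so it keeps
            // a hidden reference to this Service object.
            return new Task() {
                @Override
                public void call(String value) {
                    System.out.println(describe(value));
                }
            };
        }

        Task standaloneTask() {
            return new PrintTask();
        }
    }

    // Static nested class: no reference to any enclosing instance.
    static class PrintTask implements Task {
        @Override
        public void call(String value) {
            System.out.println("Value is : " + value);
        }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        System.out.println(canSerialize(service.anonymousTask()));  // false
        System.out.println(canSerialize(service.standaloneTask())); // true
    }
}
```

The anonymous task fails because serializing it drags the Service instance along. Moving the function into a static nested (or top-level) class that only holds serializable fields is one way to avoid that.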
First, replace LOG with a simple System.out.println and see if that works.
Second, create a copy of the members you are using inside of call:
public void call(...) {
Log log = LOG; // local copy of the logger, or
ArrayList<String> inside = myvalues; // local copy of the list reference
inside.add(...);
}
Third, never use an ArrayList inside foreach, as it is run on different nodes and each node will see its own ArrayList. So, you will never get what you expect. Instead, use rdd.collect(...) to collect your results!
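The "each node sees its own ArrayList" part can be imitated in plain Java, assuming the function object is serialized before it runs (which is what the Task not serializable error suggests Spark does). ShippedCopyDemo, Collector and ship are hypothetical names for this sketch, not Spark API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ShippedCopyDemo {

    // Hypothetical serializable function that appends to a list it carries along.
    static class Collector implements Serializable {
        final List<String> values = new ArrayList<>();

        void call(String row) {
            values.add(row);
        }
    }

    // Imitates shipping a task to another node: serialize, then read back a copy.
    static Collector ship(Collector original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Collector) in.readObject();
        }
    }

    // Adds two rows on the shipped copy and reports the size of the original list.
    static int driverSizeAfterRemoteAdds() throws IOException, ClassNotFoundException {
        Collector driverSide = new Collector();
        Collector executorSide = ship(driverSide);
        executorSide.call("a");
        executorSide.call("b");
        return driverSide.values.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(driverSizeAfterRemoteAdds()); // 0
    }
}
```

The list on the calling side stays empty because only the copy was modified. With collect(), the rows are brought back to the driver as a java.util.List, and you can pull out the first column there with an ordinary loop.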
Upvotes: 2