Aditya

Reputation: 3158

How to extract RDD's data to Java ArrayList?

The obvious idea is to add each element to a list inside foreach:

ArrayList<String> myvalues = new ArrayList<String>();

myRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        myvalues.add(row.getString(0)); // Say I need only the first element
    }
});

This approach, and the other alternatives I tried, all throw org.apache.spark.SparkException: Task not serializable. I simplified the function further and it still fails, so apparently I'm doing something illogical:

LOG.info("Let's see..");
queryRdd.foreach(new VoidFunction<org.apache.spark.sql.api.java.Row>() {
    @Override
    public void call(org.apache.spark.sql.api.java.Row row) throws Exception {
        LOG.info("Value is : " + row.getString(0));
    }
});

There has to be a straightforward way. Here's the stack trace for reference:

2015-10-08 10:16:48 INFO  UpdateStatementTemplateImpl:141 - Lets see.. 
2015-10-08 10:16:48 WARN  GenericExceptionMapper:20 - Error while executing service
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1476)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:781)
        at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:313)
        at org.apache.spark.sql.api.java.JavaSchemaRDD.foreach(JavaSchemaRDD.scala:42)
        at com.simility.cassandra.template.DeviceIDTemplateImpl.test(DeviceIDTemplateImpl.java:144)
        at com.kumbay.service.admin.BusinessEntityService.testSignal(BusinessEntityService.java:1801)
        at com.kumbay.service.admin.BusinessEntityService$$FastClassByCGLIB$$157ddd50.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:701)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:96)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:260)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
        at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:64)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:634)

Upvotes: 0

Views: 1498

Answers (1)

Martin Senne

Reputation: 6059

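I assume LOG and myvalues are members of a containing class. An anonymous inner class created in an instance method keeps a reference to that enclosing instance, so the whole containing class becomes part of the "capture" of call and Spark tries to serialize it, which is not possible because that class is not serializable.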
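To see why the containing class gets dragged in, here is a minimal sketch in plain Java with no Spark dependency. It assumes, as Spark does for closures by default, that the function is shipped with Java serialization. Template, SerializableAction, PrintAction and CaptureDemo are hypothetical names: SerializableAction stands in for Spark's VoidFunction (which also extends Serializable) and Template for the non-serializable containing class. The anonymous class reads an instance field, so it holds a hidden reference to Template and fails to serialize; a static nested class that receives its own copy of the value does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for Spark's VoidFunction<T>, which also extends Serializable.
interface SerializableAction<T> extends Serializable {
    void call(T value) throws Exception;
}

// Plays the role of the containing class (e.g. DeviceIDTemplateImpl); it is NOT Serializable.
class Template {
    private String prefix = "Value is : "; // instance member used inside call()

    // Anonymous class reading an instance field: it must keep a reference to Template.this.
    SerializableAction<String> anonymousAction() {
        return new SerializableAction<String>() {
            @Override
            public void call(String value) {
                System.out.println(prefix + value);
            }
        };
    }

    // Static nested class: holds its own copy of the value and no reference to Template.
    static class PrintAction implements SerializableAction<String> {
        private final String prefix;

        PrintAction(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void call(String value) {
            System.out.println(prefix + value);
        }
    }

    SerializableAction<String> nestedAction() {
        return new PrintAction(prefix);
    }
}

class CaptureDemo {
    // Returns true if Java serialization of the object (and everything it references) succeeds.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some referenced object (here: Template) is not Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Template t = new Template();
        System.out.println(serializes(t.anonymousAction())); // false: drags Template along
        System.out.println(serializes(t.nestedAction()));    // true
    }
}
```

The same idea should carry over to Spark: a static nested class, or on Java 8 a lambda that does not touch instance members, avoids capturing the enclosing object.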

Solution

First, replace LOG with a simple System.out.println and see whether that works.

Second, create local copies of the members you use inside call:

public void call(Row row) throws Exception {
    Log log = LOG;                        // local copy of LOG, or
    ArrayList<String> inside = myvalues;  // local copy of myvalues
    inside.add(row.getString(0));
}

Third, never add to an ArrayList inside foreach: the function runs on different nodes, and each node works on its own copy of the ArrayList, so the list on the driver is never filled and you will not get what you expect.

Instead, use rdd.collect() to bring your results back to the driver as a java.util.List (map each Row to the value you need first)!
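The difference between filling a driver-side list in foreach and collecting results can be illustrated with a toy model in plain Java. This is not Spark code and not how Spark works internally; it only mimics the fact that each node receives a deserialized copy of the function and everything it references. ShippingDemo, ship, foreachStyle and collectStyle are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (NOT Spark): "ships" an object by serializing it and deserializing a fresh copy.
class ShippingDemo {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // foreach-style: every "partition" gets its own copy of the driver's list and adds to it.
    static List<String> foreachStyle(List<List<String>> partitions) {
        ArrayList<String> myvalues = new ArrayList<String>();   // lives on the driver
        for (List<String> partition : partitions) {
            ArrayList<String> copy = ship(myvalues);             // what a node actually sees
            copy.addAll(partition);                              // mutates only the copy
        }
        return myvalues;                                         // never filled
    }

    // collect-style: every "partition" sends its results back and the driver concatenates them.
    static List<String> collectStyle(List<List<String>> partitions) {
        List<String> collected = new ArrayList<String>();
        for (List<String> partition : partitions) {
            ArrayList<String> result = ship(new ArrayList<String>(partition)); // sent back
            collected.addAll(result);
        }
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(foreachStyle(partitions)); // []
        System.out.println(collectStyle(partitions)); // [a, b, c]
    }
}
```

foreachStyle returns an empty list because only the shipped copies were modified, while collectStyle returns every element because each partition's results travel back to the caller, which is roughly what collect() does for a real RDD.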

Upvotes: 2
