Reputation: 111
Cassandra & DataStax community, I have a question that I'm hoping someone wise can help me with.
We are migrating our analytics code from Hadoop to Spark running on top of Cassandra (via DataStax Enterprise): DSE 4.7 in production, 4.8 in development.
Java 7 in production, Java 7 or 8 in development.
There are a couple of DataFrame transformations we need, and we think that writing UDFs used via the Spark SQLContext against an in-memory DataFrame will do the job. The primary ones are generating a hashed id from two columns and stripping quote characters from strings (the two UDFs in the code below).
Our code is below. It runs fine without the UDF calls in the sqlContext.sql statement, but as soon as they are added we get a “Task not serializable” error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
I have tried adding “implements Serializable” to this class (and many other classes), which just moves the error to the next class up the chain; eventually it fails because the Exception class is not serializable… which probably means I am heading in the wrong direction.
I have also tried implementing the UDFs as lambdas, and that results in the same error.
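For reference, the lambda version was roughly the following (a sketch from memory, Java 8 only; it was registered against the same sqlContext as in the full code below and failed with the same error):

// Rough sketch of the lambda attempt (Java 8 only) - registered in place of
// the anonymous-class UDFs shown further down; it fails with the same
// "Task not serializable" error.
sqlContext.udf().register("getid",
        (UDF2<String, String, String>) (o, o2) -> o.concat(o2),
        DataTypes.StringType);
sqlContext.udf().register("clean_string",
        (UDF1<String, String>) o -> o.replace("\"", ""),
        DataTypes.StringType);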
If anyone could point out what I am doing wrong it would be much appreciated!
public class entities implements Serializable {
    private spark_context m_spx = null;
    private DataFrame m_entities = null;
    private String m_timekey = null;

    public entities(spark_context _spx, String _timekey) {
        m_spx = _spx;
        m_timekey = _timekey;
    }

    public DataFrame get_dimension() {
        if (m_entities == null) {
            DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");

            // UDF to generate hashed ids
            UDF2 get_hashed_id = new UDF2<String, String, String>() {
                public String call(String o, String o2) throws Exception {
                    return o.concat(o2);
                }
            };

            // UDF to clean the " from strings
            UDF1 clean_string = new UDF1<String, String>() {
                public String call(String o) throws Exception {
                    return o.replace("\"", "");
                }
            };

            // Get the Spark SQL Context from SC.
            SQLContext sqlContext = new SQLContext(m_spx.sc());

            // Register the UDFs
            sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
            sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);

            // Register the DF as a table.
            sqlContext.registerDataFrameAsTable(df, "entities");

            m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
        }
        return m_entities;
    }
}
Upvotes: 1
Views: 2159
Reputation: 37832
Your entities class contains a SparkContext member, so it can't be serializable (SparkContexts are intentionally not serializable; you're not supposed to serialize them).
Since entities isn't serializable, none of its non-static methods / members / anonymous inner classes are serializable either (because they'll try to serialize the entities instance that holds them).
The best workaround in this case is extracting the anonymous UDFs into static members of the class:
private final static UDF2 get_hashed_id = new UDF2<String, String, String>() {
    public String call(String o, String o2) throws Exception {
        return o.concat(o2);
    }
};

private final static UDF1 clean_string = new UDF1<String, String>() {
    public String call(String o) throws Exception {
        return o.replace("\"", "");
    }
};
Then you'll be able to use them in get_dimension.
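For example, get_dimension could then register and use the static UDFs like this (a sketch based on the code in the question; I haven't run it against DSE):

public DataFrame get_dimension() {
    if (m_entities == null) {
        DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");
        SQLContext sqlContext = new SQLContext(m_spx.sc());

        // Register the static UDFs - nothing here captures the enclosing
        // entities instance, so Spark never tries to serialize it.
        sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
        sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);

        sqlContext.registerDataFrameAsTable(df, "entities");
        m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
    }
    return m_entities;
}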
Upvotes: 8