Reputation: 339
My class is shown below. I also get the result "is null". If I remove the transient modifier from testClass, I get a "Task not serializable" error instead, even though TestClass implements Serializable. So why is the testClass field null inside MergeLog?
    public class MergeLog implements Serializable {
        private static final Logger LOGGER = LoggerFactory.getLogger(LogFormat.class);
        private transient SparkConf conf = new SparkConf().setAppName("log join");
        private transient JavaSparkContext sc = new JavaSparkContext(conf);
        private HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
        private transient TestClass testClass = new TestClass();

        public void process() {
            JavaRDD<String> people = sc.textFile("/user/people.txt");
            String schemaString = "name age";
            List<StructField> fields = new ArrayList<StructField>();
            for (String fieldName : schemaString.split(" ")) {
                fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
            }
            StructType schema = DataTypes.createStructType(fields);
            JavaRDD<Row> rowRDD = people.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split(",");
                        return RowFactory.create(fields[0], fields[1].trim());
                    }
                });
            DataFrame peopleDataFrame = hiveContext.createDataFrame(rowRDD, schema);
            JavaRDD<String> javaRDD = peopleDataFrame.toJavaRDD().map(
                new Function<Row, String>() {
                    @Override
                    public String call(Row row) throws Exception {
                        String ins = null;
                        // This branch is the one taken on the workers.
                        if (testClass == null) {
                            return "is null";
                        } else {
                            ins = testClass.calc(row);
                        }
                        return ins;
                    }
                });
        }

        public static void main(String[] args) {
            MergeLog mergeLog = new MergeLog();
            mergeLog.process();
        }
    }

    class TestClass implements Serializable {
        public String calc(Row row) {
            return row.mkString();
        }
    }
Upvotes: 0
Views: 89
Reputation: 609
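The TestClass instance is created on the driver. Because the field is transient, it is skipped when the enclosing MergeLog object is serialized and sent to the workers, so on the workers it is null. Create a new TestClass instance inside the function instead: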
    peopleDataFrame.toJavaRDD().map(
        new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                // Created on the worker, so no driver-side instance has to be shipped.
                return new TestClass().calc(row);
            }
        });
Also, the Row class is not serializable, which is why you get the not-serializable exception when you remove transient from testClass. Pass only the values you need from the Row to the class for processing.
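The null in the question can be reproduced without Spark. The following is a minimal sketch using only plain Java serialization. The names TransientDemo, Job and Helper are hypothetical stand-ins for MergeLog and TestClass, and the round trip through a byte array only approximates what happens when a closure is shipped to a worker:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // Hypothetical stand-in for TestClass from the question.
    public static class Helper implements Serializable {
        public String calc(String input) {
            return input.trim();
        }
    }

    // Hypothetical stand-in for MergeLog: one transient field, one regular field.
    public static class Job implements Serializable {
        public transient Helper skipped = new Helper();
        public Helper kept = new Helper();
    }

    // Write the object to a byte array and read it back.
    public static Job roundTrip(Job job) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(job);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Job) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Job copy = roundTrip(new Job());
        // Field initializers are not re-run on deserialization,
        // so the transient field comes back as null.
        System.out.println("transient field is null: " + (copy.skipped == null)); // true
        System.out.println("regular field is null: " + (copy.kept == null));      // false
    }
}
```

The field initializer `new Helper()` runs only when the constructor runs, which is on the driver in the Spark case. The deserialized copy never executes it, so anything marked transient has to be recreated on the receiving side, for example inside call() as shown above.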
Upvotes: 1