Azalkor

Reputation: 87

Writing from Spark to HBase: org.apache.spark.SparkException: Task not serializable

I'm working on a heatmap project for my university: we have to read some data (212 GB) from a txt file (coordinates, height), then put it in HBase so it can be retrieved by a web client built with Express.

I practiced using a 144 MB file, and this works:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

for (String s : data.collect()) {
    String[] tmp = s.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
}

table.put(put);

But now that I use the 212 GB file, I get memory errors. I guess the collect method gathers all the data in memory, so 212 GB is far too much for the driver.

So now I'm trying this:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

data.foreach(line ->{
    String[] tmp = line.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
});

table.put(put);

And I'm getting "org.apache.spark.SparkException: Task not serializable". I searched for it and tried some fixes, without success, based on what I read here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

Actually I don't understand everything about this topic; I'm just a student. Maybe the answer to my problem is obvious, maybe not. Anyway, thanks in advance!

Upvotes: 2

Views: 262

Answers (1)

Alper t. Turker

Reputation: 35229

As a rule of thumb, serializing database connections (of any type) doesn't make sense. They are not designed to be serialized and deserialized, Spark or not.
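To make the failure concrete: the lambda passed to foreach has to be shipped to every executor, so Spark serializes it together with everything it captures. Here that includes the driver-side put, and the HBase client objects behind it were never meant to be serialized. A short annotated sketch of the capture, reusing the names from the question (whether the enclosing class gets dragged in as well depends on how FAMILY and getConf() are declared):

// Driver-side objects, created once, outside any Spark closure:
Connection co = ConnectionFactory.createConnection(getConf());
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

// The lambda below references `put`, so Spark must serialize the closure and
// everything it captures to send the task to the executors; that is what
// fails with "Task not serializable".
data.foreach(line -> {
    String[] tmp = line.split(",");
    put.addImmutable(FAMILY, Bytes.toBytes(tmp[2]), Bytes.toBytes(tmp[0] + "," + tmp[1]));
});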

Instead, create a connection for each partition:

data.foreachPartition(partition -> {
    Connection co = ConnectionFactory.createConnection(getConf());
    ... // All required setup
    Table table = co.getTable(TableName.valueOf(TABLE_NAME));
    Put put = new Put(Bytes.toBytes("KEY"));
    while (partition.hasNext()) {
        String line = partition.next();
        String[] tmp = line.split(",");
        put.addImmutable(FAMILY,
                Bytes.toBytes(tmp[2]),
                Bytes.toBytes(tmp[0] + "," + tmp[1]));
    }
    table.put(put); // write the accumulated Put before cleaning up
    ... // Clean connections
});
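The sketch above keeps the question's single "KEY" row, so every column lands in one row. If each (x, y, height) line is meant to become its own row, a common variant is to build one Put per line and write them in batches. This is only a sketch: the row-key scheme ("x,y"), the "height" qualifier and the batch size are assumptions, and HBaseConfiguration.create() is used so that nothing from the driver-side class is captured (extra imports: java.util.ArrayList, java.util.List, org.apache.hadoop.hbase.HBaseConfiguration).

data.foreachPartition(partition -> {
    // One connection per partition, created on the executor, never serialized.
    Connection co = ConnectionFactory.createConnection(HBaseConfiguration.create());
    Table table = co.getTable(TableName.valueOf(TABLE_NAME));
    List<Put> batch = new ArrayList<>();

    while (partition.hasNext()) {
        String[] tmp = partition.next().split(",");
        // Hypothetical row key "x,y"; the height goes into a single column.
        Put put = new Put(Bytes.toBytes(tmp[0] + "," + tmp[1]));
        put.addColumn(FAMILY, Bytes.toBytes("height"), Bytes.toBytes(tmp[2]));
        batch.add(put);

        if (batch.size() >= 10000) { // flush periodically to bound executor memory
            table.put(batch);
            batch.clear();
        }
    }
    if (!batch.isEmpty()) {
        table.put(batch);
    }
    table.close();
    co.close();
});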

I also recommend reading "Design Patterns for using foreachRDD" from the official Spark Streaming programming guide.

Upvotes: 1
