Rodrigo Rondena

Reputation: 31

Write HDFS outputfile with Scala

I'm trying to write a HDFS output file using Scala, and I'm receiving the error below:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:

Every 23 input lines I need to write one line to the output file.

Source code:

package com.mycode.logs;

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import scala.io._
import java.io.PrintWriter;

/**
 * @author RondenaR
 * 
 */
object NormalizeMSLogs{

  def main(args: Array[String]){
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String){
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count :Int = 0
    var lineF :String = ""

    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for(line <- hdfsfileIn){
        //System.out.println("line = " + line)
        count += 1

        if(count != 23){
          lineF = lineF + line + ", "
        }

        if(count == 23){
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF) 
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop
    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
}

Upvotes: 0

Views: 973

Answers (1)

WestCoastProjects

Reputation: 63022

Not only is the PrintWriter object writer not serializable: you also cannot use the SparkContext (sc) inside the foreach. It is a driver-only construct, and it makes no sense to send it across the wire to the workers.
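
To make the difference concrete, here is a minimal sketch reusing sc and writer from the question (the input path is hypothetical):

val rdd = sc.textFile("hdfs://192.168.248.130:8020/user/temporary/somefile")

// Fails with "Task not serializable": the closure is shipped to the executors
// and it captures writer, a java.io.PrintWriter.
rdd.foreach(line => writer.write(line + "\n"))

// Works: collect() brings plain (serializable) Strings back to the driver,
// so the PrintWriter never leaves the driver JVM.
rdd.collect().foreach(line => writer.write(line + "\n"))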

You should take some time to think about which kinds of objects make sense to send over the wire. Pointers, streams, and handles do NOT make sense. Structs, Strings, and primitives do: those are fine to include in a closure (or to broadcast).
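
As a rough sketch of that idea (not the only way to do it), the question's logic can be restructured so the PrintWriter and the FileSystem handle stay on the driver and only plain Strings come back from the executors. The grouping into blocks of 23 lines and the msNode suffix mirror the question's code; collect() assumes each input file fits in driver memory:

import java.io.PrintWriter

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object NormalizeMSLogsSketch {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tmp-logs").setMaster("local"))

    // Driver-side handles: none of these are ever captured in an RDD closure.
    val hdfsconf = sc.hadoopConfiguration
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)
    val writer = new PrintWriter(hdfs.create(new Path("/tmp/mySample.txt")))

    hdfs.globStatus(new Path("/user/temporary/*file*")).foreach { fileStatus =>
      val fileName = fileStatus.getPath.getName
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      // collect() returns an Array[String]; only serializable data crosses the wire.
      val lines = sc.textFile(fileStatus.getPath.toString).collect()

      // Every block of 23 lines becomes one comma-separated output line ending in msNode.
      lines.grouped(23).foreach { block =>
        writer.write(block.mkString(", ") + ", " + msNode)
        writer.write("\n")
      }
    }

    writer.close()
    sc.stop()
  }
}

For large inputs, toLocalIterator (or building the output lines as an RDD and calling saveAsTextFile) avoids pulling a whole file into driver memory at once; collect() is just the smallest change that keeps the writer on the driver. Note that, unlike the original loop, this also writes any trailing block of fewer than 23 lines.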

Upvotes: 1
