NS Saravanan

Reputation: 303

Decompress (unzip/extract) utility using Spark Scala

I have customer_input_data.tar.gz in HDFS, which contains data for 10 different tables in CSV format. I need to extract this file to /my/output/path using Spark with Scala.

Please suggest how to extract the customer_input_data.tar.gz file using Spark with Scala.

Upvotes: 0

Views: 3842

Answers (2)

NS Saravanan

Reputation: 303

I wrote the code below to extract the archive in Scala, using Apache Commons Compress. You need to pass it the input path, the output path, and the Hadoop FileSystem.

    import java.io.{BufferedOutputStream, IOException}
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.commons.io.FilenameUtils
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Assumed value; the original BUFFER_SIZE constant was not shown
    private val BUFFER_SIZE = 8192

    /* Extracts a .tar.gz file stored in HDFS into houtPath/<archive name>/ */
    @throws[IOException]
    private def processTargz(fullpath: String, houtPath: String, fs: FileSystem): Unit = {
      // Folder name is the archive name up to the first dot, e.g. "customer_input_data"
      val fileName1 = FilenameUtils.getName(fullpath)
      val tarNamesFolder = fileName1.substring(0, fileName1.indexOf('.'))
      println("Tar Name entry : " + fileName1)
      println("Folder Name : " + tarNamesFolder)

      // Closing tarIn also closes the wrapped gzip and HDFS input streams
      val tarIn = new TarArchiveInputStream(new GzipCompressorInputStream(fs.open(new Path(fullpath))))
      try {
        // An assignment evaluates to Unit in Scala, so read the entry first and then test it
        var entry = tarIn.getNextEntry
        while (entry != null) {
          val slash = "/"
          val hdfswritepath = new Path(houtPath + slash + tarNamesFolder + slash + entry.getName)
          if (entry.isDirectory) {
            // Create the directory in HDFS rather than on the local file system
            if (!fs.mkdirs(hdfswritepath)) {
              println("Unable to create directory '" + hdfswritepath + "' during extraction of archive contents.")
            }
          } else {
            println("ENTITY NAME : " + entry.getName)
            val dest = new BufferedOutputStream(fs.create(hdfswritepath, true), BUFFER_SIZE)
            try {
              // Copy the current entry's bytes; read returns -1 at the end of the entry
              val data = new Array[Byte](BUFFER_SIZE)
              var count = tarIn.read(data, 0, BUFFER_SIZE)
              while (count != -1) {
                dest.write(data, 0, count)
                count = tarIn.read(data, 0, BUFFER_SIZE)
              }
            } finally dest.close()
          }
          entry = tarIn.getNextEntry
        }
        println("Untar completed successfully!")
      } finally {
        // Any IOException propagates to the caller, as declared by @throws
        tarIn.close()
      }
    }

Upvotes: 0

ameet chaubal

Reputation: 1540

gzip is not a splittable format in Hadoop. Consequently, reading the file cannot be distributed across the cluster, and you get no benefit from distributed processing in Hadoop or Spark.

A better approach may be to uncompress the file on the OS and then send the extracted files back to Hadoop individually.

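If you still want to uncompress in Scala, you can use the Java class GZIPInputStream:

    new GZIPInputStream(new FileInputStream("your file path"))

Note that this only removes the gzip layer. For a .tar.gz file the result is still a tar archive, which has to be unpacked separately.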
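As a rough illustration of the GZIPInputStream route, here is a minimal sketch that copies a gzip stream to an output stream using only the JDK. The names `GunzipSketch`, `gunzip` and `gzip` are hypothetical and exist only for this example; `gzip` is there just so the sketch can be tried in memory without a real file.

```scala
import java.io.{ByteArrayOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GunzipSketch {
  /** Writes the decompressed contents of a gzip stream to `out`.
    * Closes the input stream; leaves `out` open for the caller. */
  def gunzip(in: InputStream, out: OutputStream): Unit = {
    val gz = new GZIPInputStream(in)
    try {
      val buf = new Array[Byte](8192)
      // read returns -1 once the compressed stream is exhausted
      var n = gz.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = gz.read(buf)
      }
    } finally gz.close()
  }

  /** Demo helper only: gzips a byte array in memory. */
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    try gz.write(bytes) finally gz.close()
    bos.toByteArray
  }
}
```

For a local file, `gunzip` could be called with a `FileInputStream` as the source and any `OutputStream` as the destination. For a file in HDFS, the source stream would presumably come from `FileSystem.open` instead.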

Upvotes: 1
