Moustafa Mahmoud

Reputation: 1590

Spark Reading Compressed with Special Format

I have a .gz file. I need to read this file and add the load time and file name to it. I have a few problems and need your recommendations on the following points.

  1. Because the file is compressed, the first line is read in the wrong format, I think due to an encoding problem. I tried the code below, but it did not work:

    import scala.io.Codec
    import java.nio.charset.CodingErrorAction

    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
    
  2. The file has a special format and I need to parse it with a regex into a DataFrame. The only way I found is to read it as an RDD and map the regex over it. Is there any way to read it directly into a DataFrame and apply the regex (a sketch of what I mean is below the list)?

    import org.apache.spark.sql.functions.lit

    // Normalize empty and trailing fields, then split each line with the regex
    val Test_special_format_RawData = sc.textFile("file://" + filename.toString())
      .map(line => line.replace("||", "|NA|NA"))
      .map(line => if (line.takeRight(1) == "|") line + "NA" else line)
      .map(x => regex_var.findAllIn(x).toArray)

    import hiveSqlContext.implicits._

    // Keep only rows with the expected 30 fields, drop the header row,
    // and map the first 20 fields into a tuple
    val Test_special_format_DF = Test_special_format_RawData.filter(x => x.length == 30)
      .filter(x => x(0) != header(0))
      .map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7),
                 x(8), x(9), x(10), x(11), x(12), x(13), x(14),
                 x(15), x(16), x(17), x(18), x(19))).toDF()

    val Test_special_format_Tranformed_Data = Test_special_format_DF.withColumn("FileName", lit(filename.getName))
      .withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
    
  3. Can I ignore a delimiter that appears between special characters? For example, if a "|" pipe comes between ^~ and ^~, can it be ignored?

  4. Sometimes the DataFrame columns are received with the wrong data types. How can we handle this problem and apply data quality checks (a sketch of the kind of check I mean is also below the list)?

  5. When I insert into Hive from Spark using a DataFrame, can I specify a rejection directory for rows that fail? Below is the code I used:

    Test_special_format_Tranformed_Data.write.partitionBy("rtm_insertion_date")
      .mode(SaveMode.Append).insertInto("dpi_Test_special_format_source")
    

A sample of the file is here.
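For point 2, this is roughly what I am hoping for: reading the lines straight into a DataFrame and applying the regex there with regexp_extract, so no intermediate RDD is needed. A minimal sketch, assuming Spark 1.6 or later (where DataFrameReader.text is available) and a made-up three-group pattern with placeholder column names; the real file has 30 fields:

    import org.apache.spark.sql.functions.{col, current_timestamp, input_file_name, regexp_extract}

    // Read the raw lines as a single-column DataFrame (column "value") instead of an RDD
    val raw = hiveSqlContext.read.text("file://" + filename.toString())

    // Placeholder pattern with three capture groups; the real one would have 30
    val pattern = "^([^|]*)\\|([^|]*)\\|([^|]*)$"

    val parsed = raw
      .withColumn("col1", regexp_extract(col("value"), pattern, 1))
      .withColumn("col2", regexp_extract(col("value"), pattern, 2))
      .withColumn("col3", regexp_extract(col("value"), pattern, 3))
      .withColumn("FileName", input_file_name())
      .withColumn("rtm_insertion_date", current_timestamp())
      .drop("value")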
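For point 4, this is the kind of data quality check I mean, sketched with made-up column names ("_2" and "_3" are only the default tuple column names from toDF()): cast the string columns to the expected types and flag the rows where the cast comes back null.

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{DoubleType, IntegerType}

    // Cast placeholder columns to the types they are supposed to have
    val typed = Test_special_format_Tranformed_Data
      .withColumn("c2_int",    col("_2").cast(IntegerType))
      .withColumn("c3_double", col("_3").cast(DoubleType))

    // Rows where the original value was present but the cast failed
    val badRows = typed.filter(col("c2_int").isNull && col("_2").isNotNull)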

Upvotes: 4

Views: 1252

Answers (1)

Moustafa Mahmoud

Reputation: 1590

I will answer my own question regarding the file format issue. The solution is to override the default extension for the gzip codec.

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.tmp"

}

Now we just register this codec by setting spark.hadoop.io.compression.codecs on the SparkConf:

val conf = new SparkConf()

// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")

I found this solution at this link.

Regarding the malformed records, there are two solutions, as follows:

  1. Cast each row to a case class and then check whether it pattern-matches that case class or not (a sketch is below the list).
  2. Parse the RDD line by line, but this requires an update in the spark-csv library.
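
A minimal sketch of option 1, assuming a made-up three-field Record case class in place of the real 30-field layout; Test_special_format_RawData is the RDD[Array[String]] from the question:

    // Model a row as a case class and keep only the lines that pattern-match into it
    case class Record(id: String, name: String, value: String)

    def toRecord(fields: Array[String]): Option[Record] = fields match {
      case Array(id, name, value) => Some(Record(id, name, value))
      case _                      => None // malformed: wrong number of fields
    }

    val records  = Test_special_format_RawData.flatMap(fields => toRecord(fields).toSeq)
    val rejected = Test_special_format_RawData.filter(fields => toRecord(fields).isEmpty)

    // records.toDF() gives the clean rows; rejected can be written out, e.g.
    // rejected.map(_.mkString("|")).saveAsTextFile("<rejection dir>")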

Regarding the delimiter issue, it requires using an RDD with a regex.
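
A minimal sketch of that idea, treating ^~ ... ^~ as a quoting pair so a "|" inside it is not taken as a delimiter (the quoting characters come from the question; the sample line is made up):

    // Match one field at a time: either a ^~-quoted chunk (which may contain "|")
    // or a run of characters that are not "|"
    val fieldPattern = """\^~.*?\^~|[^|]*""".r

    val line   = "a|^~b|c^~|d"
    val fields = fieldPattern.findAllIn(line).filter(_.nonEmpty).toArray
    // fields: Array(a, ^~b|c^~, d)
    // Note: genuinely empty fields are dropped here, which is why the question's
    // code first rewrites "||" as "|NA|NA"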

Upvotes: 3
