Reputation: 1590
I have a .gz file. I need to read it and add the load time and the file name to each record. I have run into several problems and would appreciate recommendations on the following points.
Because the file is compressed, the first line is not read in the proper format. I think this is an encoding problem, so I tried the code below, but it did not work:
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
The file has a special format, and I need to parse it with a regex into a DataFrame. The only way I found is to read it as an RDD and map each line through the regex. Is there any way to read it directly into a DataFrame and pass the regex?
val Test_special_format_RawData = sc.textFile("file://" + filename.toString())
  .map(line ⇒ line.replace("||", "|NA|NA"))
  .map(line ⇒ if (line.takeRight(1) == "|") line + "NA" else line)
  .map { x ⇒ regex_var.findAllIn(x).toArray }
import hiveSqlContext.implicits._
val Test_special_format_DF = Test_special_format_RawData
  .filter { x ⇒ x.length == 30 }
  .filter { x ⇒ x(0) != header(0) }
  .map { x ⇒ (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7),
              x(8), x(9), x(10), x(11), x(12), x(13), x(14),
              x(15), x(16), x(17), x(18), x(19)) }
  .toDF()
val Test_special_format_Tranformed_Data = Test_special_format_DF
  .withColumn("FileName", lit(filename.getName))
  .withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
Can I ignore a delimiter that appears between special characters? For example, if a pipe "|" appears between ^~ and ^~, can it be ignored?
Sometimes the DataFrame columns arrive with the wrong data types. How can we handle this so that data quality checks can be applied?
When I insert into Hive from Spark using a DataFrame, can I specify a rejection directory for rows that fail with unhandled errors? Below is the code I used:
Test_special_format_Tranformed_Data.write
  .partitionBy("rtm_insertion_date")
  .mode(SaveMode.Append)
  .insertInto("dpi_Test_special_format_source")
Sample of the file is here
Upvotes: 4
Views: 1252
Reputation: 1590
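I will answer my own question regarding the file format issue. Hadoop chooses the decompression codec from the file extension, so a gzip file whose name does not end in the extension the codec expects is read as raw bytes. The solution is to override the default extension of the gzip codec.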
import org.apache.hadoop.io.compress.GzipCodec
class TmpGzipCodec extends GzipCodec {
override def getDefaultExtension(): String = ".gz.tmp"
}
Now we just register this codec by setting spark.hadoop.io.compression.codecs on the SparkConf:
val conf = new SparkConf()
// Custom codec that processes .gz.tmp extensions as ordinary gzip
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")
val sc = new SparkContext(conf)
val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
I found this solution at this link.
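To check whether a garbled first line really comes from missing decompression rather than from the character encoding, a quick test outside Spark can help. The sketch below is only an illustration: the object name GzipCheck and its helper names are made up for this example, and it uses only java.util.zip and scala.io from the standard libraries. It compresses a small sample in memory, checks the two gzip magic bytes (0x1f, 0x8b), and reads the lines back through GZIPInputStream as UTF-8.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Hypothetical helper, not part of Spark or Hadoop.
object GzipCheck {
  // Compress a string in memory so the example needs no file on disk.
  def gzip(text: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buffer)
    out.write(text.getBytes(StandardCharsets.UTF_8))
    out.close() // finishes the gzip stream and flushes the trailer
    buffer.toByteArray
  }

  // A gzip stream always starts with the magic bytes 0x1f 0x8b.
  def looksGzipped(bytes: Array[Byte]): Boolean =
    bytes.length >= 2 && (bytes(0) & 0xff) == 0x1f && (bytes(1) & 0xff) == 0x8b

  // Decompress and decode as UTF-8, returning one element per line.
  def gunzipLines(bytes: Array[Byte]): Vector[String] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    try Source.fromInputStream(in, "UTF-8").getLines().toVector
    finally in.close()
  }
}
```

If the first two bytes of the real file are 0x1f 0x8b but Spark still returns garbage for the first line, the file is most likely being read without the gzip codec, and the extension override above is the relevant fix rather than the Codec settings from the question.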
Regarding the malformed records, there are two solutions, as follows:
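One possible way to separate malformed records, sketched here under assumptions and not necessarily one of the two solutions meant above, is to validate every parsed row before it reaches the DataFrame and keep the failures with a reason. The Record case class, its three columns, and the RowCheck object are hypothetical; the real file has more fields and different types.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical three-column layout, used only for illustration.
object RowCheck {
  final case class Record(id: Long, name: String, amount: Double)

  // Right(record) for a valid row, Left(reason) for a rejected one.
  def parse(fields: Array[String]): Either[String, Record] =
    if (fields.length != 3) Left(s"expected 3 fields, got ${fields.length}")
    else Try(Record(fields(0).trim.toLong, fields(1), fields(2).trim.toDouble)) match {
      case Success(record) => Right(record)
      case Failure(error)  => Left(s"bad value: ${error.getMessage}")
    }

  // Split parsed rows into (rejected reasons, accepted records).
  def split(rows: Seq[Array[String]]): (Seq[String], Seq[Record]) = {
    val parsed = rows.map(parse)
    (parsed.collect { case Left(reason) => reason },
     parsed.collect { case Right(record) => record })
  }
}
```

In Spark the same parse function could be mapped over the RDD of field arrays, with the accepted records converted by toDF() and the rejected ones written to a separate directory, for example with saveAsTextFile. That keeps type errors out of the Hive insert instead of relying on the insert itself to reject rows.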
Regarding the delimiter issue, it requires using an RDD with a regex.
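As a rough sketch of such a regex, assuming that ^~ opens and closes a quoted section and that | is the field delimiter (the sample file is not shown here, so the real format may differ), the pattern below treats a whole ^~...^~ section as part of one field and only stops at pipes outside it. The names PipeFields, FieldRegex, and splitFields are made up for this example.

```scala
// Hypothetical helper; the quoting convention is an assumption.
object PipeFields {
  // A field is one or more of: a complete ^~...^~ section (pipes allowed inside),
  // or any single character that is not a pipe.
  val FieldRegex = """(?:\^~.*?\^~|[^|])+""".r

  // Returns the non-empty fields of a line, keeping the ^~ markers.
  def splitFields(line: String): Vector[String] =
    FieldRegex.findAllIn(line).toVector
}
```

On an RDD this could be applied as .map(line => PipeFields.splitFields(line).toArray). Note that the pattern only matches non-empty fields, so empty fields such as || still need a placeholder like the NA replacement used in the question.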
Upvotes: 3