Reputation: 453
I am trying to write data to a single file using Spark with Scala:
while (loop > 0) {
  val getReq = new HttpGet("ww.url.com")
  val httpResponse = client.execute(getReq)
  val data = Source.fromInputStream(httpResponse.getEntity.getContent()).getLines.mkString
  val parser = JSON.parseFull(data)
  val globalMap = parser.get.asInstanceOf[Map[String, Any]]
  val reviewMap = globalMap.get("payload").get.asInstanceOf[Map[String, Any]]
  val df = context.sparkContext.parallelize(Seq(reviewMap.get("records").get.toString())).toDF()
  if (startIndex == 0) {
    df.coalesce(1).write.mode(SaveMode.Overwrite).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
  } else {
    df.coalesce(1).write.mode(SaveMode.Append).json("C:\\Users\\kh\\Desktop\\spark\\raw\\data\\final")
  }
  startIndex = startIndex + limit
  loop = loop - 1
  httpResponse.close()
}
The number of files created is the number of loops, and I want to create one file only. It is also creating CRC files, which I want to remove. I tried the config below, but it only stops the creation of the success files:
.config("dfs.client.read.shortcircuit.skip.checksum", "true")
.config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.config("fs.file.impl.disable.cache", true)
Any ideas on how to create one file only, without the CRC and success files?
Upvotes: 0
Views: 242
Reputation: 1121
Re: "The number of files created is the number of loops"
Even though you call df.coalesce(1) in your code, the write still runs once per iteration of the while loop. coalesce(1) only guarantees one part file per write, and each append adds another part file to the output directory.
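As a minimal sketch of that point, the loop could only collect the responses, with a single write after it. This assumes all pages fit in driver memory. `fetchPage`, the page size, the request count, and the `out/final` path are hypothetical stand-ins for the HTTP call, counters, and path in the question, and `text` is used instead of `json` on the assumption that each record is already a JSON string.

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteOnceAfterLoop {
  // Hypothetical stand-in for the HTTP GET and JSON parsing in the question.
  def fetchPage(startIndex: Int): String = s"""{"startIndex":$startIndex}"""

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("write-once-after-loop")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val limit = 100      // assumed page size
    var startIndex = 0
    var loop = 3         // assumed number of requests
    val pages = ListBuffer.empty[String]

    while (loop > 0) {
      pages += fetchPage(startIndex) // only collect here, no write inside the loop
      startIndex += limit
      loop -= 1
    }

    // One write action on one partition produces one part file.
    pages.toList.toDS().coalesce(1).write.mode(SaveMode.Overwrite).text("out/final")
    spark.stop()
  }
}
```

The output is still a directory (`out/final` here) that contains the single part file, because Spark always writes a directory rather than a bare file.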
Re: "I want to create one file only"
From your code, it seems that you are issuing HTTP GET requests to some URL and saving the content after parsing it. If this understanding is right, then I believe you should not use a while loop for this task. There is a map transformation that you could use in the following manner.
Please find the pseudo-code below for your reference.
// toDF and map need the session's implicits in scope (import spark.implicits._)
val urls = List("a.com", "b.com", "c.com")
val sourcedf = sparkContext.parallelize(urls).toDF
// This could be map or flatMap, depending on your requirement.
val yourprocessedDF = sourcedf.map(<< do your parsing here and emit data >>)
yourprocessedDF.repartition(1).write.format(<< whichever format you need >>).save(<< output path >>)
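A more complete, runnable version of this idea might look like the sketch below. `fetchPage`, `removeSideFiles`, the page size, the request count, and the `out/final` path are illustrative assumptions, not taken from the question. The sketch writes already-serialized JSON strings with `text` rather than `json`, and it deletes the `.crc` and `_SUCCESS` side files after the write, which is one possible way to get rid of the extra files the question mentions.

```scala
import java.io.File
import org.apache.spark.sql.{SaveMode, SparkSession}

object SingleFileWrite {
  // Hypothetical stand-in for the HTTP GET and JSON parsing in the question.
  def fetchPage(startIndex: Int): String = s"""{"startIndex":$startIndex}"""

  // Deletes Hadoop's checksum side files (*.crc) and the _SUCCESS marker, if present.
  def removeSideFiles(dir: File): Unit =
    Option(dir.listFiles()).getOrElse(Array.empty[File])
      .filter(f => f.getName.endsWith(".crc") || f.getName == "_SUCCESS")
      .foreach(_.delete())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("single-file-write")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val limit = 100    // assumed page size
    val requests = 3   // assumed number of requests
    val offsets = (0 until requests).map(_ * limit)

    // One Dataset covering every page, instead of one DataFrame per loop iteration.
    val records = spark.sparkContext.parallelize(offsets).map(fetchPage).toDS()

    // A single write action on a single partition yields a single part file.
    records.repartition(1).write.mode(SaveMode.Overwrite).text("out/final")
    spark.stop()

    // Post-process the output directory so only the part file remains.
    removeSideFiles(new File("out/final"))
  }
}
```

The result is a directory containing one `part-*` file. If a specific file name is required, the part file would still need to be renamed or moved after the write.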
Upvotes: 1