Reputation: 471
I am using the Spark DataFrame API to read files from an NFS share and then write the data of each file to HDFS.
I have a three-node Spark cluster with one master node and two worker nodes. The cluster uses YARN as the cluster manager, so the two worker nodes are YARN NodeManager nodes and the master node is the YARN ResourceManager node.
I have a remote location, say /data/files, which is mounted at the same path [/data/files] on all three YARN/Spark nodes. It contains all the CSV files [more than one] that I want to read and finally write to HDFS.
I am running the following code on my master node:
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF {

  private val source: String = "file:///data/files/"
  private val destination: String = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat: String = "com.databricks.spark.csv"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // list all csv files present at the shared location
    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for (file <- fileArray) {
      // reading csv file from shared location and taking whole data in a dataframe
      val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

      // variable for holding destination location : HDFS location
      val finalDestination: String = destination + file.getName

      // saving data into HDFS, using the default number of partitions = 1
      writeDF2HDFS(df, fileFormat, "true", finalDestination)
    }
  }

  def loadCSV2DF(sqlContext: SQLContext, fileFormat: String, header: String, inferSchema: String, source: String): DataFrame = {
    try {
      sqlContext.read.format(fileFormat)
        .option("header", header)           // use first line of each file as header
        .option("inferSchema", inferSchema) // automatically infer data types
        .load(source)
    }
    catch {
      // OnboardingException is a custom exception defined elsewhere in the project
      case ex: OnboardingException =>
        throw ex
    }
  }

  def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Int = 1): Unit = {
    try {
      df.repartition(partitions).write.format(fileFormat).option("header", header).save(destination)
    }
    catch {
      case ez: OnboardingException =>
        throw ez
    }
  }
}
This code reads all the CSV files present at the shared location /data/files/ and writes each one of them to HDFS. For example, /data/files/f1.csv will get loaded into HDFS as a /raw/f1.csv/part-xxxxx file.
While running this code, I am not able to figure out:
1) Where is this whole code running? Is it running on the driver, or on both workers?
2) Do the load() and save() APIs run on the worker nodes, and do they work in parallel? If yes, how do the two workers keep track of the portion of the file that each has read or written?
3) As of now I am reading each file sequentially in the "for" loop and working on each one sequentially. Is it possible to make this a multi-threaded application, where each file is allocated to one thread to perform the end-to-end read and write in parallel? Will disk IO be a constraint while doing this?
Any quick response/reference/pointers would be appreciated.
Regards, Bhupesh
Upvotes: 1
Views: 1171
Reputation: 471
I found a very good explanation for my query in another thread: differentiate driver code and work code in Apache Spark.
Copying some part of it here as well: Everything that happens inside the closure created by a transformation happens on a worker. It means anything passed inside map(...), filter(...), mapPartitions(...), groupBy*(...), aggregateBy*(...) is executed on the workers. This includes reading data from persistent storage or remote sources.
Actions like count, reduce(...), fold(...) are usually executed on both driver and workers. The heavy lifting is performed in parallel by the workers, and some final steps, like reducing the outputs received from the workers, are performed sequentially on the driver.
Everything else, like triggering an action or transformation, happens on the driver. In particular, this includes every action that requires access to the SparkContext.
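For instance, here is a minimal sketch of that split on an RDD built from the same SparkContext as in my code (the values are made up): the closures passed to filter() and map() run on the workers, while count() does its partial counting on the workers and the final aggregation on the driver.
// minimal sketch of the driver/worker split described above (values are made up)
val rdd = sc.parallelize(1 to 1000000)                 // lineage is built on the driver

// the closures passed to filter() and map() are shipped to and executed on the workers
val squaresOfEvens = rdd.filter(_ % 2 == 0).map(x => x.toLong * x)

// count() is an action: partial counts run in parallel on the workers,
// and the final aggregation of those partial results happens on the driver
val total = squaresOfEvens.count()

println(s"count = $total")                             // plain driver-side code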
As far as my queries are concerned: 1) Yes, part of the main() method runs on the driver, but the transformations happen on the workers.
2) load() and save() run on the workers, as we can see that loading creates a DataFrame [which gets stored in memory in partitions] and save creates part-xxxxx files in HDFS, which shows that the workers are doing the work.
3) Still trying to achieve this; I will answer it once done.
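What I am planning to try for 3) is to drive the per-file loop from multiple threads, since SparkContext is thread-safe and concurrently submitted jobs can share the cluster. This is only a sketch of that idea, reusing fileArray, sqlContext, fileFormat, destination and the two helper methods from my code above; the thread-pool size of 4 is an arbitrary choice:
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// submit one Spark job per CSV file from its own thread
val pool = Executors.newFixedThreadPool(4)   // 4 files processed concurrently; tune as needed
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val jobs: Seq[Future[Unit]] = fileArray.toSeq.map { file =>
  Future {
    val df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
    writeDF2HDFS(df, fileFormat, "true", destination + file.getName)
  }
}

// wait for all per-file jobs to finish before stopping the application
Await.result(Future.sequence(jobs), Duration.Inf)
pool.shutdown()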
Thanks
Upvotes: 1
Reputation: 455
Good experiment!
1) Your code always runs on the workers; the driver program is just there to manage the workers.
2) Yes, the load() and save() APIs run on the worker nodes, and they work sequentially.
3) Using a multi-threaded application: I have not tried it yet. Good luck, go for a try! But why do you want to put yourself in a complex situation? Spark knows how to handle such situations.
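For example, if you do not strictly need one HDFS directory per input file, you can let Spark parallelise the read itself by pointing load() at a glob instead of looping file by file. A sketch along those lines, reusing sqlContext and fileFormat from the question (the combined output path is made up):
// read every CSV under the shared mount in a single job; Hadoop expands the glob
// and the resulting partitions are distributed across both workers automatically
val allFiles = sqlContext.read.format(fileFormat)
  .option("header", "true")
  .option("inferSchema", "true")
  .load("file:///data/files/*.csv")

// one distributed write instead of one job per file (a single combined output directory)
allFiles.write.format(fileFormat)
  .option("header", "true")
  .save("hdfs://<myHostIP>:8020/raw/combined")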
Upvotes: 0