Reputation: 733
I have the HMP dataset. This dataset has 14 different folders (categories), and each category has multiple CSV files in it.
I want to read the data from all the CSV files into a single DataFrame. The schema for the data is:
val Tschema = StructType(Array(
  StructField("X", IntegerType, nullable = true),
  StructField("Y", IntegerType, nullable = true),
  StructField("Z", IntegerType, nullable = true)
))
In addition, I want to add two more columns to the DataFrame. The first column should contain the name of the folder (category) containing the current CSV file, and the second column should contain the name of the CSV file.
I have tried the following code, but it did not work properly.
val path = System.getProperty("user.home") + "/Desktop/HMP/*" // Path to all categories
val df = spark.sparkContext.wholeTextFiles(path)
df.toDF().show(5, false)
The output of my code is:
+----------------------------------------------------------------------+--------------------+
| _1| _2|
+----------------------------------------------------------------------+--------------------+
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt |12 38 35 |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt |23 56 34 |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt |13 36 36 |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt |39 57 42 |
|Climb_stairs/Accelerometer-2012-06-06-14-13-20-climb_stairs-m7.txt |26 51 36 |
+----------------------------------------------------------------------+--------------------+
Here, the part of the first column (_1) before the / is what I want in a separate column class, and the remaining part should go in a column source. To the _2 part I want to apply the schema that I defined.
I want the final output to look like the following:
+---+---+---+--------------+---------------------+
| X| Y| Z| class| source|
+---+---+---+--------------+---------------------+
| 37| 34| 43| Climb_stairs|Accelerometer-2011...|
| 05| 39| 34| Climb_stairs|Accelerometer-2011...|
| 30| 53| 49| Climb_stairs|Accelerometer-2011...|
+---+---+---+--------------+---------------------+
Upvotes: 0
Views: 1714
Reputation: 1751
I think you're looking at files on the local file system. Can you include details of what you get in df? Are you running Spark in local mode?
If you want to try this on a Cloudera VM, you can put two of those CSV files into an HDFS location by following the steps below:
hdfs dfs -mkdir /files
hdfs dfs -put sample.csv sample2.csv /files/
Run Spark as:
spark2-shell
val df = spark.read.csv("/files/")
df.show
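If you want to apply the question's Tschema instead of the default inferred string columns, you could pass it to the reader. A sketch (the space separator is an assumption based on the sample rows in the question):
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val Tschema = StructType(Array(
  StructField("X", IntegerType, nullable = true),
  StructField("Y", IntegerType, nullable = true),
  StructField("Z", IntegerType, nullable = true)
))
// "sep" is assumed from the space-separated sample rows
val df = spark.read.schema(Tschema).option("sep", " ").csv("/files/")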
For reading the file name and directory, you may need to play with the split and input_file_name functions, depending on the exact location of the files on HDFS.
You could add something like the following:
import org.apache.spark.sql.functions.{input_file_name, split}
import org.apache.spark.sql.types.StringType
val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))
Similarly, you can play with input_file_name and probably substr to grab the input directory, depending on which part you want, as in the sketch below.
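For instance, a minimal sketch that splits the full path from the end, so it works regardless of path depth (reverse on arrays needs Spark 2.4+; df2 is from the snippet above):
import org.apache.spark.sql.functions.{input_file_name, reverse, split}
// Index 0 of the reversed path segments is the file name,
// index 1 is its parent directory (the category folder).
val parts = reverse(split(input_file_name(), "/"))
val df3 = df2
  .withColumn("source", parts.getItem(0)) // file name
  .withColumn("class", parts.getItem(1))  // parent directory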
Upvotes: 1
Reputation: 1892
You could achieve this by using input_file_name with split and reverse.
Reading multiple input files:
scala> var df_v = spark.read.format("csv").option("header",true).option("inferSchema",true).load("input_file/*.csv")
scala> df_v.show
+---------------+-------------------+
| id| DateTime|
+---------------+-------------------+
|340054675199675|15-01-2018 19:43:23|
|340054675199675|15-01-2018 10:56:43|
|340028465709212|10-01-2018 02:47:11|
|340054675199675|09-01-2018 10:59:10|
|340028465709212|02-01-2018 03:25:35|
|340054675199675|28-12-2017 05:48:04|
|340054675199675|21-12-2017 15:47:51|
|340028465709212|18-12-2017 10:33:04|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|16-12-2017 19:55:40|
|340028465709212|12-12-2017 07:04:51|
|340054675199675|06-12-2017 08:52:38|
| 21000101| null|
| 20991231| null|
+---------------+-------------------+
Apply the input_file_name built-in function to get the file name:
scala> var df_v1 =df_v.withColumn("file",input_file_name).withColumn("folder",reverse(split($"file","/"))(1)).withColumn("filename",reverse(split($"file","/"))(0))//.drop("file")
scala> df_v1.show(false)
+---------------+-------------------+------------------------------------------+----------+-----------+
|id |DateTime |file |folder |filename |
+---------------+-------------------+------------------------------------------+----------+-----------+
|340054675199675|15-01-2018 19:43:23|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340054675199675|15-01-2018 10:56:43|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|10-01-2018 02:47:11|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340054675199675|09-01-2018 10:59:10|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|02-01-2018 03:25:35|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340054675199675|28-12-2017 05:48:04|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340054675199675|21-12-2017 15:47:51|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|18-12-2017 10:33:04|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|16-12-2017 19:55:40|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|16-12-2017 19:55:40|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340028465709212|12-12-2017 07:04:51|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|340054675199675|06-12-2017 08:52:38|file:///home/mahesh/input_file/test.csv |input_file|test.csv |
|21000101 |null |file:///home/mahesh/input_file/so_date.csv|input_file|so_date.csv|
|20991231 |null |file:///home/mahesh/input_file/so_date.csv|input_file|so_date.csv|
+---------------+-------------------+------------------------------------------+----------+-----------+
Uncomment the .drop("file") at the end if you don't want that column; I kept it here for clarification.
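Putting this together for the question's layout, something like the sketch below should give the desired X, Y, Z, class, source columns (assumptions: the files sit under ~/Desktop/HMP/&lt;category&gt;/ and the values are space-separated, as the sample output suggests):
import org.apache.spark.sql.functions.{input_file_name, reverse, split}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
val Tschema = StructType(Array(
  StructField("X", IntegerType, nullable = true),
  StructField("Y", IntegerType, nullable = true),
  StructField("Z", IntegerType, nullable = true)
))
val path = System.getProperty("user.home") + "/Desktop/HMP/*"
val parts = reverse(split(input_file_name(), "/"))
val result = spark.read
  .schema(Tschema)
  .option("sep", " ")                     // separator assumed from the sample rows
  .csv(path)
  .withColumn("class", parts.getItem(1))  // parent folder, e.g. Climb_stairs
  .withColumn("source", parts.getItem(0)) // file name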
Upvotes: 0