Reputation: 11
I am trying to figure out how to use the first line of a text file as the header and skip the second line. So far I have tried this:
scala> val file = spark.sparkContext.textFile("/home/webwerks/Desktop/UseCase-03-March/Temp/temp.out")
file: org.apache.spark.rdd.RDD[String] = /home/webwerks/Desktop/UseCase-03-March/Temp/temp.out MapPartitionsRDD[40] at textFile at <console>:23
scala> val clean = file.flatMap(x=>x.split("\t")).filter(x=> !(x.contains("-")))
clean: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:25
scala> val df=clean.toDF()
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.show
+--------------------+
| value|
+--------------------+
|time task...|
|03:27:51.199 FCPH...|
|03:27:51.199 PORT...|
|03:27:51.200 PORT...|
|03:27:51.200 PORT...|
|03:27:59.377 PORT...|
|03:27:59.377 PORT...|
|03:27:59.377 FCPH...|
|03:27:59.377 FCPH...|
|03:28:00.468 PORT...|
|03:28:00.468 PORT...|
|03:28:00.469 FCPH...|
|03:28:00.469 FCPH...|
|03:28:01.197 FCPH...|
|03:28:01.197 FCPH...|
|03:28:01.197 PORT...|
|03:28:01.198 PORT...|
|03:28:09.380 PORT...|
|03:28:09.380 PORT...|
|03:28:09.380 FCPH...|
Here I want the first line as the header, and the data should be separated by tabs.
The data looks like this:
time task event port cmd args
--------------------------------------------------------------------------------------
03:27:51.199 FCPH seq 13 28 00300000,00000000,00000591,00020182,00000000
03:27:51.199 PORT Rx 11 0 c0fffffd,00fffffd,0ed10335,00000001
03:27:51.200 PORT Tx 13 40 02fffffd,00fffffd,0ed3ffff,14000000
03:27:51.200 PORT Rx 13 0 c0fffffd,00fffffd,0ed329ae,00000001
03:27:59.377 PORT Rx 15 40 02fffffd,00fffffd,0336ffff,14000000
03:27:59.377 PORT Tx 15 0 c0fffffd,00fffffd,03360ed2,00000001
03:27:59.377 FCPH read 15 40 02fffffd,00fffffd,d0000000,00000000,03360ed2
03:27:59.377 FCPH seq 15 28 22380000,03360ed2,0000052b,0000001c,00000000
03:28:00.468 PORT Rx 13 40 02fffffd,00fffffd,29afffff,14000000
03:28:00.468 PORT Tx 13 0 c0fffffd,00fffffd,29af0ed5,00000001
Upvotes: 0
Views: 930
Reputation: 409
scala> val ds = spark.read.textFile("data.txt")   // Spark >= 2.0
(or)
val ds = spark.sparkContext.textFile("data.txt")
// take the header line and split it on tabs to get the column names
scala> val schemaArr = ds.filter(x => x.contains("time")).collect.mkString.split("\t").toList

// drop the header and the dashed separator line, then split each data row on tabs
scala> val df = ds.filter(x => !x.contains("time") && !x.startsWith("-"))
  .map(x => {
    val cols = x.split("\t")
    (cols(0), cols(1), cols(2), cols(3), cols(4), cols(5))
  }).toDF(schemaArr: _*)
scala> df.show(false)
+------------+----+-----+----+---+--------------------------------------------+
|time |task|event|port|cmd|args |
+------------+----+-----+----+---+--------------------------------------------+
|03:27:51.199|FCPH|seq |13 |28 |00300000,00000000,00000591,00020182,00000000|
|03:27:51.199|PORT|Rx |11 | 0 |c0fffffd,00fffffd,0ed10335,00000001 |
|03:27:51.200|PORT|Tx |13 |40 |02fffffd,00fffffd,0ed3ffff,14000000 |
|03:27:51.200|PORT|Rx |13 | 0 |c0fffffd,00fffffd,0ed329ae,00000001 |
|03:27:59.377|PORT|Rx |15 |40 |02fffffd,00fffffd,0336ffff,14000000 |
|03:27:59.377|PORT|Tx |15 | 0 |c0fffffd,00fffffd,03360ed2,00000001 |
|03:27:59.377|FCPH|read |15 |40 |02fffffd,00fffffd,d0000000,00000000,03360ed2|
|03:27:59.377|FCPH|seq |15 |28 |22380000,03360ed2,0000052b,0000001c,00000000|
|03:28:00.468|PORT|Rx |13 |40 |02fffffd,00fffffd,29afffff,14000000 |
|03:28:00.468|PORT|Tx |13 | 0 |c0fffffd,00fffffd,29af0ed5,00000001 |
+------------+----+-----+----+---+--------------------------------------------+
Please try something like the above. If you want typed columns instead of all strings, apply a custom schema.
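For example, here is a minimal sketch of the custom-schema route using Spark's built-in tab-delimited CSV reader; the file name "data.txt" is taken from the snippet above, and the column types are assumptions based on the sample data:

import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical typed schema for the six columns; types are guesses from the sample.
val schema = StructType(Seq(
  StructField("time",  StringType,  nullable = true),
  StructField("task",  StringType,  nullable = true),
  StructField("event", StringType,  nullable = true),
  StructField("port",  IntegerType, nullable = true),
  StructField("cmd",   IntegerType, nullable = true),
  StructField("args",  StringType,  nullable = true)
))

val typedDf = spark.read
  .option("sep", "\t")        // split columns on tabs
  .option("header", "true")   // treat the first line as the header
  .schema(schema)
  .csv("data.txt")
  .filter(!$"time".startsWith("-"))  // drop the dashed separator line under the header

typedDf.printSchema
typedDf.show(false)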
Upvotes: 1