Cvitoybamp

Reputation: 77

Scala (Spark) .txt to .csv

I've got both .txt and .dat files with this structure:

Number    Date     Time         Nns     Ans Nwe     Awe     
1   22.07.17 08:00:23   12444   427 8183    252     
2   22.07.17 08:00:24   13  312 9   278     
3   22.07.17 08:00:25   162 1877    63  273     
4   22.07.17 08:00:26   87  400 29  574     
5   22.07.17 08:00:27   72  349 82  2047        
6   22.07.17 08:00:28   79  294 63  251     
7   22.07.17 08:00:29   35  318 25  248 

I can't convert it to .csv using Spark/Scala.

  val data = spark
      .read
      .option("header", "true")
      .option("inferSchema","true")
      .csv(...) // also tried .text and .textFile

doesn't work!

Please help.

Here's the file - https://github.com/CvitoyBamp/overflow

Upvotes: 0

Views: 803

Answers (2)

SCouto

Reputation: 7928

Hope this helps; it's working fine for me with your A.txt test file.

First, read the file as usual (with no commas in the input, each whole line lands in a single string column, _c0):

val df  = spark.read.csv("A.txt")

Get the headers from the first row, split them on whitespace, and zip them with their index:

val headers = df.first.toSeq.asInstanceOf[Seq[String]].flatMap(_.split("\\s+")).zipWithIndex

RESULT

 headers: Seq[(String, Int)] = ArrayBuffer((Number,0), (Date,1), (Time,2), (Nns,3), (Ans,4), (Nwe,5), (Awe,6))

Then foldLeft over the headers, retrieving the item indicated by the index (the second element of each header pair) and assigning it the column name (the first element of each pair).

Also drop the helper columns and filter out the row that contains the header values:

import org.apache.spark.sql.functions.split // auto-imported in spark-shell

val result = headers
  .foldLeft(df.withColumn("tmp", split($"_c0", "\\s+"))) {
    (acc, elem) => acc.withColumn(elem._1, $"tmp".getItem(elem._2))
  }
  .drop("_c0", "tmp")
  .filter("Number <> 'Number'")

RESULT

+------+--------+--------+-----+----+----+----+
|Number|    Date|    Time|  Nns| Ans| Nwe| Awe|
+------+--------+--------+-----+----+----+----+
|     1|22.07.17|08:00:23|12444| 427|8183| 252|
|     2|22.07.17|08:00:24|   13| 312|   9| 278|
|     3|22.07.17|08:00:25|  162|1877|  63| 273|
|     4|22.07.17|08:00:26|   87| 400|  29| 574|
|     5|22.07.17|08:00:27|   72| 349|  82|2047|
|     6|22.07.17|08:00:28|   79| 294|  63| 251|
|     7|22.07.17|08:00:29|   35| 318|  25| 248|
|     8|22.07.17|08:00:30|   10| 629|  12| 391|
|     9|22.07.17|08:00:31|   58| 511|  67| 525|
|    10|22.07.17|08:00:32|   72| 234|  29| 345|
|    11|22.07.17|08:00:33|  277|1181|  38| 250|
|    12|22.07.17|08:00:34|   40| 268|  31| 292|
|    13|22.07.17|08:00:35|   16| 523|  10| 368|
|    14|22.07.17|08:00:36|  319|1329| 143| 703|
|    15|22.07.17|08:00:37|  164| 311| 124| 352|
|    16|22.07.17|08:00:38|   62| 320| 116| 272|
|    17|22.07.17|08:00:39|  223| 356| 217|1630|
|    18|22.07.17|08:00:40|   50|1659|  94|1611|
|    19|22.07.17|08:00:41|   34| 431|  26| 587|
|    20|22.07.17|08:00:42|    0|   0|   5| 277|
+------+--------+--------+-----+----+----+----+
only showing top 20 rows

Alternatively, here is a solution close to the one from the other answer.

You can load your data as a String Dataset, collapsing runs of whitespace into single spaces:

  val cleaned = spark.read.textFile("Downloads/A.txt").map(_.replaceAll("\\s+", " "))

And then

val data = spark
  .read
  .option("header", true)
  .option("sep", " ")
  .option("inferSchema", true)
  .csv(cleaned)
  .drop("_c7") // the trailing spaces on each line leave an empty extra column

RESULT

+------+--------+--------+-----+----+----+----+
|Number|    Date|    Time|  Nns| Ans| Nwe| Awe|
+------+--------+--------+-----+----+----+----+
|     1|22.07.17|08:00:23|12444| 427|8183| 252|
|     2|22.07.17|08:00:24|   13| 312|   9| 278|
|     3|22.07.17|08:00:25|  162|1877|  63| 273|
|     4|22.07.17|08:00:26|   87| 400|  29| 574|
|     5|22.07.17|08:00:27|   72| 349|  82|2047|
|     6|22.07.17|08:00:28|   79| 294|  63| 251|
|     7|22.07.17|08:00:29|   35| 318|  25| 248|
|     8|22.07.17|08:00:30|   10| 629|  12| 391|
|     9|22.07.17|08:00:31|   58| 511|  67| 525|
|    10|22.07.17|08:00:32|   72| 234|  29| 345|
|    11|22.07.17|08:00:33|  277|1181|  38| 250|
|    12|22.07.17|08:00:34|   40| 268|  31| 292|
|    13|22.07.17|08:00:35|   16| 523|  10| 368|
|    14|22.07.17|08:00:36|  319|1329| 143| 703|
|    15|22.07.17|08:00:37|  164| 311| 124| 352|
|    16|22.07.17|08:00:38|   62| 320| 116| 272|
|    17|22.07.17|08:00:39|  223| 356| 217|1630|
|    18|22.07.17|08:00:40|   50|1659|  94|1611|
|    19|22.07.17|08:00:41|   34| 431|  26| 587|
|    20|22.07.17|08:00:42|    0|   0|   5| 277|
+------+--------+--------+-----+----+----+----+
only showing top 20 rows
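
Since the original goal was a .csv file, here is a minimal sketch for writing the result back out (the "output" directory name is an assumption; pick your own). Note that Spark writes a directory of part files rather than a single file:

    // coalesce(1) forces a single part file, assuming the data fits in one partition
    data.coalesce(1)
      .write
      .option("header", "true")
      .mode("overwrite")
      .csv("output")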

Upvotes: 0

Jasper-M

Reputation: 15086

You could try

val text = spark.read.textFile(pathToFile)
val cleaned = text.map(_.replaceAll(" +", " ").trim)
val data = spark
  .read
  .option("header", true)
  .option("sep", " ")
  .option("inferSchema", true)
  .csv(cleaned)

It will first read the file as simple strings, line by line. Then it replaces every sequence of one or more spaces with exactly one space and tries to parse the result as CSV with a single space as the separator. One thing to be aware of: if one of your fields contains a run of multiple spaces, it will also be collapsed to a single space.
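
If that caveat matters and your file is actually tab-delimited (an assumption about your data; many such logs are), you can skip the whitespace collapsing entirely and read it directly:

    // Assumes the columns are separated by tab characters;
    // spaces inside fields are then left untouched.
    val data = spark
      .read
      .option("header", true)
      .option("sep", "\t")
      .option("inferSchema", true)
      .csv(pathToFile)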

Upvotes: 1
