Eswar

Reputation: 309

Scala: Write RDD to .txt file

My Final RDD looks like this

FinalRDD.collect()

Array[(Int, Seq[Iterable[Int]])] = Array((1,List(List(97), List(98), List(99), List(100))), (2,List(List(97, 98), List(97, 99), List(97, 101))), (3,List(List(97, 98, 99),List(99, 102, 103))))

I would like to write this RDD to a text file in the following format

('97'), ('98'), ('99'), ('100')

('97', '98'), ('97', '99'), ('97', '101')

('97', '98', '99'), ('99', '102', '103')

I found many websites suggesting the PrintWriter class from java.io as one way to achieve this. Here is the code I tried:

import java.io.{File, PrintWriter}

val writer = new PrintWriter(new File(outputFName))

def writefunc(chunk: Seq[Iterable[Int]])
{
  val n = chunk
  print("inside write func")
  for (i <- 0 until n.length)
  {
    writer.print("('" + n(i) + "')" + ", ")
  }
}

finalRDD.mapValues(list =>writefunc(list)).collect()

I ended up getting the "Task not serializable" error shown below:

finalRDD.mapValues(list =>writefunc(list)).collect()
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:758)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:757)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:757)
... 50 elided
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:
- object not serializable (class: java.io.PrintWriter, value:   java.io.PrintWriter@b0c0abe)
- field (class: $iw, name: writer, type: class java.io.PrintWriter)
- object (class $iw, $iw@31afbb30)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@672ca5ae)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@528ac6dd)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@b772a0e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7b11bb43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@94c2342)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2bacf377)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@718e1924)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6d112a64)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5747e8e4)
- field (class: $line411.$read, name: $iw, type: class $iw)
- object (class $line411.$read, $line411.$read@59a0616c)
- field (class: $iw, name: $line411$read, type: class $line411.$read)
- object (class $iw, $iw@a375f8f)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4e3978ff)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 59 more

I'm still learning Scala. Could someone suggest how to write a "Seq[Iterable[Int]]" object to a text file?

Upvotes: 1

Views: 11894

Answers (2)

IgorK

Reputation: 906

Since you don't really want to let Spark handle saving the data, and the collected result is expected to be small, just do finalRDD.collect() and apply any of the usual solutions for printing output to a file, like https://stackoverflow.com/a/4608061/44647 :

// taken from https://stackoverflow.com/a/4608061/44647
def printToFile(fileName: String)(op: java.io.PrintWriter => Unit): Unit = {
  val p = new java.io.PrintWriter(fileName)
  try { op(p) } finally { p.close() }
}

val collectedData: Seq[(Int, Seq[Iterable[Int]])] = finalRDD.collect()
val output: Seq[String] = collectedData
  .map(_._2) // use only second part of tuple Seq[Iterable[Int]]
  .map { seq: Seq[Iterable[Int]] =>
     // render inner Iterable[Int] as String in ('1', '2', '3') format
     val inner: Seq[String] = seq.map("(" + _.map(i => s"'$i'").mkString(", ") + ")")
     inner.mkString(", ")
  }

printToFile(outputFileName) { p => output.foreach(p.println) }

If your RDD's schema changes, the type of the collected collection will change and you will have to adjust this code accordingly, for example by factoring the rendering into a helper as sketched below.
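A minimal sketch (the helper name is mine, not from the question) that isolates the rendering logic, so a schema change only touches the types in one place:

// Hypothetical helper: turns a row value such as Seq[Iterable[Int]] (any element type A)
// into the ('x', 'y') format; adjust the type parameters if the schema changes.
def renderRow[A](groups: Seq[Iterable[A]]): String =
  groups.map(g => g.map(a => s"'$a'").mkString("(", ", ", ")")).mkString(", ")

val output: Seq[String] = collectedData.map { case (_, groups) => renderRow(groups) }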

Test output from your example's collected data (since there is no context to reconstruct the RDD):

('97'), ('98'), ('99'), ('100')
('97', '98'), ('97', '99'), ('97', '101')
('97', '98', '99'), ('99', '102', '103')

UPDATE: the other answer https://stackoverflow.com/a/49074625/44647 is correct that you can generate the text as an RDD[String] and save the file(s) somewhere via Spark's rdd.saveAsTextFile(...). But there are several potential issues with this approach (also covered in "how to make saveAsTextFile NOT split output into multiple file?"):

1) An RDD with multiple partitions will generate multiple files (you have to do something like rdd.repartition(1) to at least ensure a single file with data is generated - see the sketch after the directory listing below)

2) File names are mangled (the path parameter is treated as a directory name) and a bunch of temporary junk is generated too. In the example below, the RDD got split into 4 files part-00000...part-00003 because the RDD had 4 partitions - this illustrates both 1) and 2):

scala> sc.parallelize(collectedData, 4).map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("/Users/igork/testdata/test6")

 ls -al  ~/testdata/test6
total 64
drwxr-xr-x  12 igork  staff  408 Mar  2 11:40 .
drwxr-xr-x  10 igork  staff  340 Mar  2 11:40 ..
-rw-r--r--   1 igork  staff    8 Mar  2 11:40 ._SUCCESS.crc
-rw-r--r--   1 igork  staff    8 Mar  2 11:40 .part-00000.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00001.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00002.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00003.crc
-rw-r--r--   1 igork  staff    0 Mar  2 11:40 _SUCCESS
-rw-r--r--   1 igork  staff    0 Mar  2 11:40 part-00000
-rw-r--r--   1 igork  staff   24 Mar  2 11:40 part-00001
-rw-r--r--   1 igork  staff   30 Mar  2 11:40 part-00002
-rw-r--r--   1 igork  staff   29 Mar  2 11:40 part-00003
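
As mentioned in 1), a minimal sketch of the workaround (the test7 output path is hypothetical):

// Sketch: repartition(1) forces a single partition, so Spark writes a single
// part-00000 file; the path is still treated as a directory containing it.
sc.parallelize(collectedData, 4)
  .map(x => x._2.map("(" + _.mkString(", ") + ")").mkString(", "))
  .repartition(1)
  .saveAsTextFile("/Users/igork/testdata/test7")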

3) When you run on a Spark cluster with multiple nodes (specifically when the workers and the driver are on different hosts), given a local path it will generate files on the local filesystems of the worker nodes (and can disperse the part-0000* files between different worker nodes). An example run on Google Dataproc with 4 worker hosts is provided below. To overcome this, you will want to use a real distributed filesystem like HDFS, or blob storage like S3 or GCS, and fetch the generated files from there. Otherwise it's up to you to retrieve multiple files from the worker nodes.
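
For example, a sketch assuming you have a GCS bucket you can write to (my-bucket is a placeholder, not from the test run):

// Hypothetical: an HDFS/S3/GCS URI works the same way as a local path,
// but the output directory then lives in one shared, reachable place.
rdd.saveAsTextFile("gs://my-bucket/just-testing/test3")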

The test job's main() contained the following code:

import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util.UUID

val collectedData: Seq[(Int, Seq[Seq[Int]])] =
  Array((1, List(List(97), List(98), List(99), List(100))),
    (2, List(List(97, 98), List(97, 99), List(97, 101))),
    (3, List(List(97, 98, 99), List(99, 102, 103))))
val rdd = sc.parallelize(collectedData, 4)

val uniqueSuffix = UUID.randomUUID()

// expected to run on Spark executors
rdd.saveAsTextFile(s"file:///tmp/just-testing/$uniqueSuffix/test3")

// expected to run on Spark driver and find NO files
println("Files on driver:")
val driverHostName = InetAddress.getLocalHost.getHostName
Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
  .toArray.map(driverHostName + " : " + _).foreach(println)

// just a *hack* to list files on every executor and get output to the driver
// PLEASE DON'T DO THAT IN PRODUCTION CODE
val outputRDD = rdd.mapPartitions[String] { _ =>
  val hostName = InetAddress.getLocalHost.getHostName
  Seq(Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
    .toArray.map(hostName + " : " + _).mkString("\n")).toIterator
}

// expected to list files as was seen on executor nodes - multiple files should be present
println("Files on executors:")
outputRDD.collect().foreach(println)

Note how the files are split between the hosts: the driver dp-igork-test-m has no useful files at all, because they are on the worker nodes dp-igork-test-w-*. (Each worker's listing appears twice below because the RDD has 4 partitions and mapPartitions emits one listing per partition.) The output of the test job (hostnames changed a bit for anonymity):

18/03/02 20:54:00 INFO org.spark_project.jetty.util.log: Logging initialized @1950ms
18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: jetty-9.2.z-SNAPSHOT
18/03/02 20:54:00 INFO org.spark_project.jetty.server.ServerConnector: Started ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}
18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: Started @2094ms
18/03/02 20:54:00 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/02 20:54:01 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at dp-igork-test-m/10.142.0.2:8032
18/03/02 20:54:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1520023415468_0003
18/03/02 20:54:07 WARN org.apache.spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Files on driver:
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/._SUCCESS.crc
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_SUCCESS
Files on executors:
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc
18/03/02 20:54:12 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

You need neither to collect the rdd nor to use the PrintWriter apis.

A simple combination of the map and mkString functions should do the trick, and then you can just use the saveAsTextFile api to save the rdd to a text file:

finalRDD.map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("path to output text file")

You should have a text file with the following lines:

(97), (98), (99), (100)
(97, 98), (97, 99), (97, 101)
(97, 98, 99), (99, 102, 103)
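
Note that this output omits the single quotes shown in the question; if you want them, a small variant of the same line (a sketch along the same lines, not part of the original answer) quotes each element before joining:

finalRDD
  .map(x => x._2.map(_.map(i => s"'$i'").mkString("(", ", ", ")")).mkString(", "))
  .saveAsTextFile("path to output text file")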

Upvotes: 2
