Reputation: 309
My final RDD looks like this:
finalRDD.collect()
Array[(Int, Seq[Iterable[Int]])] = Array((1,List(List(97), List(98), List(99), List(100))), (2,List(List(97, 98), List(97, 99), List(97, 101))), (3,List(List(97, 98, 99),List(99, 102, 103))))
I would like to write this RDD to a text file in the following format:
('97'), ('98'), ('99'), ('100')
('97', '98'), ('97', '99'), ('97', '101')
('97', '98', '99'), ('99', '102', '103')
I found many websites suggesting the PrintWriter class from java.io as one option to achieve this. Here is the code I have tried:
import java.io.{File, PrintWriter}

val writer = new PrintWriter(new File(outputFName))

def writefunc(chunk: Seq[Iterable[Int]]) {
  var n = chunk
  print("inside write func")
  for (i <- 0 until n.length) {
    writer.print("('" + n(i) + "')" + ", ")
  }
}
finalRDD.mapValues(list => writefunc(list)).collect()
I ended up getting the Task not serializable error shown below:
finalRDD.mapValues(list =>writefunc(list)).collect()
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:758)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:757)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:757)
... 50 elided
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:
- object not serializable (class: java.io.PrintWriter, value: java.io.PrintWriter@b0c0abe)
- field (class: $iw, name: writer, type: class java.io.PrintWriter)
- object (class $iw, $iw@31afbb30)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@672ca5ae)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@528ac6dd)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@b772a0e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7b11bb43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@94c2342)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2bacf377)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@718e1924)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6d112a64)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5747e8e4)
- field (class: $line411.$read, name: $iw, type: class $iw)
- object (class $line411.$read, $line411.$read@59a0616c)
- field (class: $iw, name: $line411$read, type: class $line411.$read)
- object (class $iw, $iw@a375f8f)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4e3978ff)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 59 more
I'm still learning Scala. Could someone suggest how to write a Seq[Iterable[Int]] object to a text file?
Upvotes: 1
Views: 11894
Reputation: 906
Your PrintWriter is created on the driver and cannot be serialized into the closure that Spark ships to the executors, hence the Task not serializable error. Since you don't really want to let Spark handle saving the data, and the collected result is expected to be small, just do finalRDD.collect() and apply any of the solutions for printing output to a file, like https://stackoverflow.com/a/4608061/44647 :
// taken from https://stackoverflow.com/a/4608061/44647
def printToFile(fileName: String)(op: java.io.PrintWriter => Unit): Unit = {
  val p = new java.io.PrintWriter(fileName)
  try { op(p) } finally { p.close() }
}

val collectedData: Seq[(Int, Seq[Iterable[Int]])] = finalRDD.collect()

val output: Seq[String] = collectedData
  .map(_._2) // use only the second part of the tuple: Seq[Iterable[Int]]
  .map { seq: Seq[Iterable[Int]] =>
    // render each inner Iterable[Int] as a String in ('1', '2', '3') format
    val inner: Seq[String] = seq.map("(" + _.map(i => s"'$i'").mkString(", ") + ")")
    inner.mkString(", ")
  }

printToFile(outputFileName) { p => output.foreach(p.println) }
If your RDD's schema changes, the type of the collected collection will change and you will have to adjust this code.
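For instance, here is a minimal sketch of the same formatting applied to values collected as Seq[Seq[String]] instead of Seq[Iterable[Int]] (the data below is made up purely to show the shape):
// hypothetical data, only to illustrate adjusting the formatting to a different element type
val collectedStrings: Seq[(Int, Seq[Seq[String]])] =
  Seq((1, Seq(Seq("a"), Seq("b"))), (2, Seq(Seq("a", "b"))))
val outputStrings: Seq[String] = collectedStrings
  .map(_._2)
  .map(seq => seq.map("(" + _.map(s => s"'$s'").mkString(", ") + ")").mkString(", "))
outputStrings.foreach(println)
// prints:
// ('a'), ('b')
// ('a', 'b')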
Test output from your example collected data (since there is no context to reconstruct the RDD):
('97'), ('98'), ('99'), ('100')
('97', '98'), ('97', '99'), ('97', '101')
('97', '98', '99'), ('99', '102', '103')
UPDATE: the other answer https://stackoverflow.com/a/49074625/44647 is correct that you can generate the text as an RDD[String] and save the file(s) somewhere via Spark's rdd.saveAsTextFile(...). But there are several potential issues with this approach (also covered in "how to make saveAsTextFile NOT split output into multiple file?"):
1) An RDD with multiple partitions will generate multiple files (you have to do something like rdd.repartition(1) to at least ensure one file with data is generated; see the sketch after the directory listing below).
2) File names are mangled (the path parameter is treated as a directory name) and a bunch of temporary junk is generated too. In the example below the RDD got split into 4 files, part-00000...part-00003, because the RDD had 4 partitions - this illustrates 1) and 2):
scala> sc.parallelize(collectedData, 4).map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("/Users/igork/testdata/test6")
ls -al ~/testdata/test6
total 64
drwxr-xr-x 12 igork staff 408 Mar 2 11:40 .
drwxr-xr-x 10 igork staff 340 Mar 2 11:40 ..
-rw-r--r-- 1 igork staff 8 Mar 2 11:40 ._SUCCESS.crc
-rw-r--r-- 1 igork staff 8 Mar 2 11:40 .part-00000.crc
-rw-r--r-- 1 igork staff 12 Mar 2 11:40 .part-00001.crc
-rw-r--r-- 1 igork staff 12 Mar 2 11:40 .part-00002.crc
-rw-r--r-- 1 igork staff 12 Mar 2 11:40 .part-00003.crc
-rw-r--r-- 1 igork staff 0 Mar 2 11:40 _SUCCESS
-rw-r--r-- 1 igork staff 0 Mar 2 11:40 part-00000
-rw-r--r-- 1 igork staff 24 Mar 2 11:40 part-00001
-rw-r--r-- 1 igork staff 30 Mar 2 11:40 part-00002
-rw-r--r-- 1 igork staff 29 Mar 2 11:40 part-00003
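To illustrate the workaround from 1), a minimal sketch reusing the same collectedData and the same formatting (the output path is just a placeholder):
// sketch: repartition to a single partition so only one part-00000 file with data is written
sc.parallelize(collectedData, 4)
  .repartition(1)
  .map(x => x._2.map("(" + _.mkString(", ") + ")").mkString(", "))
  .saveAsTextFile("/Users/igork/testdata/test6-single-file")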
3) When you run on a Spark cluster with multiple nodes (specifically when the workers and the driver are on different hosts), a local path will produce files on the local filesystems of the worker nodes (and can scatter the part-0000* files across different worker nodes). An example run on Google Dataproc with 4 worker hosts is provided below. To overcome this you will want to use a real distributed filesystem like HDFS, or blob storage like S3 or GCS, and fetch the generated files from there. Otherwise it's up to you to retrieve multiple files from the worker nodes.
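For example, only the destination path changes (the HDFS and GCS paths below are hypothetical, and rdd stands for whatever RDD you are saving):
// hypothetical distributed destinations - the output parts then land in one shared location
rdd.saveAsTextFile("hdfs:///just-testing/output")                // HDFS on the cluster
// or e.g. rdd.saveAsTextFile("gs://some-bucket/just-testing/output")  // GCS bucket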
The test job had a main() with this code:
import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util.UUID

val collectedData: Seq[(Int, Seq[Seq[Int]])] =
  Array((1, List(List(97), List(98), List(99), List(100))),
        (2, List(List(97, 98), List(97, 99), List(97, 101))),
        (3, List(List(97, 98, 99), List(99, 102, 103))))

val rdd = sc.parallelize(collectedData, 4)

val uniqueSuffix = UUID.randomUUID()

// expected to run on Spark executors
rdd.saveAsTextFile(s"file:///tmp/just-testing/$uniqueSuffix/test3")

// expected to run on the Spark driver and find NO files
println("Files on driver:")
val driverHostName = InetAddress.getLocalHost.getHostName
Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
  .toArray.map(driverHostName + " : " + _).foreach(println)

// just a *hack* to list files on every executor and get the output to the driver
// PLEASE DON'T DO THAT IN PRODUCTION CODE
val outputRDD = rdd.mapPartitions[String] { _ =>
  val hostName = InetAddress.getLocalHost.getHostName
  Seq(Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
    .toArray.map(hostName + " : " + _).mkString("\n")).toIterator
}

// expected to list files as seen on the executor nodes - multiple files should be present
println("Files on executors:")
outputRDD.collect().foreach(println)
Note how the files are split between different hosts, and the driver dp-igork-test-m has no useful files at all because they are on the worker nodes dp-igork-test-w-*. The output of the test job (hostnames changed a bit for anonymity):
18/03/02 20:54:00 INFO org.spark_project.jetty.util.log: Logging initialized @1950ms
18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: jetty-9.2.z-SNAPSHOT
18/03/02 20:54:00 INFO org.spark_project.jetty.server.ServerConnector: Started ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}
18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: Started @2094ms
18/03/02 20:54:00 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/02 20:54:01 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at dp-igork-test-m/10.142.0.2:8032
18/03/02 20:54:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1520023415468_0003
18/03/02 20:54:07 WARN org.apache.spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Files on driver:
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/._SUCCESS.crc
dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_SUCCESS
Files on executors:
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc
dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001
dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc
18/03/02 20:54:12 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}
Upvotes: 1
Reputation: 41957
You need neither to collect the rdd nor the PrintWriter APIs. A simple combination of the map and mkString functions should do the trick, and finally just use the saveAsTextFile API to save the rdd to a text file.
finalRDD.map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("path to output text file")
You should have your text file with the following lines:
(97), (98), (99), (100)
(97, 98), (97, 99), (97, 101)
(97, 98, 99), (99, 102, 103)
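If you want the quoted format from the question, i.e. ('97'), ('98'), ..., a small variation of the same one-liner should do it (a sketch, not tested against your exact RDD):
// quote each element before joining, to get ('97', '98') instead of (97, 98)
finalRDD
  .map(x => x._2.map(_.map(i => s"'$i'").mkString("(", ", ", ")")).mkString(", "))
  .saveAsTextFile("path to output text file")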
Upvotes: 2