Reputation: 1534
I'm using Apache Spark 1.0.1. I have many files delimited with the UTF-8 character \u0001 rather than the usual newline \n. How can I read such files in Spark? That is, the default record delimiter of sc.textFile("hdfs:///myproject/*") is \n, and I want to change it to \u0001.
Upvotes: 14
Views: 26965
Reputation: 11
If you are using the Spark context, the code below helped me:
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "delimiter")
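For example, with the question's delimiter and path (a minimal sketch; whether sc.textFile honours this property depends on the Hadoop version backing your cluster, so treat it as an assumption to verify):
// Set the record delimiter on the shared Hadoop configuration before reading.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\u0001")
val records = sc.textFile("hdfs:///myproject/*")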
Upvotes: 1
Reputation: 61666
Here is a ready-to-use version of Chad's and @zsxwing's answers for Scala users, which can be used this way:
sc.textFile("some/path.txt", "\u0001")
The following snippet creates an additional textFile method implicitly attached to the SparkContext using an implicit class (in order to replicate SparkContext's default textFile method):
package com.whatever

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object Spark {

  implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {

    def textFile(
        path: String,
        delimiter: String,
        maxRecordLength: String = "1000000"
    ): RDD[String] = {

      val conf = new Configuration(sc.hadoopConfiguration)

      // This configuration sets the record delimiter:
      conf.set("textinputformat.record.delimiter", delimiter)
      // and this one limits the size of one record:
      conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)

      sc.newAPIHadoopFile(
          path,
          classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          conf
        )
        .map { case (_, text) => text.toString }
    }
  }
}
which can be used this way:
import com.whatever.Spark.ContextExtensions
sc.textFile("some/path.txt", "\u0001")
Note the additional setting mapreduce.input.linerecordreader.line.maxlength, which limits the maximum size of a record. This comes in handy when reading a corrupted file in which a record could be too long to fit in memory (which is more likely to happen when playing with the record delimiter). With this setting, reading a corrupted file throws an exception (java.io.IOException, thus catchable) rather than a messy out-of-memory error that would stop the SparkContext.
Upvotes: 1
Reputation: 1760
In the Spark shell, I extracted the data according to Setting textinputformat.record.delimiter in spark:
$ spark-shell
...
scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> val conf = new Configuration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
scala> conf.set("textinputformat.record.delimiter", "\u0001")
scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
data: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:19
sc.newAPIHadoopFile("mydata.txt", ...) is an RDD[(LongWritable, Text)], where the first part of each element is the starting byte offset of the record in the file, and the second part is the actual text delimited by "\u0001".
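To keep only the record text, the pairs can then be mapped to plain strings, for example (a small sketch continuing from the data RDD above):
// Drop the byte offsets and keep just the \u0001-delimited text of each record.
val lines = data.map { case (_, text) => text.toString }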
Upvotes: 7
Reputation: 2636
In Python this could be achieved using:
rdd = sc.newAPIHadoopFile(YOUR_FILE, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                          "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
                          conf={"textinputformat.record.delimiter": YOUR_DELIMITER}).map(lambda l: l[1])
Upvotes: 7
Reputation: 20816
You can use textinputformat.record.delimiter to set the delimiter for TextInputFormat, e.g.,
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "X")
val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val lines = input.map { case (_, text) => text.toString }
println(lines.collect().mkString("Array(", ", ", ")"))
For example, my input is a file containing one line aXbXcXd. The above code will output
Array(a, b, c, d)
Upvotes: 10