Dylan Lawrence

Reputation: 1533

Using Hadoop InputFormat in Pyspark

I'm working on a file parser for Spark that can basically read in n lines at a time and place all of those lines as a single row in a dataframe.
I know I need to use InputFormat to try and specify that, but I cannot find a good guide to this in Python.
Is there a method for specifying a custom InputFormat in Python or do I need to create it as a scala file and then specify the jar in spark-submit?

Upvotes: 2

Views: 1717

Answers (2)

franklinsijo

Reputation: 18270

You can use Hadoop InputFormats directly with PySpark.

Quoting from the documentation,

PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs.

Pass the Hadoop InputFormat class to whichever of these pyspark.SparkContext methods suits your case: hadoopFile() and hadoopRDD() for the old API, or newAPIHadoopFile() and newAPIHadoopRDD() for the new API.

To read n lines at a time, org.apache.hadoop.mapreduce.lib.input.NLineInputFormat can be used as the InputFormat class with the newAPI methods.
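For example, a minimal sketch of such a call (the input path and the linespermap value below are illustrative):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# "mapreduce.input.lineinputformat.linespermap" tells NLineInputFormat
# how many lines to pack into each input split.
rdd = sc.newAPIHadoopFile(
    "/path/to/input",  # illustrative path
    "org.apache.hadoop.mapreduce.lib.input.NLineInputFormat",
    "org.apache.hadoop.io.LongWritable",  # key class: byte offset in the file
    "org.apache.hadoop.io.Text",          # value class: the line itself
    conf={"mapreduce.input.lineinputformat.linespermap": "5"})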

Upvotes: 2

OneCricketeer

Reputation: 191738

I cannot find a good guide to this in Python

In the Spark docs, under "Saving and Loading Other Hadoop Input/Output Formats", there is an Elasticsearch example plus links to an HBase example.

can basically read in n lines at a time... I know I need to use InputFormat to try and specify that

There is NLineInputFormat specifically for that.


This is a rough translation of some Scala code I have from the question "NLineInputFormat not working in Spark":

from pyspark import SparkContext

def nline(n, path):
    sc = SparkContext.getOrCreate()  # getOrCreate is a method, so call it
    conf = {
        # Number of lines NLineInputFormat packs into each input split;
        # Hadoop configuration values are passed as strings.
        "mapreduce.input.lineinputformat.linespermap": str(n)
    }

    hadoopIO = "org.apache.hadoop.io"
    return sc.newAPIHadoopFile(
        path,
        "org.apache.hadoop.mapreduce.lib.input.NLineInputFormat",
        hadoopIO + ".LongWritable",
        hadoopIO + ".Text",
        conf=conf).map(lambda x: x[1])  # strip out the file-offset key

n = 3
rdd = nline(n, "/file/input")

and place all of those lines as a single row in a dataframe

NLineInputFormat controls how many lines go into each input split, but Spark still yields one record per line, so after the map above each element of the RDD is a single line and each partition holds n of them. To make one string out of each group, you can join the lines per partition, e.g. rdd.glom().map(lambda lines: "\t".join(lines)).
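To land on the question's goal of one DataFrame row per group of n lines, here is a sketch continuing from the rdd above (the column name and the tab separator are arbitrary choices):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # needed for toDF()

# glom() turns each partition into a list, so each element becomes one
# group of n lines; join the group and wrap it in a one-field tuple so
# toDF() can infer a single string column.
df = rdd.glom() \
        .map(lambda lines: ("\t".join(lines),)) \
        .toDF(["value"])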

Upvotes: 1
