pdowling

Reputation: 500

Using a dynamic generator as input in pyspark

I am trying to process a very large corpus using PySpark. However, my input file is not structured as one document per line, so I can't simply load it with sc.textFile.

Instead, I am loading the file with a generator function that yields a document whenever a stop sequence is encountered. I can wrap this generator with sc.parallelize, but that causes PySpark to load all of my data into RAM at once, which I can't afford.

Is there any way to work around this? Or will I definitely need to convert my text files?

Here is basically what I want to run:

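# doc_end_pattern is the stop sequence that marks the end of a document;
# it must be defined before this generator is used.
def repaired_corpus(path):
    """Yield one document at a time, splitting on doc_end_pattern."""
    _buffer = ""
    with open(path) as f:  # close the file once the generator is exhausted
        for line in f:
            doc_end = line.find(doc_end_pattern)
            if doc_end != -1:
                # Text after the pattern on this line is discarded.
                _buffer += line[:doc_end + len(doc_end_pattern)]
                yield _buffer
                _buffer = ""
            else:
                _buffer += line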

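sc = spark.sparkContext
some_state = sc.broadcast(my_state)
# parallelize() turns the generator into a list on the driver first.
in_rdd = sc.parallelize(repaired_corpus(path))
json_docs = in_rdd.map(
    lambda item: process_element(item, some_state.value)
)
# saveAsTextFile() returns None, so it is called as a separate step.
json_docs.saveAsTextFile("processed_corpus.out")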

Upvotes: 0

Views: 2726

Answers (1)

Assaf Mendelson

Reputation: 13001

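Although it is a little old, you can try the approach from the answer here.

Basically, read the file through Hadoop's TextInputFormat with a custom record delimiter, so that each record is one document instead of one line: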

rdd = sc.newAPIHadoopFile(
    path,
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": doc_end_pattern},
).map(lambda l: l[1])  # keep only the text of each (offset, text) pair
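
To plug this into the pipeline from the question, something along the following lines might work. It is only a sketch: load_documents and restore_delimiter are hypothetical helper names, and it assumes the record reader drops the delimiter from each record, so the helper puts it back to mimic what the generator yields. Unlike the generator, text that follows the delimiter on the same line is not discarded but becomes the start of the next record, and any trailing text after the last delimiter would also get a delimiter appended.

```python
def restore_delimiter(record, delimiter):
    """Re-append the delimiter, assuming the record reader stripped it."""
    return record + delimiter


def load_documents(sc, path, delimiter):
    """Return an RDD with one document per element.

    `sc` is an existing SparkContext. The file is split into records by
    the executors, so the corpus never has to fit in driver memory.
    """
    pairs = sc.newAPIHadoopFile(
        path,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": delimiter},
    )
    # Each element is an (offset, text) pair; only the text is needed.
    return pairs.map(lambda kv: restore_delimiter(kv[1], delimiter))


# Hypothetical usage with the names from the question:
#
#   some_state = sc.broadcast(my_state)
#   docs = load_documents(sc, path, doc_end_pattern)
#   docs.map(
#       lambda item: process_element(item, some_state.value)
#   ).saveAsTextFile("processed_corpus.out")
```

If process_element does not need the stop sequence at the end of each document, the restore_delimiter step can simply be dropped.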

Upvotes: 2
