I am trying to process a very large corpus using pyspark; however, my input file is not structured as "one document per line", so I can't simply load it with sc.textFile.
Instead, I am loading the file with a generator function that yields a document whenever a stop sequence is encountered. I can wrap this generator with sc.parallelize; however, that causes pyspark to load all of my data into RAM at once, which I can't afford.
Is there any way to work around this? Or will I definitely need to convert my text files?
Here is basically what I want to run:
def repaired_corpus(path):
    # Accumulate lines until the end-of-document marker appears, then yield one document.
    _buffer = ""
    with open(path) as f:
        for line in f:
            doc_end = line.find(doc_end_pattern)
            if doc_end != -1:
                _buffer += line[:doc_end + len(doc_end_pattern)]
                yield _buffer
                _buffer = ""
            else:
                _buffer += line
some_state = sc.broadcast(my_state)
in_rdd = sc.parallelize(repaired_corpus(path))
in_rdd.map(
    lambda item: process_element(item, some_state.value)
).saveAsTextFile("processed_corpus.out")
Upvotes: 0
Views: 2726
While it is a little old, you can try using the approach from the answer here.
Basically:
rdd = sc.newAPIHadoopFile(
    path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
    conf={"textinputformat.record.delimiter": doc_end_pattern}
).map(lambda kv: kv[1])  # each record is (byte offset, document text); keep only the text
Upvotes: 2