Reputation: 71
I have the following simple code. The variable word_to_id is ~50 MB in memory, which causes an error when submitting the pipeline to the Dataflow Runner:
413 Request Entity Too Large
word_to_id = {tok: idx for idx, tok in enumerate(vocab)}

def extract_word_ids(tokens):
    return [word_to_id[w] for w in tokens if w in word_to_id]

with beam.pipeline.Pipeline(
        options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntergerizeTokens' >> beam.Map(extract_word_ids)
    )
Please suggest an alternative solution for this.
Upvotes: 1
Views: 1106
Reputation: 71
Finally, I managed to solve it, and it works. I used DoFn.setup
to initialize my variable from the GCS bucket.
import apache_beam as beam
import tensorflow as tf


class IntergerizeTokens(beam.DoFn):
    """Beam line-processing function."""

    def __init__(self, vocab_filename):
        # Only the small filename string is pickled with the pipeline.
        self.vocab_filename = vocab_filename

    def setup(self):
        # Read the vocabulary from the GCS bucket, one token per line,
        # once per DoFn instance on the worker.
        with tf.io.gfile.GFile(tf.io.gfile.glob(self.vocab_filename + '*')[0], 'r') as fh:
            self.word_to_id = {tok.strip(): idx for idx, tok in enumerate(fh)}
        print('Setup done!')

    def process(self, tokens):
        """Takes a list of tokens and yields a list of token ids."""
        yield [self.word_to_id[w] for w in tokens if w in self.word_to_id]
Now pass the DoFn to ParDo:
with beam.pipeline.Pipeline(
        options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntergerizeTokens' >> beam.ParDo(IntergerizeTokens(vocab_temp_path))
    )
This is one way to solve it. I think DoFn.setup
is a good place to initialize large variables in memory.
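For context, here is a minimal sketch of the DoFn lifecycle (the method names are from the Beam API; the class and the logging are purely illustrative): setup and teardown run once per DoFn instance on the worker, start_bundle and finish_bundle run once per bundle, and process runs once per element, which is why setup is the right hook for loading a large vocabulary.
import logging

import apache_beam as beam


class LifecycleDoFn(beam.DoFn):
    def setup(self):
        # Once per DoFn instance on the worker: load heavy resources here.
        logging.info('setup')

    def start_bundle(self):
        # Once per bundle of elements.
        logging.info('start_bundle')

    def process(self, element):
        # Once per element.
        yield element

    def finish_bundle(self):
        # Once per bundle, after its last element.
        logging.info('finish_bundle')

    def teardown(self):
        # Counterpart to setup: release the resources loaded there.
        logging.info('teardown')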
Upvotes: 2
Reputation: 2670
You can use GCS buckets as sources for both the text and the variable, and pass the variable as a side input
. Side inputs can be used as a list, dict, or singleton.
Here is an example word count that removes stopwords stored in a GCS bucket:
import re

import apache_beam as beam
from apache_beam import FlatMap
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:
    path = "gs://dataflow-samples/shakespeare/kinglear.txt"
    stopwords_path = "<BUCKET/stopwords>"
    output_path = "<BUCKET>"

    def split_words(text, stopwords):
        words = re.split(r'\W+', text)
        try:
            words.remove('')
        except ValueError:
            pass
        return [x for x in words if x.lower() not in stopwords]

    stopwords_p = (p | "Read Stop Words" >> ReadFromText(stopwords_path)
                     | FlatMap(lambda x: x.split(", ")))

    text = p | "Read Text" >> ReadFromText(path)

    (text | "Split Words" >> FlatMap(split_words, stopwords=beam.pvalue.AsList(stopwords_p))
          | "Count" >> Count.PerElement()
          | "Write" >> WriteToText(file_path_prefix=output_path, file_name_suffix=".txt"))
Upvotes: 2