Dilip Sharma

Reputation: 71

How to handle large in-memory data in Apache Beam Pipeline to run on Google Dataflow Runner

I have the following simple code. The variable word_to_id takes ~50MB in memory. Because it is captured in the closure of extract_word_ids, it gets serialized into the job creation request, so submitting the pipeline to the Dataflow Runner fails with:

413 Request Entity Too Large

  word_to_id = {tok: idx for idx, tok in enumerate(vocab)}

  def extract_word_ids(tokens):
    # Membership test rather than a truthiness check, so the token with id 0 is kept.
    return [word_to_id[w] for w in tokens if w in word_to_id]

  with beam.Pipeline(options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntegerizeTokens' >> beam.Map(extract_word_ids)
    )

Please suggest an alternative way to handle this.

Upvotes: 1

Views: 1106

Answers (2)

Dilip Sharma

Reputation: 71

Finally, I managed to solve it and it works. I used DoFn.setup to initialize my variable from the GCS bucket.

import apache_beam as beam
import tensorflow as tf


class IntegerizeTokens(beam.DoFn):
  """Beam line-processing DoFn."""

  def __init__(self, vocab_filename):
    self.vocab_filename = vocab_filename

  def setup(self):
    # Read the vocabulary (assumed one token per line) from the GCS bucket.
    with tf.io.gfile.GFile(tf.io.gfile.glob(self.vocab_filename + '*')[0], 'r') as fh:
      vocab = fh.read().splitlines()
    self.word_to_id = {tok: idx for idx, tok in enumerate(vocab)}
    print('Setup done!')

  def process(self, tokens):
    """Takes a list of tokens and yields the list of their vocabulary ids."""
    yield [self.word_to_id[w] for w in tokens if w in self.word_to_id]

Now pass the DoFn to ParDo:

  with beam.Pipeline(options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntegerizeTokens' >> beam.ParDo(IntegerizeTokens(vocab_temp_path))
    )

This is one way to solve it. I think DoFn.setup is a good place to initialize large in-memory variables: it runs once when the DoFn instance is initialized on a worker, so the data never has to travel with the job request.
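If you would rather not depend on TensorFlow just for file I/O, the same setup can be written with Beam's own FileSystems module. A minimal sketch, under the same assumption of one token per line:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class IntegerizeTokens(beam.DoFn):

  def __init__(self, vocab_filename):
    self.vocab_filename = vocab_filename

  def setup(self):
    # Resolve the glob and take the first match, as in the GFile version.
    match = FileSystems.match([self.vocab_filename + '*'])[0]
    vocab_path = match.metadata_list[0].path
    # FileSystems.open returns a binary stream, so decode before splitting.
    with FileSystems.open(vocab_path) as fh:
      vocab = fh.read().decode('utf-8').splitlines()
    self.word_to_id = {tok: idx for idx, tok in enumerate(vocab)}

  def process(self, tokens):
    yield [self.word_to_id[w] for w in tokens if w in self.word_to_id]

FileSystems dispatches on the path scheme, so the same code works for local files and gs:// paths.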

Upvotes: 2

Iñigo

Reputation: 2670

You can use GCS buckets as sources for both the text and the variable, and pass the variable to your transform as a side input. A side input can be consumed as a list, a dict, or a singleton.
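In the Python SDK those views live in beam.pvalue. A tiny sketch (the Create inputs here are made up just for illustration):

import apache_beam as beam

with beam.Pipeline() as p:
  pairs = p | 'Pairs' >> beam.Create([('a', 0), ('b', 1)])
  main = p | 'Main' >> beam.Create(['a', 'b', 'c'])

  # AsList materializes the side input as a Python list,
  # AsDict expects (key, value) pairs, AsSingleton expects exactly one element.
  looked_up = main | 'Lookup' >> beam.Map(
      lambda x, d: d.get(x, -1), d=beam.pvalue.AsDict(pairs))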

Here is an example of a word count that removes stopwords, which are stored in a GCS bucket:

import re

import apache_beam as beam
from apache_beam import FlatMap
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:

    path = "gs://dataflow-samples/shakespeare/kinglear.txt"
    stopwords_path = "<BUCKET/stopwords>"

    output_path = "<BUCKET>"

    def split_words(text, stopwords):
        # \W+ splits on runs of non-word characters; drop the empty strings
        # produced by leading/trailing separators.
        words = [w for w in re.split(r'\W+', text) if w]
        return [x for x in words if x.lower() not in stopwords]

    stopwords_p = (p | "Read Stop Words" >> ReadFromText(stopwords_path)
                     | FlatMap(lambda x: x.split(", ")))

    text = p | "Read Text" >> ReadFromText(path)

    (text | "Split Words" >> FlatMap(split_words, stopwords=beam.pvalue.AsList(stopwords_p))
          | "Count" >> Count.PerElement()
          | "Write" >> WriteToText(file_path_prefix=output_path, file_name_suffix=".txt"))

Upvotes: 2
