cmy113

Reputation: 33

How to split content of 1 text file into different PCollections using Apache Beam

I just started learning Apache Beam with Python and have been stuck on this problem for a while; I hope to get some help from anyone who is experienced with Apache Beam.

This is my problem statement:

I have a text file that looks like this:

BEGIN=burger
blue
lettuce
mayonise 
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish

How can I use Apache Beam to split burger and fish into different PCollections, so that I can perform different operations on these two PCollections?

Here is my code snippet in Python:

import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText

class SplitRow(beam.DoFn):
    def process(self, element):
        # Each element from Create is already a single line, so splitlines()
        # just wraps it in a one-element list.
        return element.splitlines()


def ExtractBurger(element):
    # Returns a truthy value (the line itself) only for the exact BEGIN
    # marker, so Filter keeps that single row.
    if element == "BEGIN=burger":
        return element

p = beam.Pipeline()
squares = (
    p 
#     | "Read From Text" >> ReadFromText("gs://abc.txt")
    | "Create dummy text file" >> Create([
          'BEGIN=burger',
          'blue',
          'lettuce',
          'mayonise',
          'END=burger',
          'BEGIN=fish',
          'green',
          'strawberry',
          'ketchup',
          'END=fish',
      ])
    | "Decode and split lines" >> ParDo(SplitRow())
    | "Extract out Burger" >> Filter(ExtractBurger)
    | Map(print)
)
p.run()

My output is this:

BEGIN=burger

I am able to extract the rows that match "BEGIN=burger", but what I really want is to extract all the data between "BEGIN=burger" and "END=burger" into one PCollection, and the data between "BEGIN=fish" and "END=fish" into another. I am not sure if this is possible, since Apache Beam seems to only support row-level operations. How do I write logic that does something like this:

  1. Find BEGIN=burger
  2. Continue looping through the following rows until you find END=burger
  3. Write that whole section into a PCollection

I'd appreciate it if anyone could give some insights! Thank you!

Upvotes: 0

Views: 1169

Answers (1)

ningk

Reputation: 1383

Beam processes elements in parallel, so there is no guarantee that it will process rows in their original order.

To achieve this, you'd have to use state (https://beam.apache.org/blog/stateful-processing/) to record whether the element currently being processed is between a BEGIN and an END. You would also have to make sure the parallelism is 1 for Beam and its runner (whichever runner you choose) so that it does not process elements in parallel, but that defeats the purpose of using Beam.
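Purely as an illustration of that stateful approach, here is a minimal sketch (the class name and the dummy key are made up, and it assumes every line has been keyed with the same key; it is only correct if the runner happens to deliver rows in their original file order, which Beam does not guarantee):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class TagSectionFn(beam.DoFn):
    # Remembers which BEGIN=<name>/END=<name> section we are currently in.
    CURRENT = ReadModifyWriteStateSpec('current_section', StrUtf8Coder())

    def process(self, element, current=beam.DoFn.StateParam(CURRENT)):
        _, line = element  # state requires keyed input, e.g. ('all', line)
        if line.startswith('BEGIN='):
            current.write(line.split('=', 1)[1])
        elif line.startswith('END='):
            current.clear()
        else:
            section = current.read()
            if section:
                yield (section, line)

You would key every line to a single dummy key first, e.g. with beam.Map(lambda line: ('all', line)), before applying beam.ParDo(TagSectionFn()); keying everything to one key also forces all elements onto one worker.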

  • If you cannot change the file: you can just write a plain Python script to do this (see the sketch at the end of this answer).

  • If you can change the behavior that produces the file: you can tag each row between a "BEGIN" and an "END" with an identifier (the section name, or a uuid). Then your file doesn't even need to contain rows in their original order. For example:

       'burger=blue',
       'burger=lettuce',
       'burger=mayonise',
       'fish=green',
       'fish=strawberry',
       'fish=ketchup',
       'burger=pickle',
       'fish=chips',
    

Then you can process all rows in parallel, parse each one into a {key}={value} pair, and group by key into a PCollection that contains everything for each key, ready for further transforms.
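A minimal sketch of that parse-and-group approach, using the example rows above:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | 'Create rows' >> beam.Create([
              'burger=blue', 'burger=lettuce', 'burger=mayonise',
              'fish=green', 'fish=strawberry', 'fish=ketchup',
              'burger=pickle', 'fish=chips',
          ])
        # Split each 'key=value' row into a (key, value) tuple.
        | 'Parse key=value' >> beam.Map(lambda line: tuple(line.split('=', 1)))
        # Emits one element per key, e.g. ('burger', ['blue', 'lettuce', ...]).
        | 'Group by key' >> beam.GroupByKey()
        | beam.Map(print)
    )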
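And for the first option (a plain Python script, no Beam), a sketch along these lines (the file path is illustrative):

from collections import defaultdict

def split_sections(path):
    # Collect the lines between each BEGIN=<name> and END=<name> marker.
    sections = defaultdict(list)
    current = None
    with open(path) as f:
        for raw in f:
            line = raw.strip()
            if line.startswith('BEGIN='):
                current = line.split('=', 1)[1]
            elif line.startswith('END='):
                current = None
            elif current is not None:
                sections[current].append(line)
    return sections

print(split_sections('abc.txt'))  # {'burger': [...], 'fish': [...]}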

Upvotes: 1
