Reputation: 33
I just started learning Apache Beam with Python and have been stuck on this problem for a while; I hope to get some help from anyone who knows Apache Beam well.
This is my problem statement:
I have a text file that looks like this:
BEGIN=burger
blue
lettuce
mayonise
END=burger
BEGIN=fish
green
strawberry
ketchup
END=fish
How can I use Apache Beam to split burger and fish into different PCollections so that I can perform different operations on these two PCollections?
Here is my code snippet in Python:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter
from apache_beam.io import ReadFromText

class SplitRow(beam.DoFn):
    def process(self, element):
        return element.splitlines()

def ExtractBurger(element):
    if element == "BEGIN=burger":
        return element

p = beam.Pipeline()

squares = (
    p
    # | "Read From Text" >> ReadFromText("gs://abc.txt")
    | "Create dummy text file" >> Create([
        'BEGIN=burger',
        'blue',
        'lettuce',
        'mayonise',
        'END=burger',
        'BEGIN=fish',
        'green',
        'strawberry',
        'ketchup',
        'END=fish',
    ])
    | "Decode and split lines" >> ParDo(SplitRow())
    | "Extract out Burger" >> Filter(ExtractBurger)
    | Map(print)
)
p.run()
My output is this:
BEGIN=burger
I am able to extract the rows that contain "BEGIN=burger", but what I really want is to extract all the data between "BEGIN=burger" and "END=burger" into one PCollection, and all the data between "BEGIN=fish" and "END=fish" into another. I'm not sure if this is possible, since Apache Beam seems to only support row-by-row operations. How do I write logic that does something like this?
I'd appreciate it if anyone could share some insights. Thank you!
Upvotes: 0
Views: 1169
Reputation: 1383
Beam processes elements in parallel, so there is no guarantee that it will process rows in their original order.
To achieve this, you'd have to use state (https://beam.apache.org/blog/stateful-processing/) to record whether the current element is between a BEGIN and an END. You would also have to make sure the parallelism is 1 for Beam and its runner (whichever runner you choose) so that it does not process elements in parallel. But that defeats the purpose of using Beam.
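For illustration only, a minimal sketch of that stateful approach (GroupByMarkers is a made-up name; it maps every element to one dummy key so a single state cell sees all rows, and it is only correct if the runner happens to deliver the rows in their original order, which Beam does not guarantee):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class GroupByMarkers(beam.DoFn):
    # Remembers which BEGIN=... block the current element falls inside.
    CURRENT_GROUP = ReadModifyWriteStateSpec('current_group', StrUtf8Coder())

    def process(self, element, current=beam.DoFn.StateParam(CURRENT_GROUP)):
        _, line = element  # stateful DoFns require (key, value) input
        if line.startswith('BEGIN='):
            current.write(line.split('=', 1)[1])
        elif line.startswith('END='):
            current.clear()
        else:
            group = current.read()
            if group:
                yield (group, line)  # e.g. ('burger', 'lettuce')

with beam.Pipeline() as p:
    (
        p
        | beam.Create(['BEGIN=burger', 'blue', 'END=burger'])
        | beam.Map(lambda line: (0, line))  # single dummy key: no parallelism
        | beam.ParDo(GroupByMarkers())
        | beam.Map(print)
    )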
If you cannot change the file: you can just write a Python script to do this.
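For example, a minimal sketch of such a script (the file name input.txt is just a placeholder):

from collections import defaultdict

def split_blocks(lines):
    """Collect the lines between BEGIN=<name> and END=<name>, keyed by <name>."""
    groups = defaultdict(list)
    current = None  # name of the block we are currently inside, if any
    for line in lines:
        line = line.strip()
        if line.startswith('BEGIN='):
            current = line.split('=', 1)[1]
        elif line.startswith('END='):
            current = None
        elif current is not None:
            groups[current].append(line)
    return groups

with open('input.txt') as f:  # placeholder file name
    groups = split_blocks(f)
# groups == {'burger': ['blue', 'lettuce', 'mayonise'],
#            'fish': ['green', 'strawberry', 'ketchup']}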
If you can change the behavior that produces the file: you can prefix each row with a key identifying the group it belongs to (the name between BEGIN and END). Then your file doesn't even need to contain its rows in their original order. For example:
'burger=blue',
'burger=lettuce',
'burger=mayonise',
'fish=green',
'fish=strawberry',
'fish=ketchup',
'burger=pickle',
'fish=chips',
Then you can process all rows in parallel, parse each into {key}={value}, and group by key into a PCollection that contains everything you need for further transforms.
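A minimal sketch of that approach, using the sample rows above:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([
            'burger=blue',
            'burger=lettuce',
            'burger=mayonise',
            'fish=green',
            'fish=strawberry',
            'fish=ketchup',
            'burger=pickle',
            'fish=chips',
        ])
        # Parse each row into a (key, value) pair.
        | 'Parse key=value' >> beam.Map(lambda line: tuple(line.split('=', 1)))
        # Collect all values that share a key, regardless of input order.
        | 'Group by key' >> beam.GroupByKey()
        | beam.Map(print)
    )

GroupByKey emits one element per key, e.g. ('burger', ['blue', 'lettuce', 'mayonise', 'pickle']), so each group can then be routed into its own transforms.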
Upvotes: 1