I have a `DoFn` class with a `process` method that takes a string and enhances it:

```python
import apache_beam

class LolString(apache_beam.DoFn):
    def process(self, element: str) -> str:
        return element + "_lol"
```
I want to have a step in my Beam pipeline that gives me a tuple, for example:

```
"Stack" -> ("Stack", "Stack_lol")
```
This is my pipeline step (`strings` is a `PCollection[str]`):

```python
strings | "Lol string" >> apache_beam.ParDo(LolString())
```
However, for the example input this gives me `"Stack_lol"`, not the tuple I want.

How can I achieve the desired output WITHOUT modifying the `process` method?
Solution 1: if you can change the `DoFn`

Return the tuple from the `DoFn`:
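```python
from typing import Iterable, Tuple

import apache_beam as beam
from apache_beam import ParDo
from apache_beam.testing.test_pipeline import TestPipeline


def test_dofn_tuple():
    class LolString(beam.DoFn):
        def process(self, element: str) -> Iterable[Tuple[str, str]]:
            # Yield the pair so Beam emits ONE tuple element; a bare
            # `return element, ...` would be iterated into two elements.
            yield element, f"{element}_lol"

    with TestPipeline() as p:
        (
            p
            | beam.Create(['Stack'])
            | 'Add Tuple' >> ParDo(LolString())
            | 'Log Result' >> beam.Map(print)
        )
```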
The `DoFn` class takes the given `str` element and emits a single `Tuple[str, str]`: `(element, f"{element}_lol")`.
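A `process` method that returns a bare tuple does not emit one tuple: Beam iterates over whatever `process` returns and emits each item as a separate element, so the tuple has to be yielded (or returned wrapped in a list, e.g. `return [(element, f"{element}_lol")]`). The pure-Python sketch below models only that flattening step; `run_pardo`, `process_return` and `process_yield` are hypothetical names, not Beam APIs.

```python
from typing import Callable, Iterable, List


def run_pardo(process: Callable[[str], Iterable], elements: Iterable[str]) -> List:
    """Hypothetical stand-in for ParDo: each item of process()'s result becomes one output element."""
    outputs = []
    for element in elements:
        for output in process(element):
            outputs.append(output)
    return outputs


def process_return(element: str):
    # Returns a tuple: it gets iterated, so its two items become two elements.
    return element, f"{element}_lol"


def process_yield(element: str):
    # Yields the tuple: the generator produces one item, i.e. one tuple element.
    yield element, f"{element}_lol"


print(run_pardo(process_return, ["Stack"]))  # ['Stack', 'Stack_lol']
print(run_pardo(process_yield, ["Stack"]))   # [('Stack', 'Stack_lol')]
```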
Solution 2: if you can't change the `DoFn`
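```python
from typing import Iterable, Tuple

import apache_beam as beam
from apache_beam import DoFn, ParDo
from apache_beam.testing.test_pipeline import TestPipeline


def test_dofn_tuple():
    class LolString(DoFn):
        def process(self, element: str):
            yield element + "_lol"

    class LolStringTuple(DoFn):
        def process(self, element: str) -> Iterable[Tuple[str, str]]:
            # Cut at the last underscore only to recover the original string,
            # then pair it with the suffixed one: "Stack_lol" -> ("Stack", "Stack_lol").
            yield element.rsplit('_', 1)[0], element

    with TestPipeline() as p:
        (
            p
            | 'Create' >> beam.Create(['Stack'])
            | 'Add Suffix' >> ParDo(LolString())
            | 'Create Tuple' >> ParDo(LolStringTuple())
            | 'Log Result' >> beam.Map(print)
        )
```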
In the second solution, a second `DoFn` is added after the first one. It splits the suffixed `str` to recover the original element and builds the expected tuple.
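Note that `str.split('_')` cuts at every underscore and returns the pieces, not the original/suffixed pair. A small pure-Python check (the `to_pair` helper is hypothetical) shows the difference and one way to rebuild `("Stack", "Stack_lol")`, assuming the suffix always starts at the last underscore:

```python
from typing import Tuple


def to_pair(enhanced: str) -> Tuple[str, str]:
    # rsplit with maxsplit=1 cuts only at the LAST underscore, so underscores
    # already present in the original string survive.
    return enhanced.rsplit("_", 1)[0], enhanced


print(tuple("Stack_lol".split("_")))     # ('Stack', 'lol')
print(to_pair("Stack_lol"))              # ('Stack', 'Stack_lol')
print(tuple("my_Stack_lol".split("_")))  # ('my', 'Stack', 'lol')
print(to_pair("my_Stack_lol"))           # ('my_Stack', 'my_Stack_lol')
```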
The same solution, but with a `Map` instead of a `DoFn` to create the tuple:
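```python
import apache_beam as beam
from apache_beam import DoFn, ParDo
from apache_beam.testing.test_pipeline import TestPipeline


def test_dofn_tuple():
    class LolString(DoFn):
        def process(self, element: str):
            yield element + "_lol"

    with TestPipeline() as p:
        (
            p
            | 'Create' >> beam.Create(['Stack'])
            | 'Add Suffix' >> ParDo(LolString())
            # "Stack_lol" -> ("Stack", "Stack_lol"): cut at the last underscore only.
            | 'Create Tuple' >> beam.Map(lambda e: (e.rsplit('_', 1)[0], e))
            | 'Log Result' >> beam.Map(print)
        )
```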