Reputation: 1855
number_items = (
    lines
    | 'window' >> beam.WindowInto(window.GlobalWindows())
    | 'CountGlobally' >> beam.combiners.Count.Globally()
    | 'print' >> beam.ParDo(PrintFn())
)
I tried to display the count with print statements and logging, but nothing shows up.
class PrintFn(beam.DoFn):
    def process(self, element):
        print(element)
        logging.error(element)
        return [element]
Upvotes: 0
Views: 6879
Reputation: 143
For batch, you can simply do:
def print_row(element):
    print(element)

count_pcol = (
    lines
    | 'Count elements' >> beam.combiners.Count.Globally()
    | 'Print result' >> beam.Map(print_row)
)
beam.combiners.Count.Globally() is a PTransform that uses global combine to count all the elements of a PCollection and produce a single value.
For streaming, a global count cannot complete with the default settings because the source is an unbounded PCollection, i.e. it never ends. In your case, CombineGlobally will keep waiting for input and never produce an output.
A possible solution could be to set a window function and a non-default trigger.
I have written a simple pipeline that divides elements into fixed windows of 20 seconds and counts per key for each window. You can change the window and trigger based on your requirements.
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

def form_pair(data):
    # Use a single constant key so that all elements are counted together.
    return 1, data

def print_row(element):
    print(element)

count_pcol = (
    p
    | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
    | 'Form key value pair' >> beam.Map(form_pair)
    | 'Apply windowing and triggers' >>
        beam.WindowInto(window.FixedWindows(20),
                        trigger=AfterProcessingTime(5),
                        accumulation_mode=AccumulationMode.DISCARDING)
    | 'Count elements by key' >> beam.combiners.Count.PerKey()
    | 'Print result' >> beam.Map(print_row)
)
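To make the fixed-window idea concrete without needing a runner, here is a minimal plain-Python sketch of how timestamps fall into 20-second windows and how the per-window counts come out. It is not Beam API: the names window_start and count_per_window and the sample timestamps are hypothetical, and it assumes windows aligned to zero with no offset.

```python
from collections import Counter

WINDOW_SIZE = 20  # seconds, the same size as FixedWindows(20)


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start of the fixed window that contains `timestamp`."""
    return timestamp - (timestamp % size)


def count_per_window(timestamps, size=WINDOW_SIZE):
    """Count elements per fixed window, keyed by window start time."""
    return Counter(window_start(ts, size) for ts in timestamps)


# Hypothetical event timestamps, in seconds.
events = [1, 5, 19, 20, 21, 45]
counts = count_per_window(events)
for start in sorted(counts):
    print(f"[{start}, {start + WINDOW_SIZE}): {counts[start]}")
```

Each window gets its own count, which is why a windowed count can emit results on an unbounded source while a single global count cannot finish.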
Upvotes: 1
Reputation: 75765
I find it strange to want to count the elements of an unbounded collection. My first impression is that the pipeline never gets past the global window, because Beam waits for the end of the unbounded collection... unless you set a trigger.
Digging into the documentation, I found this:
Set a non-default trigger. This allows the global window to emit results under other conditions, since the default windowing behavior (waiting for all data to arrive) will never occur
So I was right: the end never occurs because the collection is unbounded, and only a trigger lets the global window emit results.
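As a rough illustration of what a non-default trigger changes, the plain-Python sketch below simulates a global window that fires after every few elements instead of waiting for an end that never comes. It is not Beam API: triggered_counts is a hypothetical helper, and the count-based firing rule is an assumption chosen only because it is deterministic. The accumulating flag mimics the difference between Beam's DISCARDING and ACCUMULATING accumulation modes.

```python
def triggered_counts(elements, every=3, accumulating=False):
    """Yield one count per trigger firing over a single global window.

    The trigger fires after every `every` elements. Discarding mode emits
    the count since the last firing; accumulating mode emits the running
    total since the window began.
    """
    total = 0  # elements seen since the window began
    pane = 0   # elements seen since the last firing
    for _ in elements:
        total += 1
        pane += 1
        if pane == every:
            yield total if accumulating else pane
            pane = 0


stream = range(7)  # hypothetical stand-in for an unbounded source
print(list(triggered_counts(stream)))                     # discarding mode
print(list(triggered_counts(stream, accumulating=True)))  # accumulating mode
```

Note that the seventh element is never reported: without an end of input, whatever arrives after the last firing stays pending until the trigger fires again.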
Did you try to skip the window and directly count globally?
Upvotes: 0