Reputation: 289
The Dataflow documentation here says that:
The Dataflow runner's implementation of PubsubIO automatically acknowledges messages after they are successfully processed by the first fused stage and side-effects of that processing are written to persistent storage. See the fusion documentation for more details. Messages are therefore only acknowledged when Dataflow can guarantee that there is no data loss if some component crashes or a connection is lost.
I understand that a GroupByKey transform in the middle of a pipeline prevents fusion, thereby forcing a durable commit that results in the messages getting acked. But what happens if my entire pipeline consists of only 2 PTransforms: the first being the Pub/Sub source IO, and the second a simple logging DoFn that doesn't yield any output? In this case, does Dataflow still ack the message after the basically no-op logging PTransform? Based on my testing it seems to, but I would like to confirm my understanding with a more rigorous explanation.
Is it correct to say that Pub/Sub messages also get acked at the end of the pipeline, even if the last PTransform didn't produce any output (because nothing was yielded in the logging DoFn)?
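To make the question concrete, here is a toy model of the behavior quoted above, under loudly stated assumptions: it is not Dataflow's implementation, transforms are plain name strings in a linear pipeline, and the set of fusion breakers (GroupByKey, Reshuffle) is simplified. It splits the pipeline into fused stages at each fusion breaker and reports the last step of the first stage, which is where this model acks the Pub/Sub messages.

```python
# Toy model (assumption): NOT Dataflow's actual fusion or ack logic.
# Transforms that, in this model, end a fused stage with a durable commit.
FUSION_BREAKERS = {"GroupByKey", "Reshuffle"}


def fused_stages(transforms):
    """Split a linear list of transform names into fused stages.

    A fusion breaker starts a new stage, because its input must be
    durably committed (shuffled) before it can run.
    """
    stages, current = [], []
    for name in transforms:
        if name in FUSION_BREAKERS and current:
            stages.append(current)
            current = []
        current.append(name)
    if current:
        stages.append(current)
    return stages


def ack_point(transforms):
    """Return the step after which, in this model, messages are acked:
    the end of the first fused stage (the one containing the source read)."""
    return fused_stages(transforms)[0][-1]
```

With ["ReadFromPubSub", "Log"] there is no fusion breaker, so the whole pipeline is a single stage and the ack point in this model is "Log", which is consistent with the behavior observed in testing. With ["ReadFromPubSub", "Parse", "GroupByKey", "Write"], the model acks after "Parse", once the first stage's output has been committed.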
Upvotes: 3
Views: 912
Reputation: 6572
I think that if you use a DoFn that doesn't yield anything, and the pipeline runs without errors, the messages will still be acked.
A pipeline typically has an input connector and an output connector (IOs), but an output sink isn't what triggers the ack: in your case the read and the logging step are fused into a single stage, so once that stage has processed the messages without errors, they should be acked.
The most important thing is to not have errors in the pipeline.
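The point about errors can be illustrated with another toy model. process_bundle is a hypothetical helper, not a Beam or Dataflow API: outputs are committed and messages acked only if every element in the bundle is processed without an exception; otherwise nothing is committed and the messages stay unacked, so Pub/Sub can redeliver them and the work is retried.

```python
# Toy model (assumption): NOT Dataflow's actual bundle-processing code.
def process_bundle(messages, dofn, committed, unacked):
    """Run dofn over a bundle of messages.

    On success: commit all outputs, then ack (remove from unacked).
    On any exception: commit nothing and ack nothing, so the messages
    remain unacked and would be redelivered.
    Returns True if the bundle succeeded.
    """
    outputs = []
    try:
        for msg in messages:
            result = dofn(msg)
            if result is not None:  # a log-only step produces no output
                outputs.append(result)
    except Exception:
        return False  # bundle failed: no commit, no ack
    committed.extend(outputs)  # durable commit of side effects
    for msg in messages:
        unacked.discard(msg)  # ack only after the commit
    return True
```

A log-only step returns None, so it commits nothing yet still leads to an ack; a step that raises leaves every message in the bundle unacked.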
For your DoFn that only logs, you can also return the current element from the transformation if that feels more natural to you. Example with Beam Python:
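```python
import logging
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

def log_element(element):
    logging.info(element)  # log the raw Pub/Sub payload
    return element  # pass the element through unchanged

def run():
    pipeline_options = PipelineOptions(streaming=True)  # Pub/Sub reads need streaming mode
    with beam.Pipeline(options=pipeline_options) as p:
        (p | "Read" >> ReadFromPubSub(topic="projects/<project>/topics/<topic>")
           | "Log" >> beam.Map(log_element))
```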
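In Python you can return nothing if you want. In a strongly typed language like Java, a function passed to MapElements must return a value, but a plain DoFn doesn't have to output anything: its @ProcessElement method returns void and only emits elements if you call its OutputReceiver.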
Example in Python with only the log, without return or yield:
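```python
import logging
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        (p | "Read" >> ReadFromPubSub(topic="projects/<project>/topics/<topic>")
           | "Log" >> beam.Map(logging.info))  # logging.info returns None, so None is emitted per element
```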
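One detail worth knowing: beam.Map(logging.info) still emits one element per input, namely the None that logging.info returns, whereas a DoFn whose process method neither returns nor yields anything emits no elements at all. The sketch below models that difference in plain Python; map_outputs, pardo_outputs and log_only are hypothetical helpers, not Beam APIs. In this single-stage pipeline the emitted values aren't consumed by anything, so either form shouldn't affect when the messages are acked.

```python
import logging


# Toy model (assumption): mimics how Beam collects outputs, without Beam itself.
def map_outputs(fn, elements):
    """Like beam.Map: exactly one output per input, whatever fn returns (even None)."""
    return [fn(e) for e in elements]


def pardo_outputs(process, elements):
    """Like a DoFn's process(): it returns an iterable of outputs or None.
    None (no return, no yield) means zero outputs for that element."""
    outputs = []
    for e in elements:
        result = process(e)
        if result is not None:
            outputs.extend(result)
    return outputs


def log_only(element):
    """A log-only process() body: logs and implicitly returns None."""
    logging.info(element)
```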
Upvotes: 1