leech

When does a pubsub message get acked in a Dataflow pipeline?

The Dataflow documentation says:

The Dataflow runner's implementation of PubsubIO automatically acknowledges messages after they are successfully processed by the first fused stage and side-effects of that processing are written to persistent storage. See the fusion documentation for more details. Messages are therefore only acknowledged when Dataflow can guarantee that there is no data loss if some component crashes or a connection is lost.

I understand that a GroupByKey transform in the middle of a pipeline prevents fusion, which forces a durable commit and therefore results in the messages being acked. But what happens if my entire pipeline consists of only two PTransforms: the Pub/Sub source IO, followed by a simple logging DoFn that doesn't yield any output? In this case, does Dataflow still ack the message after the essentially no-op logging PTransform? Based on my testing, that does seem to be the case, but I would like to confirm my understanding with a more rigorous explanation.
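The fusion reasoning in the question can be made concrete with a deliberately simplified toy model. It assumes a purely linear pipeline in which only GroupByKey and Reshuffle break fusion; the real Dataflow optimizer works on a graph and considers more cases. The helpers `split_into_fused_stages` and `ack_point` are hypothetical and are not part of Beam or Dataflow. The model shows that Read → Log is a single fused stage, while a GroupByKey ends the first stage at its shuffle write.

```python
# Toy model only: NOT how the Dataflow service actually plans fusion.
# Assumption: a linear pipeline in which only these transforms break fusion.
FUSION_BREAKS = {"GroupByKey", "Reshuffle"}


def split_into_fused_stages(transforms):
    """Split a linear list of transform names into fused stages."""
    stages = [[]]
    for name in transforms:
        if name in FUSION_BREAKS:
            # The shuffle write is fused into the upstream stage...
            stages[-1].append(name + "(write)")
            # ...and the shuffle read starts the downstream stage.
            stages.append([name + "(read)"])
        else:
            stages[-1].append(name)
    return stages


def ack_point(transforms):
    """Return the last step of the first fused stage.

    Per the quoted docs, Pub/Sub messages are acked once this stage has
    finished and its side effects have been written to persistent storage.
    """
    return split_into_fused_stages(transforms)[0][-1]


# The question's two-transform pipeline is a single fused stage.
print(split_into_fused_stages(["ReadFromPubSub", "Log"]))
# → [['ReadFromPubSub', 'Log']]
print(ack_point(["ReadFromPubSub", "Log"]))
# → Log

# A GroupByKey in the middle ends the first stage at its shuffle write.
print(ack_point(["ReadFromPubSub", "Parse", "GroupByKey", "Write"]))
# → GroupByKey(write)
```

In this model, a stage completes whether or not its last step emits anything. So it is the end of the first fused stage, not the presence of an output PCollection, that marks where acking can happen.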

Is it correct to say that Pub/Sub messages also get acked at the end of the pipeline, even if the last PTransform doesn't output any PCollection (because nothing is yielded in the logging DoFn)?

Answers (1)

Mazlum Tosun

I think that if your DoFn doesn't yield or return any output and there is no error in the pipeline, Dataflow will still ack the messages.

A pipeline usually reads from an input connector and writes to an output connector (IOs), but a sink is not strictly required: as long as there is no error, the message should be acked.

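The most important thing is that no errors occur in the pipeline. If processing an element throws, the work is retried and the message is not acked until processing succeeds.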
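Why errors matter can be illustrated with a second toy model. It uses stdlib only, and the names are hypothetical; this is not Dataflow's implementation. It assumes the ordering described in the quoted docs: a message is acked only after processing succeeds and its side effects are committed. A failed attempt commits nothing, so the message stays unacked and is redelivered. In Dataflow streaming jobs a failing work item is retried indefinitely; the toy caps retries with a hypothetical `max_attempts` so that it terminates.

```python
import logging


def deliver(message, process, max_attempts=5):
    """Toy model: run `process` until it succeeds, then commit, then ack.

    Returns (acked, attempts, committed); `committed` is a hypothetical
    stand-in for the side effects Dataflow writes to persistent storage.
    """
    committed = []
    for attempt in range(1, max_attempts + 1):
        pending = []  # side effects of this attempt, not yet durable
        try:
            process(message, pending)
        except Exception:
            # A failed attempt commits nothing and is not acked,
            # so the message would be redelivered.
            continue
        committed.extend(pending)  # commit the side effects first...
        return True, attempt, committed  # ...then ack
    return False, max_attempts, committed


def log_message(message, pending):
    # Like the question's DoFn: only logs, produces no output.
    logging.info(message)


def flaky_factory(failures):
    """Hypothetical processing step that raises `failures` times, then succeeds."""
    state = {"calls": 0}

    def flaky(message, pending):
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError("transient error")
        pending.append(message.upper())

    return flaky


print(deliver("hello", log_message))        # → (True, 1, [])
print(deliver("hello", flaky_factory(2)))   # → (True, 3, ['HELLO'])
print(deliver("hello", flaky_factory(10)))  # → (False, 5, [])
```

The log-only step is acked on its first attempt even though it produces nothing. Only an exception delays the ack.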

For a DoFn that only logs, you can also return the current element from the transformation if that feels more natural to you. An example with Beam Python:

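import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


def log_element(element):
    # Log the Pub/Sub payload (bytes) and pass it through unchanged.
    logging.info(element)
    return element


def run():
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    pipeline_options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=pipeline_options) as p:
        (p | "Read" >> ReadFromPubSub(topic="projects/<project>/topics/<topic>")
           | "Log" >> beam.Map(log_element))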

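In Python, you can return nothing if you want. In a strongly typed language like Java, the function passed to MapElements must return a value, although a DoFn's @ProcessElement method can be void and simply never output anything.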

An example in Python with only the log, without any return or yield:

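import logging
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
        (p | "Read" >> ReadFromPubSub(topic="projects/<project>/topics/<topic>")
           | "Log" >> beam.Map(logging.info))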
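A note on what "returning nothing" produces. In the Beam Python SDK, `beam.Map` emits whatever the function returns, so `beam.Map(logging.info)` yields a PCollection of `None` values. By contrast, a `DoFn.process` that returns `None` emits no elements at all. Either way the elements are simply dropped at the end of the pipeline. The stdlib-only sketch below imitates these two output rules with hypothetical helpers `map_outputs` and `pardo_outputs`; it is a model, not Beam code.

```python
import logging


def map_outputs(fn, elements):
    """Imitates beam.Map: exactly one output per input, even if it is None."""
    return [fn(e) for e in elements]


def pardo_outputs(process, elements):
    """Imitates beam.ParDo: process() returns an iterable of outputs or None.

    Returning None (e.g. a process() that only logs) emits nothing.
    """
    outputs = []
    for e in elements:
        result = process(e)
        if result is not None:
            outputs.extend(result)
    return outputs


def log_only(element):
    # Like the question's logging DoFn: no return, no yield.
    logging.info(element)


messages = [b"msg-1", b"msg-2"]
print(map_outputs(logging.info, messages))  # → [None, None]
print(pardo_outputs(log_only, messages))    # → []
```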
