Gummy

Reputation: 46

Google Cloud Dataflow - Python Streaming JSON to PubSub - Differences between DirectRunner and DataflowRunner

Trying to do something conceptually simple but banging my head against the wall.

I am trying to create a streaming Dataflow job in Python which consumes JSON messages from a PubSub topic/subscription, performs some basic manipulation on each message (in this case, converting the temperature from Celsius to Fahrenheit), and then publishes the record back out on a different topic:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
import json

def transform_temp(line):
    '''Normalize a PubSub string to a JSON object and convert C to F.

    Incoming lines look like this:
    {"temperature": 29.223036004820123}
    '''
    record = json.loads(line)
    record['temperature'] = record['temperature'] * 1.8 + 32
    return json.dumps(record)

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output_topic', required=True,
      help=('Output PubSub topic of the form '
            '"projects/<PROJECT>/topic/<TOPIC>".'))
  group = parser.add_mutually_exclusive_group(required=True)
  group.add_argument(
      '--input_topic',
      help=('Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))
  group.add_argument(
      '--input_subscription',
      help=('Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read from either the input topic or the input subscription,
    # whichever was supplied on the command line.
    if known_args.input_subscription:
      lines = p | beam.io.ReadStringsFromPubSub(
          subscription=known_args.input_subscription)
    else:
      lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)

    # Convert each record and publish it to the output topic.
    _ = (lines
         | beam.Map(transform_temp)
         | beam.io.WriteStringsToPubSub(known_args.output_topic))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

When running this code locally with the DirectRunner, everything works fine. However, after switching to the DataflowRunner, I never see any messages published on the new topic.

I've also tried adding some logging calls to the transform_temp function but don't see anything in the console logs in Dataflow.
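For reference, the logging calls were roughly like this (sketch of what I tried):

def transform_temp(line):
    logging.info('transform_temp saw: %s', line)  # never shows up in the Dataflow console logs
    record = json.loads(line)
    record['temperature'] = record['temperature'] * 1.8 + 32
    return json.dumps(record)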

Any suggestions? Btw, if I just plumb the input topic straight through to the output topic, I can see the messages, so I know the streaming side is working OK.
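The passthrough test is just the same pipeline with the Map step removed:

    # Passthrough: messages do arrive on the output topic with this version.
    _ = (p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
           | beam.io.WriteStringsToPubSub(known_args.output_topic))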

Many thanks!

Upvotes: 1

Views: 1313

Answers (1)

Brecht Coghe

Reputation: 306

You might just be missing a WindowInto transform. Apache Beam's docs state that for a streaming pipeline you are required to set either a non-default window or a non-default trigger. Since you have not defined a window, everything sits in a single global window, so the pipeline may wait endlessly for the end of that window before emitting anything to the sink.
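For example, you could slot a WindowInto between the read and the transform. Here is a minimal sketch using the window module your code already imports (the 10-second fixed window size is an arbitrary choice):

    lines = (p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
               | beam.WindowInto(window.FixedWindows(10))  # 10-second fixed windows
               | beam.Map(transform_temp)
               | beam.io.WriteStringsToPubSub(known_args.output_topic))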

Upvotes: 0
