Reputation: 46
Trying to do something conceptually simple but banging my head against the wall.
I am trying to create a streaming dataflow job in Python which consumes JSON messages from a PubSub topic/subscription, performs some basic manipulation on each message (in this case, converting the temperature from C to F) and then publishes the record back out on a different topic:
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
import json

'''Normalize pubsub string to json object'''
# Lines look like this:
# {"temperature": 29.223036004820123}
def transform_temp(line):
    record = json.loads(line)
    record['temperature'] = record['temperature'] * 1.8 + 32
    return json.dumps(record)
def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_topic', required=True,
        help=('Output PubSub topic of the form '
              '"projects/<PROJECT>/topic/<TOPIC>".'))
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
    group.add_argument(
        '--input_subscription',
        help=('Input PubSub subscription of the form '
              '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:
        # Read the pubsub topic into a PCollection.
        lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                    | beam.Map(transform_temp)
                    | beam.io.WriteStringsToPubSub(known_args.output_topic)
                )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
When I run this code locally with the DirectRunner, everything works fine. However, when I switch to the DataflowRunner, I never see any messages published on the output topic.
I've also tried adding some logging calls to the transform_temp function, but I don't see anything in the console logs in Dataflow.
Any suggestions? Btw - if I just plumb the input topic to the output topic, I can see the messages so I know the streaming is working ok.
Many thanks!
Upvotes: 1
Views: 1313
Reputation: 306
You might just be missing a WindowInto transform. Apache Beam's docs state that a streaming pipeline requires either a non-default window or a non-default trigger. Since you have not defined a window, everything falls into a single global window, so the pipeline may wait indefinitely for the end of that window before emitting anything to the sink.
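As a sketch, here is where such a transform could go in your pipeline; the 10-second fixed window is an arbitrary choice for illustration, not a required value:

```python
# Same chain as in the question, with an explicit fixed window
# inserted between the source and the mapping step.
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
            | beam.WindowInto(window.FixedWindows(10))  # 10-second fixed windows
            | beam.Map(transform_temp)
            | beam.io.WriteStringsToPubSub(known_args.output_topic)
        )
```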
Upvotes: 0