Reputation: 471
I have set up a custom Dataflow job written in Python that just copies an image (triggered by a Pub/Sub message) from one bucket to another and then sends an HTTP request.
It looks something like:
import apache_beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub

messages = pipeline | 'On Pubsub Message' >> ReadFromPubSub(topic=config['pubsub_image_topic'])
new_images = messages | 'Parse Pubsub Message' >> apache_beam.Map(parse_pubsub)
valid_images = new_images | 'Attach Metadata' >> apache_beam.ParDo(AttachMetadata())
collected_images = valid_images | 'Collect Image' >> apache_beam.Map(collect_image)
copied_images = collected_images | 'Copy To Bucket' >> apache_beam.ParDo(CopyToBucketDoFun())
copied_images | 'Notify WebHook' >> apache_beam.ParDo(NotifyWebHookFn())
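The custom steps are not shown above; roughly, CopyToBucketDoFun and NotifyWebHookFn look like the following sketch (the field names, bucket lookup, and webhook URL are placeholders for illustration, not my actual code):

from apache_beam import DoFn
from google.cloud import storage
import requests

class CopyToBucketDoFun(DoFn):
    def setup(self):
        # One storage client per worker, reused across bundles.
        self.client = storage.Client()

    def process(self, image):
        # Copy the object named in the Pub/Sub message into the destination bucket.
        src = self.client.bucket(image['source_bucket'])
        dst = self.client.bucket(image['destination_bucket'])
        src.copy_blob(src.blob(image['object_name']), dst, image['object_name'])
        yield image

class NotifyWebHookFn(DoFn):
    def process(self, image):
        # Tell the downstream service that the copy has finished.
        requests.post(image['webhook_url'], json={'object': image['object_name']}, timeout=10)
        yield image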
I run it in GCP Dataflow with arguments like:
python ./main.py \
--job_name=$JOB_NAME \
--streaming \
--enable_streaming_engine \
--runner="DataflowRunner" \
--project=$GCP_PROJECT \
--region=$REGION \
--subnetwork=$SUBNETWORK \
--staging_location=$STAGING_LOCATION \
--no_use_public_ips \
--setup_file ./setup.py \
--service_account_email "${SERVICE_ACCOUNT}" \
--requirements_file "requirements.txt" \
--pubsub_image_topic $PUBSUB_IMAGE_TOPIC \
--update
This finds the old job with that name, stops it, copies its state, and then starts the new job and restores that state. However, this causes 9 minutes of downtime while the old job is stopped and the new one is initializing. Is there any way to keep the old job running until the new one is ready to process?
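For example, could I start the replacement under a new job name while the old one keeps running, and only drain the old job once the new one is ready? Roughly like this (the job-ID lookup is only a sketch):

NEW_JOB_NAME="${JOB_NAME}-v2"
python ./main.py --job_name=$NEW_JOB_NAME ...   # same arguments as above, but without --update
OLD_JOB_ID=$(gcloud dataflow jobs list --region=$REGION --status=active --filter="name=$JOB_NAME" --format="value(id)")
gcloud dataflow jobs drain $OLD_JOB_ID --region=$REGION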
Also, I would like this pipeline to run with minimal latency. However, it always shows a minimum of 20 seconds of "data freshness", even though all of the steps for a single message should take less than half a second. Is there any way to tell Dataflow to prioritize latency and to run all of the steps for a message as soon as it arrives, and to run them in parallel? It seems to run each step of the pipeline as a batch: it reads 8 messages, then parses all 8 messages, then collects all 8 images, then copies the 8 images, then calls the webhook 8 times, and then repeats.
I feel like these features are obvious and necessary for Dataflow to work as a real-time streaming pipeline, but I can't find anything about them. What am I missing?
Upvotes: 0
Views: 285
Reputation: 75775
There are several ways to solve this.
Upvotes: 1