Reputation: 18914
Problem: My use case is I want to receive messages from Google Cloud Pub/Sub - one message at a time using the Python Api. All the current examples mention using Async/callback option for pulling the messages from a Pub/Sub subscription. The problem with that approach is I need to keep the thread alive.
Is it possible to just receive 1 message and close the connection i.e. is there a feature where I can just set a parameter (something like a max_messages
) to 1 so that once it receives 1 message the thread terminates?
The documentation here doesn't list anything for Python Synchronous pull which seem to have num_of_messages
option for other languages like Java.
Upvotes: 1
Views: 3182
Reputation: 1916
See the following example in this link:
from google.cloud import pubsub_v1
client = pubsub_v1.SubscriberClient()
subscription = client.subscription_path('[PROJECT]', '[SUBSCRIPTION]')
max_messages = 1
response = client.pull(subscription, max_messages)
print(response)
I've tried myself and using that you get one message at a time.
If you get some error try updating pubsub library to the last version:
pip install --upgrade google-cloud-pubsub
In docs here you have more info about the pull method used in the code snippet:
The Pull method relies on a request/response model:
The application sends a request for messages. The server replies with zero or more messages and closes the connection.
Upvotes: 2
Reputation: 2158
As per the official documentation here:
...you can achieve exactly once processing of Pub/Sub message streams, as PubsubIO de-duplicates messages based on custom message identifiers or identifiers assigned by Pub/Sub.
So you should be able to use record IDs, i.e. identifiers for you messages, to allow for exactly-once processing across the boundary between Dataflow and other systems. To use record IDs, you invoke idLabel when constructing PubsubIO.Read or PubsubIO.Write transforms, passing a string value of your choice. In java this would be:
public PubsubIO.Read.Bound<T> idLabel(String idLabel)
This returns a transform that's like this one but that reads unique message IDs from the given message attribute.
Upvotes: 0