Reputation: 1858
I am confused about the GCP Pub/Sub REST API.
Background: I am trying to write an application using GCP Pub/Sub in a language for which no client library exists (I'm trying to use R).
Therefore, I will need to rely upon the REST API provided: https://cloud.google.com/pubsub/docs/reference/rest
Based on my understanding of the REST API, we would have to use the pull subscription: https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
Question: My confusion is that this is a POST request
POST https://pubsub.googleapis.com/v1/{subscription}:pull
This POST request has a response:
{
  "receivedMessages": [
    {
      object (ReceivedMessage)
    }
  ]
}
How could I receive a response from a POST request? This doesn't make sense to me.
My goal would be to subscribe to a Pub/Sub subscription for messages, similar to the Python library here:
To subscribe to data in Cloud Pub/Sub, you create a subscription based on the topic, and subscribe to that, passing a callback function.
import os
from google.cloud import pubsub_v1

topic_name = 'projects/{project_id}/topics/{topic}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    topic='MY_TOPIC_NAME',  # Set this to something appropriate.
)

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    sub='MY_SUBSCRIPTION_NAME',  # Set this to something appropriate.
)

def callback(message):
    print(message.data)
    message.ack()

with pubsub_v1.SubscriberClient() as subscriber:
    subscriber.create_subscription(
        name=subscription_name, topic=topic_name)
    future = subscriber.subscribe(subscription_name, callback)
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()
Upvotes: 3
Views: 16653
Reputation: 17161
The Cloud Pub/Sub client libraries receive messages with streaming pull rather than pull, and they hide the underlying requests from the user. The client library itself receives the list of messages and then calls a user-provided callback on each one.
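To make the relationship between a batch of messages and a per-message callback concrete, here is a conceptual sketch. It is not how the client library is implemented (the library uses streaming pull); `dispatch_batch`, `pull_once`, and `acknowledge` are hypothetical names, and the two callables stand in for whatever transport you end up using.

```python
from typing import Callable, Iterable, List, Tuple

# (ack_id, payload) pairs: a hypothetical simplification of ReceivedMessage.
Received = Tuple[str, bytes]


def dispatch_batch(
    pull_once: Callable[[], Iterable[Received]],
    acknowledge: Callable[[List[str]], None],
    callback: Callable[[bytes], None],
) -> int:
    """Pull one batch, run the callback on each message, then ack the batch.

    Returns the number of messages acknowledged. A message whose callback
    raises is not acknowledged, so it stays eligible for redelivery.
    """
    ack_ids: List[str] = []
    for ack_id, payload in pull_once():
        try:
            callback(payload)
        except Exception:
            continue  # skip the ack for this message
        ack_ids.append(ack_id)
    if ack_ids:  # avoid sending an empty acknowledge request
        acknowledge(ack_ids)
    return len(ack_ids)
```

Calling `dispatch_batch` repeatedly in a loop would approximate the subscribe-with-callback style on top of plain pull.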
If you call pull directly, iterate over the messages in the response and invoke what would be your user callback on each one. The Cloud Pub/Sub documentation has an example of this in every supported language; here is the Python version:
from google.api_core import retry
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}.")
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
    )
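For a language without a client library, the same pull-then-acknowledge flow can be done over REST. Every HTTP request, POST included, returns a response; here the response body carries the messages. Below is a rough sketch using only the Python standard library, intended as something to port to R rather than a definitive implementation. The helper names are hypothetical, the access token is assumed to come from elsewhere (for example `gcloud auth print-access-token`), and the field names (`maxMessages`, `receivedMessages`, `ackId`, `ackIds`) and the base64 encoding of `data` reflect my reading of the REST reference, so check them against it.

```python
import base64
import json
import urllib.request
from typing import Dict, List, Tuple

BASE_URL = "https://pubsub.googleapis.com/v1"


def build_request(subscription: str, method: str, body: Dict, token: str) -> urllib.request.Request:
    """Build a POST request for `{subscription}:{method}` (pull or acknowledge)."""
    return urllib.request.Request(
        url=f"{BASE_URL}/{subscription}:{method}",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",  # token source is an assumption
            "Content-Type": "application/json",
        },
        method="POST",
    )


def decode_messages(response_body: Dict) -> List[Tuple[str, bytes]]:
    """Extract (ackId, decoded data) pairs; tolerates a body with no messages."""
    pairs = []
    for received in response_body.get("receivedMessages", []):
        data = base64.b64decode(received["message"].get("data", ""))
        pairs.append((received["ackId"], data))
    return pairs


def pull_and_ack(subscription: str, token: str, max_messages: int = 3) -> List[bytes]:
    """Pull up to max_messages, acknowledge them, and return their payloads."""
    pull = build_request(subscription, "pull", {"maxMessages": max_messages}, token)
    with urllib.request.urlopen(pull) as resp:
        pairs = decode_messages(json.load(resp))
    if pairs:
        ack_body = {"ackIds": [ack_id for ack_id, _ in pairs]}
        ack = build_request(subscription, "acknowledge", ack_body, token)
        urllib.request.urlopen(ack).close()
    return [data for _, data in pairs]
```

Here `subscription` is the full resource name, `projects/{project_id}/subscriptions/{sub}`. To keep receiving messages, call `pull_and_ack` in a loop; in R the same two POST requests could be issued with any HTTP client package.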
Upvotes: 3