user2994884

Reputation: 163

Move Google Pub/Sub Messages Between Topics

How can I bulk move messages from one topic to another in GCP Pub/Sub?

I am aware of the Dataflow templates that provide this; however, unfortunately, restrictions do not allow me to use the Dataflow API.

Any suggestions on ad-hoc movement of messages between topics (besides copying them one by one)?

Specifically, the use case is for moving messages in a deadletter topic back into the original topic for reprocessing.

Upvotes: 4

Views: 3985

Answers (3)

Paşa Yazici

Reputation: 1

  • Create a new push subscription on the source topic.
  • Specify an invalid endpoint as the push endpoint.
  • Set the topic you want to move the messages to (the destination topic) as the dead-letter topic of this subscription.
  • After 5 failed delivery attempts, the messages will be redirected to the dead-letter topic (the destination topic).
  • You can now use Replay messages or a snapshot on the push subscription of the source topic.
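The steps above can be sketched with a helper that builds the subscription request. This is a sketch, not a verified recipe: the project, topic, and subscription names are placeholders, the push endpoint is deliberately unreachable, and actually creating the subscription requires google-cloud-pubsub and permission to publish to the dead-letter topic.

```python
def dead_letter_redirect_request(project, source_topic, destination_topic,
                                 subscription="dlq-redirect"):
    """Build a create_subscription request implementing the trick above:
    push to a broken endpoint so every delivery fails, and dead-letter
    the messages into the destination topic after 5 attempts."""
    return {
        "name": f"projects/{project}/subscriptions/{subscription}",
        "topic": f"projects/{project}/topics/{source_topic}",
        # Deliberately unreachable endpoint so every push delivery fails.
        "push_config": {"push_endpoint": "https://invalid.example.com/"},
        "dead_letter_policy": {
            "dead_letter_topic": f"projects/{project}/topics/{destination_topic}",
            "max_delivery_attempts": 5,  # 5 is the minimum Pub/Sub allows
        },
    }

# To apply it (requires credentials; names here are hypothetical):
#   from google.cloud import pubsub_v1
#   subscriber = pubsub_v1.SubscriberClient()
#   subscriber.create_subscription(request=dead_letter_redirect_request(
#       "my-project", "dead-letter-topic", "original-topic"))
```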

Upvotes: 0

rmesteves

Reputation: 4075

I suggest that you use a Python script for that. You can use the Pub/Sub client library to read the messages and publish them to another topic like below:

from google.cloud import pubsub_v1

# Defining parameters
PROJECT = "<your_project_id>"
SUBSCRIPTION = "<your_current_subscription_name>"
NEW_TOPIC = "projects/<your_project_id>/topics/<your_new_topic_name>"

# Creating clients for publishing and subscribing.
# Adjust the max_messages for your purpose
subscriber = pubsub_v1.SubscriberClient()
publisher = pubsub_v1.PublisherClient(
    batch_settings=pubsub_v1.types.BatchSettings(max_messages=500),
)

# Get your messages. Adjust the max_messages for your purpose
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 500}
)

# Publish your messages to the new topic
for msg in response.received_messages:
    publisher.publish(NEW_TOPIC, msg.message.data)

# Ack the old subscription if necessary
ack_ids = [msg.ack_id for msg in response.received_messages]
if ack_ids:
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

Before running this code you will need to install the Pub/Sub client library in your Python environment. You can do that by running pip install google-cloud-pubsub

An approach to execute your code is using Cloud Functions. If you decide to use it, pay attention in two points:

  1. The maximum time that your function can take to run is 9 minutes. If this timeout gets exceeded, your function will terminate without finishing the job.

  2. In Cloud Functions you can just put google-cloud-pubsub on a new line of your requirements file instead of running a pip command.

Upvotes: 3

guillaume blaquiere

Reputation: 75715

You can't use snapshots, because snapshots can be applied only to subscriptions of the same topic (to avoid message ID overlapping).

The easiest way is to write a function that pulls from your subscription. Here is how I would do it:

The global process:

  • Publish a message to the transfer-topic
  • The message triggers the function/Cloud Run service with an HTTP push
  • The process pulls the messages and republishes them into the initial topic
  • If the timeout is reached, the function crashes and Pub/Sub retries the HTTP request (with exponential backoff).
  • If all the messages are processed, an HTTP 200 response code is returned and the process stops (and the message in the transfer-topic subscription is acked)

This process allows you to process a very large number of messages without worrying about the timeout.
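The crash-and-retry control flow above can be sketched as a handler body. This is illustrative only: move_batch_fn stands in for one pull-republish-ack round (like the loop body in the other answer), and the Cloud Run/Functions HTTP framework wiring is omitted.

```python
import time

def handle_transfer_push(move_batch_fn, deadline_s=8 * 60):
    """Body of the push-triggered transfer handler.

    move_batch_fn() moves one batch of messages and returns how many it
    moved (0 when the subscription is empty). Returning ("", 200) acks
    the transfer-topic message; raising makes Pub/Sub retry the push
    with exponential backoff, so the next invocation resumes the job.
    """
    start = time.monotonic()
    while move_batch_fn() > 0:
        if time.monotonic() - start > deadline_s:
            # Crash on purpose before the platform timeout kills us:
            # Pub/Sub will retry the HTTP push and processing resumes.
            raise TimeoutError("deadline reached, more messages remain")
    return "", 200  # all messages processed; transfer message gets acked
```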

Upvotes: 2
