mrtksy

Reputation: 113

Google Pub/Sub Subscriber not receiving messages after a while

I have a simple Python script that uses Google Cloud Pub/Sub to detect new files in Google Cloud Storage. The script simply adds new messages to a queue, where another thread processes them:

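import time

from google.cloud import pubsub

# Assumed to be a plain list: filled by callback_fun, drained by the loop below.
message_queue = []

subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

# Start the background streaming pull; callback_fun runs for each message.
subscriber.subscribe(subscription_path, callback=callback_fun)

while True:
    if not message_queue:
        time.sleep(60)
        continue
    process_next_message(message_queue.pop())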

Here, callback_fun simply adds the message to the queue:

def callback_fun(message):
    message.ack()
    message_queue.append(message)

The problem I am having is that after a while (maybe a couple of days), the subscriber stops receiving new file notifications. If I stop and restart the script, it gets all of the notifications at once.

Is anyone else having similar issues, or can anyone suggest ways to troubleshoot (for example, by printing debugging messages that are normally hidden)? For now I am stopping and restarting the subscriber, but I am sure that is not a good approach for a production environment.

I am using google-cloud 0.32.0 and google-cloud-pubsub 0.30.1.

Upvotes: 4

Views: 15768

Answers (4)

danday74

Reputation: 56936

This is not a Python-specific problem; I ran into it with Node as well. Essentially, when the subscription emits an error it falls over and never receives another message. It should not do this, and it is a bug.

The fix is documented on GitHub. Essentially, you have 2 or 3 options.

(1) Pass the grpc package to the client

Here's the Node code (simple enough to translate to your own language):

const {PubSub} = require('@google-cloud/pubsub');
const grpc = require('grpc');
const pubsub = new PubSub({grpc});

This is the recommended approach. The downside is that the grpc package is now deprecated (by the gRPC project itself, not by Google). A package called @grpc/grpc-js has replaced it, but I have no idea how to use that one with @google-cloud/pubsub. Passing in grpc is the solution I used, and I can vouch that it works: my subscription now keeps receiving messages even after it errors.

OR alternatively

(2) Re-init PubSub subscription on an error

If you are facing this problem, then just re-initialise the subscription on an error:

const initSubscriber = () => {
  const pubsub = new PubSub();
  const subscription = pubsub.subscription(topic, options);
  subscription.on('message', handler.handleMessage);
  subscription.on('error', e => {
    initSubscriber();
  });
};

initSubscriber();

This approach is reported to work, but once the bug mentioned above is fixed it may cause problems or have side effects. I cannot vouch for it, as I have never tried it. If you are desperate, give it a go.
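
Since the question is about Python, here is a rough sketch of the same re-initialise-on-error idea in that language. It assumes the installed Python client returns a future from subscribe() whose result() blocks while the stream is healthy and raises when it fails. The names run_with_resubscribe, open_stream, max_restarts and backoff_seconds are made up for this illustration and are not part of any library.

```python
import logging
import time

log = logging.getLogger("resubscribe")


def run_with_resubscribe(open_stream, max_restarts=5, backoff_seconds=1.0,
                         sleep=time.sleep):
    """Reopen a streaming pull whenever it fails (hypothetical helper).

    open_stream is a zero-argument callable that returns a future-like
    object; its result() blocks until the stream ends and raises on failure.
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        future = open_stream()
        try:
            future.result()  # blocks for as long as the stream is alive
            return restarts  # stream ended without an error, so stop
        except Exception as exc:
            if restarts >= max_restarts:
                raise  # give up and let the caller or a supervisor decide
            restarts += 1
            log.warning("stream failed (%s); restart %d", exc, restarts)
            sleep(backoff_seconds * restarts)  # simple linear backoff
```

With the client from the question, open_stream could be something like lambda: subscriber.subscribe(subscription_path, callback=callback_fun). The restart cap is there so that a permanent failure (bad credentials, deleted subscription) surfaces instead of looping forever.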

(3) Increase acknowledgement deadline

This is not so much a fix as a potential workaround for some scenarios. In my experience, the subscription errors when a message is ack'd after the acknowledgement deadline. Increasing the deadline reduces the chance of that happening, so the subscription is less likely to error and fall over in the first place. Obviously, if the subscription does fall over you are still in trouble, and solutions (1) and (2) still apply.

(4) General tips

The advice given in @KamalAboul-Hosn's answer is useful and may apply to your case. It did not help me, but it may help others.

(5) Bonus tip

In Google Cloud Platform > Pub/Sub > Subscriptions you can see how many messages have not yet been acknowledged. If acknowledgements stop happening after your subscription errors (the unacked message count shown in the graph does not decrease), then you know this solution is the right one for you!


Upvotes: 0

Fabinout

Reputation: 878

I was stuck on this problem for an hour, so here is how I fixed it:

The GOOGLE_APPLICATION_CREDENTIALS environment variable was set to a different service account, one that did not belong to the right project:

project_id = "my_project_sandbox" 

And

my_project.json (service account used by project)

{
  "type": "service_account",
  "project_id": "my_project_prod",
  "private_key_id": "---",
  "private_key": "---",
  ...
}
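
A quick way to catch this kind of mismatch is to compare the project_id inside the key file with the project the script uses. The sketch below is only an illustration: credentials_project_id and check_project are hypothetical helper names, and it assumes the variable points at a JSON service account key like the one above. A service account from another project can legitimately be granted access, so a mismatch is a hint to investigate rather than proof of an error.

```python
import json
import os


def credentials_project_id(path=None):
    """Return the project_id stored in a service account key file."""
    path = path or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    with open(path) as fh:
        return json.load(fh).get("project_id")


def check_project(expected_project, path=None):
    """Return True when the key file belongs to expected_project."""
    actual = credentials_project_id(path)
    if actual != expected_project:
        print("Credentials are for %r but the script uses %r"
              % (actual, expected_project))
        return False
    return True
```

For example, check_project("my_project_sandbox") would return False with the key file shown above, because that file says my_project_prod.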

Upvotes: 0

Kamal Aboul-Hosn

Reputation: 17161

In general, there can be several reasons why a subscriber may stop receiving messages:

  1. If a subscriber does not ack or nack messages, the flow control limits can be reached, meaning no more messages can be delivered. This does not seem to be the case in your particular instance, given that you ack messages immediately. As an aside, I would recommend against acking messages before your queue has processed them unless you are okay with the possibility of messages not being processed. Otherwise, if your app crashes after the ack but before the queue processes the message, it will never be processed and will not be redelivered, since it was already acked.
  2. If another subscriber starts up for the same subscription, it could be receiving the messages. In this scenario, one would expect the subscriber to receive a subset of the messages rather than no messages at all.
  3. Publishers just stop publishing messages and therefore there are no messages to receive. If you restart the subscriber and it starts receiving messages again, this probably isn't the case. You can also verify that a backlog is being built up by looking at the Stackdriver metric for subscription/backlog_bytes.

If your problem does not fall into one of those categories, it would be best to reach out to Google Cloud support with your project name, topic name, and subscription name so that they can narrow down the issue to either your user code, the client library, or the service.
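
To illustrate the aside in point 1, here is a minimal sketch of acking only after processing. It is not the asker's actual code: it swaps the list for a queue.Queue, and worker, process and stop_event are names invented for this example. The only client calls it relies on are message.ack() and message.nack().

```python
import queue
import threading

message_queue = queue.Queue()


def callback_fun(message):
    # Only hand the message over; it is NOT acked here.
    message_queue.put(message)


def worker(process, stop_event):
    """Process queued messages, acking on success and nacking on failure."""
    while not stop_event.is_set():
        try:
            message = message_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            process(message)
        except Exception:
            message.nack()  # ask Pub/Sub to redeliver the message
        else:
            message.ack()  # safe to ack: the work is done
        finally:
            message_queue.task_done()
```

The trade-off is that unacked messages now count against the flow control limits mentioned in point 1, so a slow or stuck worker can itself stop new messages from arriving.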

Upvotes: 4

Rodrigo C.

Reputation: 1184

Apart from the flow control suggestion I offered in my previous comment, you could also define a Cloud Function that gets triggered any time a new message is published in a Pub/Sub topic. These Cloud Functions act as subscriptions and will get notified every time a certain event (such as a message being published) occurs.

This tutorial will help you to develop a background Cloud Function that will get triggered when a message is published in a Pub/Sub topic.
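
As a rough idea of what such a function looks like in Python, here is a minimal sketch. It assumes the background-function style in which the handler receives an event dict with a base64-encoded "data" field plus a context object; handle_pubsub is a hypothetical name, and the linked tutorial remains the authority on the exact signature for your runtime.

```python
import base64


def handle_pubsub(event, context):
    """Decode and log one Pub/Sub message (hypothetical entry point)."""
    data = event.get("data")
    # Pub/Sub payloads arrive base64-encoded; an empty payload is allowed.
    payload = base64.b64decode(data).decode("utf-8") if data else ""
    attributes = event.get("attributes") or {}
    print("Received message: %s, attributes: %s" % (payload, attributes))
    return payload
```

With this approach there is no long-running subscriber process to fall over, which sidesteps the original problem rather than fixing it.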

Upvotes: 0
