Pradnya Shinde
Pradnya Shinde

Reputation: 31

Google cloud pubsub python synchronous pull

I have one topic and one subscription with multiple subscribers. My application scenario is I want to process messages on different subscribers with specific number of messages to be processed at a time. Means at first suppose 8 messages are processing then if one message processing done then after acknowledging processed message next message should take from the topic while taking care of no duplicate message to be found on any subscriber and every time 8 message should processed in the background.

For this I have use synchronous pull method with max_messages = 8 but next pulling is done after all messages process completed. So we have created own scheduler where at same time 8 process should be running at background and pulling 1 message at a time but still after all 8 message processing completed next message is delivered.

Here is my code:

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        logger.info("Received message:{}".format(msg.message.data))
        random_sleep = random.randint(200,800)
        logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while(True):
            try:
                response = subscriber.pull(subscription_path, max_messages=1)
                message = response.received_messages[0]
                msg = message
                ack_id = message.ack_id
                process = multiprocessing.Process(target=worker, args=(message,))
                process.start()
                while process.is_alive():
                    # `ack_deadline_seconds` must be between 10 to 600.
                    subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
                    time.sleep(SLEEP_TIME)
                # Final ack.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("Acknowledging message: {}".format(msg.message.data))
    except Exception as e:
        print (e)
        continue

    def synchronous_pull():
        p = []
        for i in range(0,NUM_MESSAGES):
            p.append(multiprocessing.Process(target=message_puller))

        for i in range(0,NUM_MESSAGES):
            p[i].start()

        for i in range(0,NUM_MESSAGES):
            p[i].join()

    if __name__ == '__main__':
        synchronous_pull()

Also for sometime subscriber.pull not pulling any messages even the while loop is always True. It gives me error as list index (0) out of range Concluding that subscriber.pull not pulling in message even messages are on the topic but after sometime it starts pulling. Why it is so?

I have tried with asynchronous pulling and flow control but duplicate message are found on multiple subscriber. If any other method will resolve my issue then let mi know. Thanks in advance.

Upvotes: 2

Views: 3919

Answers (1)

saintlyzero
saintlyzero

Reputation: 1842

Google Cloud PubSub ensures At least Once (docs). Which means, the messages may be delivered more than once. To tackle this, you need to make your program/system idempotent

You have multiple subscribers pulling 8 messages each.
To avoid the same message getting processed by multiple subscribers, acknowledge the message as soon as any subscriber pulls that message and proceeds further for processing rather than acknowledging it at the end, after the entire processing of the message.

Also, instead of running your main script continuously, use sleep for some constant time when there are no messages in the queue.

I had a similar code, where I used synchronous pull except I did not use parallel processing.

Here's the code:

PubSubHandler - Class to handle Pubsub related operations

from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded


class PubSubHandler:

    def __init__(self, subscriber_config):

        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)


    def pull_messages(self,number_of_messages):

        try:
            response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded as e:
            received_messages = []
            print('No messages caused error')
        return received_messages


    def ack_messages(self,message_ids):

        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True

Utils - Class for util methods

import json

class Utils:


    def __init__(self):
        pass


    def decoded_data_to_json(self,decoded_data):
        try:
            decoded_data = decoded_data.replace("'", '"')
            json_data = json.loads(decoded_data)
            return json_data
        except Exception as e:
            raise Exception('error while parsing json')


    def raw_data_to_utf(self,raw_data):
        try:
            decoded_data = raw_data.decode('utf8')
            return decoded_data
        except Exception as e:
            raise Exception('error converting to UTF')

Orcestrator - Main script


import time
import json
import logging

from utils import Utils
from db_connection import DbHandler
from pub_sub_handler import PubSubHandler

class Orcestrator:

    def __init__(self):

        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        self.pub_sub_handler = PubSubHandler(subscriber_config)


    def main_handler(self):
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

        if len(pulled_messages) < 1:
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return

        logging.info('messages in queue')
        self.SLEEP_TIME = 10

        for message in pulled_messages:
            raw_data = message.message.data
            try: 
                decoded_data = self.util_methods.raw_data_to_utf(raw_data)  
                json_data = self.util_methods.decoded_data_to_json(decoded_data)
                print(json_data)

            except Exception as e:
                logging.error(e)
            to_ack_ids.append(message.ack_id)

        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')


if __name__ == "__main__":

    orecestrator = Orcestrator()
    print('Receiving data..')
    while True:
        orecestrator.main_handler()
        time.sleep(orecestrator.SLEEP_TIME)

Upvotes: 5

Related Questions