Reputation: 17220
I am listening to financial data published by Google Cloud Platform, Monday to Friday. I would like to save all messages to disk. I am doing this in Python.
I need to recover any missing packets if my application goes down. I understand Google will automatically resend un-ack'd messages.
The GCP documentation lists many subscription techniques (asynchronous/synchronous, push/pull, streaming pull, etc.). There is asynchronous sample code:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# subscription_path = "projects/{project_id}/subscriptions/{subscription_id}"

def callback(message):
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=5)
    except TimeoutError:
        streaming_pull_future.cancel()
https://cloud.google.com/pubsub/docs/pull
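To make the goal concrete, here is roughly what I have in mind for "save all messages to disk": persist first, ack second, so an un-ack'd message is redelivered if the process dies mid-write (the directory and file naming are just placeholders):

import pathlib

DATA_DIR = pathlib.Path("messages")  # placeholder output directory
DATA_DIR.mkdir(exist_ok=True)

def callback(message):
    # Persist before acking: if we crash before ack(), Pub/Sub
    # redelivers the message after the ack deadline expires.
    out_file = DATA_DIR / f"{message.message_id}.bin"
    out_file.write_bytes(message.data)
    message.ack()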
UPDATE for Kamal Aboul-Hosn
I think I can persist OK, but my problem is I need to manually check all messages have indeed been received. To do this I enabled ordered delivery. Our message data contains a sequence number, so I wanted to add a global variable like next_expected_seq_num. After I receive each message I will process and ack the message and increment next_expected_seq_num.
However, if I have, say, 10 threads invoking the callback method, I assume any one of the 10 could receive the next message? I'd then have to make my callback method smart enough to block processing on the other 9 threads whilst the thread holding the next message processes it. Something like:
(pseudo code)
def callback(msg):
    global next_expected_seq_num
    seq_num = get_seq_num(msg.data)
    while seq_num != next_expected_seq_num:  # make this check atomic
        pass                                 # spin until ours is the next message
    # When we reach here, we have the next message
    assert not db.exists(seq_num)
    # persist message
    next_expected_seq_num += 1               # make atomic / cannot be earlier
    msg.ack()
Should I just disable multiple callback threads, given I'm preventing multithreading anyway?
Is there a better way to check/guarantee we process every message?
I'm wondering if we should trust GCP the way we trust TCP, enable multithreading, and just lock around the database write:
def callback(msg):
    seq_num = get_seq_num(msg.data)
    with lock:
        if not db.exists(seq_num):
            # persist message
            ...
    msg.ack()
Upvotes: 0
Views: 1631
Reputation: 17161
The callback is not thread safe if you are running in a Python environment that doesn't have a global interpreter lock. Multiple callbacks could be executed in parallel in that case, and you would have to guard any shared data structures with locks.
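For example, a minimal sketch of guarding a shared in-memory structure (get_seq_num is your helper from the pseudocode, not part of the client library):

import threading

seen_lock = threading.Lock()
seen_seq_nums = set()  # shared across callback threads

def callback(message):
    seq_num = get_seq_num(message.data)  # your helper
    with seen_lock:
        is_new = seq_num not in seen_seq_nums
        seen_seq_nums.add(seq_num)
    if is_new:
        pass  # persist the message here
    message.ack()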
Since Cloud Pub/Sub has at-least-once delivery semantics, if you need to ignore duplicate messages then yes, you will need to maintain some kind of data structure with the already-received messages. Note that duplicates could be delivered across subscriber restarts. Therefore, you will probably need this to be some kind of persistent storage. Redis tends to be a popular choice for this type of deduplication.
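For instance, a sketch of that kind of persistent deduplication with Redis (the key naming and the use of your sequence number as the dedup key are assumptions on my part):

import redis

r = redis.Redis()  # assumes a reachable Redis instance

def callback(message):
    seq_num = get_seq_num(message.data)  # your helper
    # SET ... NX succeeds only the first time the key is written, so
    # duplicates are detected even across subscriber restarts.
    is_new = r.set(f"seen:{seq_num}", 1, nx=True)
    if is_new:
        pass  # persist the message here
    message.ack()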
With ordered delivery, it is guaranteed that the callback will only run for one message for an ordering key at a time. Therefore, you would not have to program expecting multiple messages to be running simultaneously for the key. Note that in general, using ordering keys to totally order all messages in the topic will only work if your throughput is no more than 1MB/s as that is the publish limit for messages with ordering keys. Also, only use ordering keys if it is important to process the messages in order.
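For reference, ordering has to be enabled on both ends: the subscription must have message ordering enabled, and the publisher must set an ordering key. A sketch of the publish side (topic name and key are placeholders):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

# Messages with the same ordering_key are delivered in publish order to a
# subscription that has message ordering enabled.
future = publisher.publish(topic_path, b"payload", ordering_key="prices")
future.result()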
With regard to when to use multithreading or not, it really depends on the nature of the processing. If most of the callback would need to be guarded with a lock, then multithreading won't help much. If, though, only small portions need to be guarded by locks, e.g., checking for duplicates, while most of the processing can safely be done in parallel, then multithreading could result in better performance.
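If you do decide to run the callback single-threaded, as you asked about, you can hand the subscriber a scheduler with a one-worker executor; a sketch, assuming the current python-pubsub client and the subscription_path/callback from your sample:

from concurrent.futures import ThreadPoolExecutor
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler

subscriber = pubsub_v1.SubscriberClient()
# A single worker means callbacks run one at a time, so shared state
# needs no locking, at the cost of throughput.
scheduler = ThreadScheduler(executor=ThreadPoolExecutor(max_workers=1))
streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, scheduler=scheduler
)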
If all you want to do is prevent duplicates, then you probably don't need to guard the writes to the database with a lock unless the database doesn't guarantee consistency. Also, keep in mind that the locking only helps if you have a single subscriber client.
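To make that concrete, one way to let the database enforce deduplication without an application-level lock is a unique key on the sequence number plus an insert that ignores conflicts; a sketch with SQLite (the schema is an assumption of mine):

import sqlite3

def persist(seq_num, data):
    # PRIMARY KEY on seq_num makes the insert idempotent: a duplicate
    # seq_num is silently ignored, so no lock is needed in the callback.
    conn = sqlite3.connect("messages.db")
    try:
        with conn:  # commits on success
            conn.execute(
                "CREATE TABLE IF NOT EXISTS messages (seq_num INTEGER PRIMARY KEY, data BLOB)"
            )
            conn.execute(
                "INSERT OR IGNORE INTO messages (seq_num, data) VALUES (?, ?)",
                (seq_num, data),
            )
    finally:
        conn.close()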
Upvotes: 2