keynesiancross

Reputation: 3529

MongoDB Change Stream: How to keep it alive?

I've been trying to implement a change stream that monitors a Mongo collection for new documents. While it is simple to set up for catching a single change, I don't understand how to keep the process running indefinitely.

db = pymongo_util.get_collection("DataDB", "XYZ_Collection")
stream = db.watch(full_document="updateLookup")
document = next(stream)  # blocks here until a change happens, then prints that one change and the program ends
print(document)

My goal is to create a 'listener' for the database: listen for new documents and process them. I'm not sure whether asyncio or threading is the way to go, or whether I'm missing something basic.

It looks like I'm not the only one to ask, but none seem to have an answer:

  1. How to actually use pymongo ChangeStreams with Flask in a non-blocking way?

  2. Watch MongoDB Change Streams in Python asynchronous

Upvotes: 2

Views: 2944

Answers (3)

daniele.lena

Reputation: 31

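To run continuously, iterate over the stream returned by the "watch" method in a "for" loop; opening it with a "with" statement also makes sure the stream is closed when the loop exits: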

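from pymongo import MongoClient

client = MongoClient()  # change streams require a replica set or sharded cluster
db = client["DataDB"]  # database/collection names taken from the question
collection_name = "XYZ_Collection"
pipeline = [{"$match": {"operationType": "insert"}}]

with db[collection_name].watch(pipeline=pipeline) as stream:
    print("Listening for changes...")
    # Each iteration blocks until the next change arrives; the loop only ends if the stream closes or errors
    for change in stream:
        print("Processed change:", change)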

The PyMongo documentation has an example that also uses the "resume token": https://pymongo.readthedocs.io/en/stable/api/pymongo/change_stream.html
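A minimal sketch of the threading option mentioned in the question, assuming you want the rest of the program (for example a Flask app) to keep running: the same blocking loop runs in a daemon thread and hands each change to the caller through a queue.Queue. The name start_listener is hypothetical, and stream is assumed to be the object returned by collection.watch(...), though any iterable works. Errors raised inside the thread are not handled here; the thread simply stops.

```python
import queue
import threading
from typing import Any, Iterable


def start_listener(stream: Iterable[Any], changes: "queue.Queue[Any]") -> threading.Thread:
    """Iterate a blocking change stream in a background daemon thread (hypothetical helper)."""

    def run() -> None:
        # This loop blocks inside the worker thread, not the caller's thread.
        for change in stream:
            changes.put(change)

    # daemon=True: the thread will not keep the process alive when the main program exits.
    worker = threading.Thread(target=run, name="change-stream-listener", daemon=True)
    worker.start()
    return worker
```

For example, start_listener(db[collection_name].watch(pipeline=pipeline), changes) with changes = queue.Queue() starts listening, and changes.get() then returns each change in the caller's thread whenever it is ready to process one.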

Upvotes: 1

Saad Aleem

Reputation: 1745

Like this:

cursor = db.collection.watch()
while True:
    document = next(cursor)  # blocks until the next change arrives
    # do stuff

If you want it to work like a traditional database trigger, you will need to run this continuously. Note that while you can run this as an always-running Python process, such processes die quite easily. An alternative is to run it every minute or so via a cron job.

You will need to save the stream's resume token in a separate collection and pass it in the next time you run the process:

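# assuming you saved the token the last time you ran this
progress = db.progress_collection.find_one({"collectionName": "name"})
resume_token = progress["resumeToken"] if progress else None  # None: start from now
cursor = db.collection.watch(resume_after=resume_token)

# do stuff
resume_token = cursor.resume_token

# save the token to the collection
db.progress_collection.update_one(
    {"collectionName": "name"},
    {"$set": {"resumeToken": resume_token}},
    upsert=True,
)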
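A minimal sketch of one cron-style run under these assumptions: the stream is PyMongo-style, so watch(..., resume_after=...) returns a stream with try_next() (which returns None when no change is available yet) and a resume_token attribute. The names drain_changes, handle, load_token and save_token are hypothetical; load_token/save_token would wrap the find_one/update_one calls on the progress collection. The token is saved after every change, so a crash mid-run replays at most one change instead of skipping any.

```python
from typing import Any, Callable, Optional


def drain_changes(
    collection: Any,
    handle: Callable[[Any], None],
    load_token: Callable[[], Optional[Any]],
    save_token: Callable[[Any], None],
    pipeline: Optional[list] = None,
) -> int:
    """Handle the changes available right now, checkpoint, and return (hypothetical helper).

    Returns the number of changes handled in this run.
    """
    handled = 0
    with collection.watch(pipeline, resume_after=load_token()) as stream:
        while True:
            change = stream.try_next()  # None when no change is available yet
            if change is None:
                break
            handle(change)
            handled += 1
            save_token(stream.resume_token)  # checkpoint after every change
        # Also checkpoint when nothing arrived, so the next run does not start from
        # "now" and miss changes made between runs (needs a server-supplied token).
        if stream.resume_token is not None:
            save_token(stream.resume_token)
    return handled
```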

Upvotes: 1

Alex Blex

Reputation: 37048

Both referenced questions were about how to implement asynchronous behaviour. If you are happy with blocking reads, just loop over the iterator:

for document in stream:
    print(document)

It won't run indefinitely, of course, but it will run for a significant time. You will need to wrap it in a try/except to catch cursor errors and use the resume token to continue reading from where it stopped, but that's another story.
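A minimal sketch of that try/except-and-resume loop, assuming collection is a PyMongo Collection (or anything whose watch(pipeline, resume_after=...) returns a context-manager stream with a resume_token attribute). The names watch_forever, handle, errors and max_restarts are hypothetical; with PyMongo you would pass errors=(pymongo.errors.PyMongoError,). The block does not import pymongo so it can run standalone. The driver already resumes automatically after some transient errors; this outer loop covers failures it gives up on.

```python
import logging
from typing import Any, Callable, Optional, Tuple, Type

log = logging.getLogger(__name__)


def watch_forever(
    collection: Any,
    handle: Callable[[Any], None],
    errors: Tuple[Type[BaseException], ...],
    pipeline: Optional[list] = None,
    max_restarts: Optional[int] = None,
) -> Any:
    """Consume a change stream, reopening it from the last resume token on errors.

    Returns the last resume token if the stream ends on its own (for example
    after an "invalidate" event); re-raises once max_restarts is exceeded.
    """
    resume_token = None
    restarts = 0
    while True:
        try:
            with collection.watch(pipeline, resume_after=resume_token) as stream:
                for change in stream:
                    handle(change)
                    # Record progress only after the change was handled, so a
                    # failure replays it rather than skipping it.
                    resume_token = stream.resume_token
            return resume_token
        except errors as exc:
            restarts += 1
            if max_restarts is not None and restarts > max_restarts:
                raise
            log.warning("change stream failed (%r); resuming after %r", exc, resume_token)
```

Leaving max_restarts as None keeps the listener running for as long as the process lives; setting it makes repeated failures surface instead of looping silently.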

Upvotes: 1
