Jack022
Jack022

Reputation: 1257

Python - how to 'stream' data from my MongoDB collection?

I'm running a script that pushes some data to a MongoDB database. Now I'm trying to have another Python script printing the new entries on my DB each time one is added.

For example:

If the number 80 is added to the DB, the script should fetch 80 from the collection and print it to my console as soon as it's added on the database.

My actual work is running fine. The only problem is that if I remove the time.sleep() it will start printing every entry quickly.

Also, right now it prints the whole collection plus the new entry on every pass, instead of printing only the new entry. I want only the new one because, in the future, the script should fetch each new value and append it to a Python list.

  1. I can't use change_stream since my DB is not a replica set. I'm fairly new to this, so I don't know much about replica sets.
  2. I could use a tailable cursor, but that requires a capped collection, which wouldn't be the best choice: I will be pushing data every 5 seconds, and having a size "limit" (isn't that what capped means?) is not what I want.

Any advice?

from pymongo import MongoClient
import time
import random
from pprint import pprint

# Connect to a MongoDB server on port 27017 (no host argument is given).
client = MongoClient(port=27017)

# NOTE(review): `arr` is never used in the visible code.
arr = []

# Handle to the database named "one".
db = client.one

# NOTE(review): subscripting a MongoClient presumably yields a *database*
# handle (here one called "coll"), not a collection, and this variable is
# never used below -- confirm against the PyMongo docs.
mycol = client["coll"]



# Poll forever: every 2 seconds run find() with no filter and print the 'num'
# field of every document the cursor yields. Since the query is unfiltered,
# each pass walks the full result set again rather than only new entries.
while True:
    # NOTE(review): `db.mycol` is attribute access on `db`, so it presumably
    # selects a collection literally named "mycol" and is unrelated to the
    # `mycol` variable assigned above -- confirm.
    cursor = db.mycol.find()
    for document in cursor:
        print(document['num'])
    time.sleep(2)    

Upvotes: 1

Views: 3063

Answers (2)

You can use the code below to connect to MongoDB, stream your data, and plot it as a live graph.

First, create a new file named mongo_db_connectivity.py and put the following code in it:

from pymongo import MongoClient

from bson.objectid import ObjectId

import pandas as pd

from darksky import forecast

import os

import uuid

import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

pd.options.mode.chained_assignment = None

import collections

import mysql.connector

import json

import numpy as np

import pytz

from pymongo.errors import InvalidName

from datetime import datetime

from dotenv import load_dotenv
import urllib.parse


class Database:
    """Small convenience wrapper around one MongoDB database handle.

    Connection settings are read from environment variables, loaded from a
    ``.env`` file in the working directory: ``DB_HOST``, ``DB_PORT``,
    ``DB_NAME``, ``DB_USERNAME`` and ``DB_PASSWORD``.

    After construction ``self.database`` holds the database handle returned
    by :meth:`connection`, or ``None`` when the connection attempt failed.
    """

    def __init__(self):
        """Load the environment settings and open the database connection."""
        load_dotenv(".env")
        print("Loading MongoDB")
        self.db_host = os.getenv('DB_HOST')
        # Fall back to MongoDB's default port so an unset or empty DB_PORT
        # does not make int() raise.
        self.db_port = int(os.getenv('DB_PORT') or 27017)
        self.db_name = os.getenv('DB_NAME')
        self.db_username = os.getenv('DB_USERNAME')
        self.db_password = os.getenv('DB_PASSWORD')

        self.database = self.connection(
            self.db_host,
            self.db_port,
            self.db_name,
            self.db_username,
            self.db_password,
        )
        # connection() returns None on failure; only claim success when a
        # handle was actually obtained. (Compare with `is not None`: PyMongo
        # database objects do not support truth-value testing.)
        if self.database is not None:
            print("mongoDB connected succesfully")

    def update_mongo(self, db_collection, data):
        """Insert ``data`` as one document into ``db_collection``.

        ``data`` is passed through a one-row DataFrame (``index=[0]``), so it
        is presumably a flat mapping of column name to scalar value.

        Returns:
            True when the insert succeeded, False when any error occurred
            (the error is printed).
        """
        try:
            # Round-trip through pandas to obtain a list of record dicts.
            frame = pd.DataFrame(data, index=[0])
            records = frame.to_dict(orient='records')

            print("proceeding to mongo Update")
            self.database[db_collection].insert_many(records)
            print("mongo update completed")
            return True
        except Exception as error:
            print(error)
            return False

    def connection(self, host, port, database_name, username, password):
        """Connect to MongoDB and return the handle for ``database_name``.

        When both ``username`` and ``password`` are given, an authenticated
        ``mongodb://`` URI is built; otherwise ``host``/``port`` are used
        directly.

        Returns:
            The database handle, or None if connecting failed or the database
            does not exist (the error is printed).
        """
        try:
            if username and password:
                # Both credentials must be percent-escaped, otherwise
                # characters such as '@', ':' or '/' corrupt the URI.
                mongodb_uri = 'mongodb://%s:%s@%s:%s' % (
                    urllib.parse.quote_plus(username),
                    urllib.parse.quote_plus(password),
                    host,
                    port,
                )
                client = MongoClient(mongodb_uri)
            else:
                client = MongoClient(host, port)

            # Validate that the database exists before handing it out.
            if database_name not in client.list_database_names():
                raise InvalidName('Database does not exist')
            return client[database_name]
        except Exception as error:
            print('error', error)
            return None

    def find(self, collection_name, condition):
        """Run ``find(condition)`` on ``collection_name``.

        Returns:
            The cursor returned by the collection's ``find``, or -1 when the
            collection does not exist or any other error occurred (the error
            is printed).
        """
        try:
            if collection_name not in self.database.list_collection_names():
                raise InvalidName('Collection does not exist')
            return self.database[collection_name].find(condition)
        except Exception as error:
            # Interpolate the name; print() does not apply %-formatting itself.
            print("collection name error %s: %s" % (collection_name, error))
            return -1

Create another file named .env, from which all of your environment variables are loaded:

DB_HOST ="your host"
DB_PORT =27017 
DB_NAME ="database name"
DB_USERNAME ="username"
DB_PASSWORD ="password"

Finally, create a file named proplot.py with the code below:

from mongo_db_connectivity import Database
import pandas as pd
import time
from datetime import datetime
import datetime

import random
from itertools import count
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Apply the 'fivethirtyeight' matplotlib style to the figures drawn below.
plt.style.use('fivethirtyeight')

# NOTE(review): x_vals / y_vals are not referenced anywhere else in the
# visible code.
x_vals = []
y_vals = []
class Sample:
    """Live-plot the ``readings`` column of a MongoDB collection.

    Args:
        collection_name: Name of the collection to poll. The default is a
            placeholder and must be replaced with a real collection name.
        condition: Query filter passed to ``Database.find``; ``None`` means
            an empty filter ``{}``.
    """

    def __init__(self, collection_name="<collection>", condition=None):
        self.data = Database()
        self.timepresent = str(datetime.datetime.now())  # sending timestamp also
        self.collection_name = collection_name
        # Avoid a shared mutable default argument.
        self.condition = {} if condition is None else condition

    def values(self, i):
        """Animation callback: re-query the collection and redraw the plot.

        ``i`` is the frame argument supplied by ``FuncAnimation``; it is not
        used. Any error (including a failed query) is printed so the
        animation keeps running.
        """
        try:
            col = self.data.find(self.collection_name, self.condition)

            df = pd.DataFrame(list(col))
            data = df.reset_index()

            x = data['index']
            y1 = data['readings']  # polling readings you can change your column name

            plt.cla()

            plt.plot(x, y1, label='Label_name')

            plt.legend(loc='upper left')
            plt.tight_layout()
        except Exception as error:
            print(error)

    def function(self):
        """Start the animation (one redraw every 1000 ms) and show the figure."""
        # Keep a reference on the instance so the animation object is not
        # garbage-collected while the figure is open.
        self._animation = FuncAnimation(plt.gcf(), self.values, interval=1000)

        plt.tight_layout()
        plt.show()

    def graph_final(self, max_retries=5, retry_delay=1.0):
        """Run :meth:`function`, retrying a bounded number of times on error.

        Args:
            max_retries: How many times to retry after the first failure.
            retry_delay: Seconds to wait between attempts.

        Raises:
            Exception: Re-raises the last error once all retries are used up.
        """
        attempt = 0
        while True:
            try:
                self.function()
                return
            except Exception:
                if attempt >= max_retries:
                    raise
                attempt += 1
                print("no graph data found, retrying")
                time.sleep(retry_delay)

# Script entry point: build a Sample and start the live graph.
Sample().graph_final()

Upvotes: 0

roeen30
roeen30

Reputation: 789

You can save the creation time of documents and repeat queries for documents created since your last query:

import datetime
import time
...

# Poll for documents whose 'created' timestamp falls in the half-open window
# (last_query_time, now]. Bounding the window on both sides keeps consecutive
# windows from overlapping, so a document stamped between taking `now` and
# running the query is printed once, on the next pass, not twice.
#
# The lower bound starts as a datetime rather than 0, so it has the same type
# as the values it is compared with.
# NOTE(review): assumes the writer stores 'created' as a datetime that uses
# the same clock and timezone convention as datetime.datetime.now() here --
# confirm against the inserting script.
last_query_time = datetime.datetime.now()
while True:
    now = datetime.datetime.now()
    window = {'$gt': last_query_time, '$lte': now}
    cursor = db.mycol.find({'created': window})
    for document in cursor:
        print(document['num'])
    last_query_time = now
    time.sleep(2)

Upvotes: 2

Related Questions