ilapasle
ilapasle

Reputation: 349

python cassandra get big result of select * in generator (without storage result in ram)

I want to get all data in cassandra table "user"

i have 840000 users and i don't want to get all users in python list. i want get users in packs of 100 users

in cassandra doc https://datastax.github.io/python-driver/query_paging.html i see i can use fetch_size, but in my python code i have database object that contains all cql instruction

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

class Database:
   def __init__(self, name, salary):
        self.cluster = Cluster(['192.168.1.1', '192.168.1.2'])
        self.session = cluster.connect()

   def get_users(self):
        users_list = []
        query = "SELECT * FROM users"
        statement = SimpleStatement(query, fetch_size=10)
        for user_row in session.execute(statement):
            users_list.append(user_row.name)
        return users_list

actually get_users return very big list of user name but i want to transform return get_users to a "generator"

i don't want get all users name in 1 list and 1 call of function get_users, but i want to have lot of call get_users and return list with only 100 users max every call function

for example : list1 = database.get_users() list2 = database.get_users() ... listn = database.get_users()

list1 contains 100 first user in query list2 contains 100 "second" users in query listn contains the latest elements in query (<=100)

is this possible ? thanks for advance for your answer

Upvotes: 1

Views: 2232

Answers (1)

Horia
Horia

Reputation: 2982

According to Paging Large Queries:

Whenever there are no more rows in the current page, the next page will be fetched transparently.

So, if you execute your code like this, you will still the whole result set, but this is paged in a transparent manner.

In order to achieve what you need to use callbacks. You can also find some code sample on the link above.

I added below the full code for reference.

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from threading import Event

class PagedResultHandler(object):

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        for row in rows:
            process_row(row)

        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()
    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(user_row):
    print user_row.name, user_row.age, user_row.email

cluster = Cluster()
session = cluster.connect()

query = "SELECT * FROM myschema.users"
statement = SimpleStatement(query, fetch_size=5)

future = session.execute_async(statement)
handler = PagedResultHandler(future)
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster.shutdown()

Moving to next page is done in handle_page when start_fetching_next_page is called.

If you replace the if statement with self.finished_event.set() you will see that the iteration stops after the first 5 rows as defined in fetch_size

Upvotes: 2

Related Questions