Reputation: 349
I want to read all the data in the Cassandra table "user".
I have 840,000 users and I don't want to load them all into a single Python list. I want to get the users in packs of 100.
In the Cassandra docs (https://datastax.github.io/python-driver/query_paging.html) I see that I can use fetch_size, but in my Python code I have a database object that contains all the CQL instructions:
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

class Database:
    def __init__(self, name, salary):
        self.cluster = Cluster(['192.168.1.1', '192.168.1.2'])
        self.session = self.cluster.connect()

    def get_users(self):
        users_list = []
        query = "SELECT * FROM users"
        statement = SimpleStatement(query, fetch_size=10)
        for user_row in self.session.execute(statement):
            users_list.append(user_row.name)
        return users_list
Currently get_users returns a very big list of user names, but I want to turn get_users into a "generator".
I don't want all the user names in one list from a single call to get_users. Instead, I want to call get_users many times and have each call return a list of at most 100 users.
For example: list1 = database.get_users() list2 = database.get_users() ... listn = database.get_users()
list1 contains the first 100 users from the query, list2 contains the "second" 100 users, and listn contains the last elements of the query (<= 100).
Is this possible? Thanks in advance for your answer.
Upvotes: 1
Views: 2232
Reputation: 2982
According to Paging Large Queries:
Whenever there are no more rows in the current page, the next page will be fetched transparently.
So if you run your code as it is, you will still get the whole result set, but it is fetched page by page in a transparent manner.
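Because the pages are fetched lazily while you iterate, one way to consume the rows in groups without ever building the full list is to wrap the iteration in a generator. This is only a minimal sketch: in_batches is a hypothetical helper name, and rows stands in for whatever session.execute(statement) returns (any iterable works).

```python
from itertools import islice

def in_batches(rows, batch_size=100):
    """Yield lists of at most batch_size items taken from the iterable rows."""
    iterator = iter(rows)
    while True:
        # islice pulls only batch_size items, so the source is consumed lazily.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # source exhausted
        yield batch

# Hypothetical usage with the driver (assumes a connected session and statement):
# for names in in_batches(row.name for row in session.execute(statement)):
#     ...  # names is a list of up to 100 user names
```

The batch size here is independent of fetch_size; the driver keeps fetching pages in the background as the generator asks for more rows.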
To handle each page yourself as it arrives, use callbacks. You can also find sample code at the link above.
I have added the full code below for reference.
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from threading import Event

class PagedResultHandler(object):
    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        self.future = future
        # Register the handlers that run when a page arrives or the query fails.
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        # rows holds only the current page (at most fetch_size rows).
        for row in rows:
            process_row(row)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()

def process_row(user_row):
    print(user_row.name, user_row.age, user_row.email)

cluster = Cluster()
session = cluster.connect()
query = "SELECT * FROM myschema.users"
statement = SimpleStatement(query, fetch_size=5)
future = session.execute_async(statement)
handler = PagedResultHandler(future)
# Block until the last page has been handled or an error occurred.
handler.finished_event.wait()
if handler.error:
    raise handler.error
cluster.shutdown()
Moving to the next page happens in handle_page, when start_fetching_next_page is called.
If you replace the if statement with self.finished_event.set(), you will see that the iteration stops after the first 5 rows, as defined by fetch_size.
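If you would rather pull one page per call than use callbacks, a synchronous variant might look like the sketch below. It assumes a 3.x driver, where the object returned by session.execute exposes current_rows, has_more_pages and fetch_next_page(); iter_pages is a hypothetical name, not part of the driver.

```python
def iter_pages(result_set):
    """Yield one list of rows per page of a driver ResultSet-like object."""
    while True:
        # current_rows holds only the page that is loaded right now.
        yield list(result_set.current_rows)
        if not result_set.has_more_pages:
            break
        # Synchronously load the next page before the next yield.
        result_set.fetch_next_page()

# Hypothetical usage (assumes a connected session):
# statement = SimpleStatement("SELECT * FROM users", fetch_size=100)
# pages = iter_pages(session.execute(statement))
# list1 = next(pages)   # first page, at most 100 rows
# list2 = next(pages)   # second page
```

With fetch_size=100, each next(pages) call returns at most 100 rows, and the next page is requested only when you ask for it.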
Upvotes: 2